Skip to content

Commit

Permalink
-
Browse files Browse the repository at this point in the history
  • Loading branch information
kramerul committed Nov 20, 2023
1 parent 6bfad77 commit e15ba78
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 58 deletions.
19 changes: 19 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

set -e

./gradlew clean publishToMavenLocal
mvn install:install-file \
-Dfile=core/build/libs/calcite-core-1.35.0-SNAPSHOT.jar \
-DgroupId=org.apache.calcite \
-DartifactId=calcite-core-sap \
-Dversion=1.35.0 \
-Dpackaging=jar \
-DlocalRepositoryPath=../nucleus-resources-app/patches/
mvn install:install-file \
-Dfile=core/build/libs/calcite-core-1.35.0-SNAPSHOT.jar \
-DgroupId=org.apache.calcite \
-DartifactId=calcite-core-sap \
-Dversion=1.35.0 \
-Dpackaging=jar \
-DlocalRepositoryPath=/Users/d001323/.m2/repository
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.calcite.adapter.jdbc;

import org.apache.calcite.rel.core.CorrelationId;

import java.lang.reflect.Type;

public interface JdbcCorrelationVariableBuilder {
int createCorrelationVariable(CorrelationId id, int ordinal, Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,69 @@

import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
import org.apache.calcite.rel.rel2sql.SqlImplementor;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.Util;

import java.lang.reflect.Type;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Set;

/**
* State for generating a SQL statement.
*/
public class JdbcImplementor extends RelToSqlConverter {
public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory) {

private final JdbcCorrelationVariableBuilder correlationVariableBuilder;
private final JavaTypeFactory typeFactory;

public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory, JdbcCorrelationVariableBuilder correlationVariableBuilder) {
super(dialect);
Util.discard(typeFactory);
this. typeFactory= typeFactory;
this.correlationVariableBuilder = correlationVariableBuilder;
}

public JdbcImplementor(SqlDialect dialect, JavaTypeFactory typeFactory) {
this(dialect, typeFactory, new JdbcCorrelationVariableBuilder() {
private int counter = 1;
@Override
public int createCorrelationVariable(CorrelationId id, int ordinal, Type type) {
return counter++;
}
});
}

public Result implement(RelNode node) {
return dispatch(node);
}

@Override
protected Context getAliasContext(RexCorrelVariable variable){
Context context = correlTableMap.get(variable.id);
if ( context != null) return context;
List<RelDataTypeField> fieldList = variable.getType().getFieldList();
return new Context(dialect,fieldList.size()) {
@Override
public SqlNode field(int ordinal) {
RelDataTypeField field = fieldList.get(ordinal);
return new SqlDynamicParam(correlationVariableBuilder.createCorrelationVariable(variable.id,ordinal, typeFactory.getJavaClass(field.getType())),SqlParserPos.ZERO);
}

@Override
public SqlImplementor implementor() {
return JdbcImplementor.this;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,16 @@
*/
package org.apache.calcite.adapter.jdbc;

import com.google.common.collect.ImmutableList;
import org.apache.calcite.DataContext;
import org.apache.calcite.adapter.enumerable.EnumerableRel;
import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
import org.apache.calcite.adapter.enumerable.JavaRowFormat;
import org.apache.calcite.adapter.enumerable.PhysType;
import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
import org.apache.calcite.adapter.enumerable.RexImpTable;
import org.apache.calcite.adapter.enumerable.*;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.config.CalciteSystemProperty;
import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.ConstantExpression;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.ParameterExpression;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.linq4j.tree.UnaryExpression;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.linq4j.tree.*;
import org.apache.calcite.plan.*;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.convert.ConverterImpl;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.runtime.Hook;
Expand All @@ -48,23 +35,22 @@
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.SqlString;
import org.apache.calcite.util.BuiltInMethod;

import org.checkerframework.checker.nullness.qual.Nullable;

import javax.sql.DataSource;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.TimeZone;
import java.util.stream.Collectors;
import javax.sql.DataSource;

import static org.apache.calcite.linq4j.Nullness.castNonNull;

import static java.util.Objects.requireNonNull;
import static org.apache.calcite.linq4j.Nullness.castNonNull;

/**
* Relational expression representing a scan of a table in a JDBC data source.
Expand Down Expand Up @@ -105,7 +91,16 @@ protected JdbcToEnumerableConverter(
final JdbcConvention jdbcConvention =
(JdbcConvention) requireNonNull(child.getConvention(),
() -> "child.getConvention() is null for " + child);
SqlString sqlString = generateSql(jdbcConvention.dialect);
ImmutableList.Builder<Expression> parameterBuilder = new ImmutableList.Builder<>();
JdbcCorrelationVariableBuilder correlationVariableBuilder = new JdbcCorrelationVariableBuilder() {
int index = 1;
@Override
public int createCorrelationVariable(CorrelationId id, int ordinal, Type type) {
parameterBuilder.add(implementor.getCorrelVariableGetter(id.getName()).field(builder0,ordinal,type));
return index++;
}
};
SqlString sqlString = generateSql(jdbcConvention.dialect,correlationVariableBuilder);
String sql = sqlString.getSql();
if (CalciteSystemProperty.DEBUG.value()) {
System.out.println("[" + sql + "]");
Expand Down Expand Up @@ -174,13 +169,21 @@ protected JdbcToEnumerableConverter(

if (sqlString.getDynamicParameters() != null
&& !sqlString.getDynamicParameters().isEmpty()) {
final Expression preparedStatementConsumer_ =
builder0.append("preparedStatementConsumer",
Expressions.call(BuiltInMethod.CREATE_ENRICHER.method,
Expressions.newArrayInit(Integer.class, 1,
toIndexesTableExpression(sqlString)),
DataContext.ROOT));

ImmutableList<Expression> parameters = parameterBuilder.build();
final Expression preparedStatementConsumer_;
if (parameters.isEmpty()) {
preparedStatementConsumer_ =
builder0.append("preparedStatementConsumer",
Expressions.call(BuiltInMethod.CREATE_ENRICHER.method,
Expressions.newArrayInit(Integer.class, 1,
toIndexesTableExpression(sqlString)),
DataContext.ROOT));
}
else {
preparedStatementConsumer_ = builder0.append("preparedStatementConsumer",
Expressions.call(BuiltInMethod.CREATE_CORRELATION_ENRICHER.method,
Expressions.newArrayInit(Object.class,1,parameters)));
}
enumerable =
builder0.append("enumerable",
Expressions.call(
Expand Down Expand Up @@ -356,10 +359,10 @@ private static String jdbcGetMethod(@Nullable Primitive primitive) {
: "get" + SqlFunctions.initcap(castNonNull(primitive.primitiveName));
}

private SqlString generateSql(SqlDialect dialect) {
private SqlString generateSql(SqlDialect dialect, JdbcCorrelationVariableBuilder correlationVariableBuilder) {
final JdbcImplementor jdbcImplementor =
new JdbcImplementor(dialect,
(JavaTypeFactory) getCluster().getTypeFactory());
(JavaTypeFactory) getCluster().getTypeFactory(), correlationVariableBuilder);
final JdbcImplementor.Result result =
jdbcImplementor.visitRoot(this.getInput());
return result.asStatement().toSqlString(dialect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,12 @@ public SqlNode toSql(@Nullable RexProgram program, RexNode rex) {
final Context correlAliasContext = getAliasContext(variable);
final RexFieldAccess lastAccess = accesses.pollLast();
assert lastAccess != null;
sqlIdentifier = (SqlIdentifier) correlAliasContext
SqlNode node = correlAliasContext
.field(lastAccess.getField().getIndex());
if ( ! ( node instanceof SqlIdentifier)) {
return node;
}
sqlIdentifier = (SqlIdentifier) node;
break;
case ROW:
final SqlNode expr = toSql(program, referencedExpr);
Expand Down Expand Up @@ -1450,6 +1454,12 @@ public static SqlNode toSql(RexLiteral literal) {
}
}

protected Context getAliasContext(RexCorrelVariable variable){
return requireNonNull(
correlTableMap.get(variable.id),
() -> "variable " + variable.id + " is not found");
}

/** Simple implementation of {@link Context} that cannot handle sub-queries
* or correlations. Because it is so simple, you do not need to create a
* {@link SqlImplementor} or {@link org.apache.calcite.tools.RelBuilder}
Expand Down Expand Up @@ -1479,9 +1489,7 @@ protected abstract class BaseContext extends Context {
}

@Override protected Context getAliasContext(RexCorrelVariable variable) {
return requireNonNull(
correlTableMap.get(variable.id),
() -> "variable " + variable.id + " is not found");
return SqlImplementor.this.getAliasContext(variable);
}

@Override public SqlImplementor implementor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,18 @@
import org.apache.calcite.linq4j.function.Function1;
import org.apache.calcite.linq4j.tree.Primitive;
import org.apache.calcite.util.Static;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sql.DataSource;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.Date;
import java.sql.NClob;
import java.sql.PreparedStatement;
import java.sql.Ref;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.RowId;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.sql.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import java.util.Map;

import static org.apache.calcite.linq4j.Nullness.castNonNull;

Expand Down Expand Up @@ -198,6 +182,14 @@ public static PreparedStatementEnricher createEnricher(Integer[] indexes,
};
}

public static PreparedStatementEnricher createCorrelationEnricher(Object[] values) {
return preparedStatement -> {
for (int i = 0; i < values.length; i++) {
setDynamicParam(preparedStatement, i+1,values[i]);
}
};
}

/** Assigns a value to a dynamic parameter in a prepared statement, calling
* the appropriate {@code setXxx} method based on the type of the value. */
private static void setDynamicParam(PreparedStatement preparedStatement,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ public enum BuiltInMethod {
DataSource.class, String.class, Function1.class,
ResultSetEnumerable.PreparedStatementEnricher.class),
CREATE_ENRICHER(ResultSetEnumerable.class, "createEnricher", Integer[].class,
DataContext.class),
DataContext.class),
CREATE_CORRELATION_ENRICHER(ResultSetEnumerable.class, "createCorrelationEnricher", Object[].class),
HASH_JOIN(ExtendedEnumerable.class, "hashJoin", Enumerable.class,
Function1.class,
Function1.class, Function2.class, EqualityComparer.class,
Expand Down

0 comments on commit e15ba78

Please sign in to comment.