From 9daf936571d8f69719ed2dab4c8274e16e87f00f Mon Sep 17 00:00:00 2001 From: Avishka-Shamendra Date: Mon, 25 Mar 2024 10:40:16 +0530 Subject: [PATCH] Add property to allow null parameters with cud functions --- .../execution/rdbms/CUDStreamProcessor.java | 20 ++- .../rdbms/util/RDBMSStreamProcessorUtil.java | 5 + .../execution/rdbms/RDBMSCUDTestCase.java | 164 ++++++++++++++++++ 3 files changed, 187 insertions(+), 2 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java b/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java index fe18377b..cda9a992 100644 --- a/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java +++ b/component/src/main/java/io/siddhi/extension/execution/rdbms/CUDStreamProcessor.java @@ -133,6 +133,14 @@ "perform CUD operations.", defaultValue = "false", possibleParameters = {"true", "false"} + ), + @SystemParameter( + name = "allow.null.params.with.CUD", + description = "When set to 'true', this parameter allows the RDBMS CUD function to accept" + + "parameters with NULL values. " + + "When set to 'false', NULL parameters will not be allowed with CUD functions.", + defaultValue = "false", + possibleParameters = {"true", "false"} ) }, returnAttributes = { @@ -208,6 +216,7 @@ public class CUDStreamProcessor extends StreamProcessor { private List attributeList = new ArrayList<>(); private String transactionCorrelationId; private boolean enableCudOperationAutocommit = true; + private boolean allowNullValuedParams; @Override protected StateFactory init(MetaStreamEvent metaStreamEvent, AbstractDefinition inputDefinition, @@ -216,6 +225,8 @@ protected StateFactory init(MetaStreamEvent metaStreamEvent, AbstractDefi boolean findToBeExecuted, SiddhiQueryContext siddhiQueryContext) { boolean performCUDOps = Boolean.parseBoolean( configReader.readConfig("perform.CUD.operations", "false")); + allowNullValuedParams = Boolean.parseBoolean( + configReader.readConfig("allow.null.params.with.CUD", "false")); if (!performCUDOps) { throw new SiddhiAppValidationException("Performing CUD operations through " + "rdbms cud function is disabled. This is configured through system parameter, " + @@ -330,9 +341,14 @@ protected void process(ComplexEventChunk streamEventChunk, Processo } for (int i = 0; i < this.expressionExecutors.size(); i++) { ExpressionExecutor attributeExpressionExecutor = this.expressionExecutors.get(i); + Object value = attributeExpressionExecutor.execute(event); + if (value == null && !allowNullValuedParams) { + throw new SiddhiAppValidationException("Null values have been detected " + + "in the parameters passed to the CUD function. " + + "CUD functions do not permit null parameters."); + } RDBMSStreamProcessorUtil.populateStatementWithSingleElement(stmt, i + 1, - attributeExpressionExecutor.getReturnType(), - attributeExpressionExecutor.execute(event)); + attributeExpressionExecutor.getReturnType(), value); } } stmt.addBatch(); diff --git a/component/src/main/java/io/siddhi/extension/execution/rdbms/util/RDBMSStreamProcessorUtil.java b/component/src/main/java/io/siddhi/extension/execution/rdbms/util/RDBMSStreamProcessorUtil.java index 791ac13a..e3dcc879 100644 --- a/component/src/main/java/io/siddhi/extension/execution/rdbms/util/RDBMSStreamProcessorUtil.java +++ b/component/src/main/java/io/siddhi/extension/execution/rdbms/util/RDBMSStreamProcessorUtil.java @@ -217,6 +217,11 @@ public static HikariDataSource getDataSourceService(String dataSourceName) { */ public static void populateStatementWithSingleElement(PreparedStatement stmt, int ordinal, Attribute.Type type, Object value) throws SQLException { + // Handle 'null' valued params separately + if (value == null) { + stmt.setObject(ordinal, null); + return; + } switch (type) { case BOOL: stmt.setBoolean(ordinal, (Boolean) value); diff --git a/component/src/test/java/io/siddhi/extension/execution/rdbms/RDBMSCUDTestCase.java b/component/src/test/java/io/siddhi/extension/execution/rdbms/RDBMSCUDTestCase.java index 2ec3b99b..7163ed7a 100644 --- a/component/src/test/java/io/siddhi/extension/execution/rdbms/RDBMSCUDTestCase.java +++ b/component/src/test/java/io/siddhi/extension/execution/rdbms/RDBMSCUDTestCase.java @@ -551,4 +551,168 @@ public void receive(Event[] events) { siddhiAppRuntime.shutdown(); ((HikariDataSource) dataSource).close(); } + + + @Test() + public void rdbmsCUDNullParamTest1() throws InterruptedException { + log.info("rdbmsCUDNullParamTest1 - Test allow.null.params.with.CUD property behavior. " + + "When property is 'true', parametrized cud function with " + + "null value should be accepted"); + + String databaseType = System.getenv("DATABASE_TYPE"); + if (databaseType == null) { + databaseType = RDBMSTableTestUtils.TestType.H2.toString(); + } + RDBMSTableTestUtils.TestType type = RDBMSTableTestUtils.TestType.valueOf(databaseType); + + YAMLConfigManager yamlConfigManager = new YAMLConfigManager( + "extensions: \n" + + " - extension: \n" + + " namespace: rdbms\n" + + " name: cud\n" + + " properties:\n" + + " allow.null.params.with.CUD: true\n" + + " perform.CUD.operations: true"); + + SiddhiManager siddhiManager = new SiddhiManager(); + siddhiManager.setConfigManager(yamlConfigManager); + + DataSource dataSource = RDBMSTableTestUtils.initDataSource(); + siddhiManager.setDataSource("TEST_DATASOURCE", dataSource); + + String definitions = "" + + "define stream InsertStream(symbol string, price float, volume long);\n" + + "\n" + + "@Store(type=\"rdbms\", jdbc.url=\"" + url + "\", jdbc.driver.name=\"" + driverClassName + "\"," + + "username=\"" + user + "\", password=\"" + password + "\", pool.properties=\"maximumPoolSize:1\")" + + "define table " + TABLE_NAME + " (symbol string, price float, volume long); " + + "\n"; + + String parameterizedSqlQuery; + boolean isOracle11 = false; + if (type.equals(RDBMSTableTestUtils.TestType.ORACLE)) { + parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?)"; + isOracle11 = Boolean.parseBoolean(System.getenv("IS_ORACLE_11")); + } else { + parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?);"; + } + if (!type.equals(RDBMSTableTestUtils.TestType.ORACLE)) { + parameterizedSqlQuery = parameterizedSqlQuery.concat(";"); + } + + String parameterizedCud = "" + + "from InsertStream#rdbms:cud(\"TEST_DATASOURCE\", \"" + parameterizedSqlQuery + + "\", symbol, price, volume) " + + "select numRecords " + + "insert into OutputStream ;" + + "\n"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(definitions + parameterizedCud); + InputHandler insertStream = siddhiAppRuntime.getInputHandler("InsertStream"); + siddhiAppRuntime.start(); + + siddhiAppRuntime.addCallback("OutputStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + for (Event event : events) { + isEventArrived = true; + eventCount.incrementAndGet(); + actualData.add(event.getData()); + } + } + }); + + insertStream.send(new Object[]{"X", 1.0f, 30L}); + SiddhiTestHelper.waitForEvents(2000, 1, eventCount, 60000); + siddhiAppRuntime.shutdown(); + ((HikariDataSource) dataSource).close(); + + Assert.assertTrue(isEventArrived, "Event Not Arrived"); + Assert.assertEquals(eventCount.get(), 1, "Event count did not match"); + + if (isOracle11) { + Assert.assertEquals(actualData.get(0)[0], -2); + } else { + Assert.assertEquals(actualData.get(0)[0], 1); + } + } + + @Test() + public void rdbmsCUDNullParamTest2() throws InterruptedException { + log.info("rdbmsCUDNullParamTest2 - Test allow.null.params.with.CUD property behavior. " + + "When property is 'false', parametrized cud functions with " + + "null value should be rejected"); + + String databaseType = System.getenv("DATABASE_TYPE"); + if (databaseType == null) { + databaseType = RDBMSTableTestUtils.TestType.H2.toString(); + } + RDBMSTableTestUtils.TestType type = RDBMSTableTestUtils.TestType.valueOf(databaseType); + + YAMLConfigManager yamlConfigManager = new YAMLConfigManager( + "extensions: \n" + + " - extension: \n" + + " namespace: rdbms\n" + + " name: cud\n" + + " properties:\n" + + " perform.CUD.operations: true"); + + SiddhiManager siddhiManager = new SiddhiManager(); + siddhiManager.setConfigManager(yamlConfigManager); + + DataSource dataSource = RDBMSTableTestUtils.initDataSource(); + siddhiManager.setDataSource("TEST_DATASOURCE", dataSource); + + String definitions = "" + + "define stream InsertStream(symbol string, price float, volume long);\n" + + "\n" + + "@Store(type=\"rdbms\", jdbc.url=\"" + url + "\", jdbc.driver.name=\"" + driverClassName + "\"," + + "username=\"" + user + "\", password=\"" + password + "\", pool.properties=\"maximumPoolSize:1\")" + + "define table " + TABLE_NAME + " (symbol string, price float, volume long); " + + "\n"; + + String parameterizedSqlQuery; + if (type.equals(RDBMSTableTestUtils.TestType.ORACLE)) { + parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?)"; + } else { + parameterizedSqlQuery = "INSERT INTO " + TABLE_NAME + "(symbol, price, volume) VALUES (?,?,?);"; + } + if (!type.equals(RDBMSTableTestUtils.TestType.ORACLE)) { + parameterizedSqlQuery = parameterizedSqlQuery.concat(";"); + } + + String parameterizedCud = "" + + "from InsertStream#rdbms:cud(\"TEST_DATASOURCE\", \"" + parameterizedSqlQuery + + "\", symbol, price, volume) " + + "select numRecords " + + "insert into OutputStream ;" + + "\n"; + + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(definitions + parameterizedCud); + InputHandler insertStream = siddhiAppRuntime.getInputHandler("InsertStream"); + siddhiAppRuntime.start(); + + siddhiAppRuntime.addCallback("OutputStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + for (Event event : events) { + isEventArrived = true; + eventCount.incrementAndGet(); + actualData.add(event.getData()); + } + } + }); + + insertStream.send(new Object[]{"Y", 1.0f, null}); + SiddhiTestHelper.waitForEvents(2000, 1, eventCount, 6000); + siddhiAppRuntime.shutdown(); + ((HikariDataSource) dataSource).close(); + + // Event should not arrive and 0 events should be received + Assert.assertFalse(isEventArrived, "Event Arrived"); + Assert.assertEquals(eventCount.get(), 0, "Event count did not match"); + } + }