From 7826bd1a3785ec25c2d5caabe55199b4ec706d23 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Tue, 14 Jan 2025 14:50:40 -0500 Subject: [PATCH] NIFI-14157: Allow InvokeScriptedProcessor scripts to implement OnPrimaryNodeStateChange --- .../script/InvokeScriptedProcessor.java | 9 +++- .../processors/script/TestInvokeGroovy.java | 29 +++++++++++ .../groovy/test_OnPrimaryStateChange.groovy | 52 +++++++++++++++++++ .../util/StandardProcessorTestRunner.java | 10 ++++ 4 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy diff --git a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 74215f6df1c8..bf7f01e0860b 100644 --- a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -30,6 +30,8 @@ import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.ValidationContext; @@ -212,7 +214,6 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String public void setup(final ProcessContext context) { scriptingComponentHelper.setupVariables(context); setup(); - invokeScriptedProcessorMethod("onScheduled", context); } @@ -232,6 +233,12 @@ public void setup() { } } + @OnPrimaryNodeStateChange + public void onPrimaryNodeStateChange(final PrimaryNodeState newState) { + + invokeScriptedProcessorMethod("onPrimaryNodeStateChange", newState); + } + /** * Handles changes to this processor's properties. If changes are made to * script- or engine-related properties, the script will be reloaded. diff --git a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java index ffb028b3fd4d..7e239b4cd5fe 100644 --- a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java +++ b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/TestInvokeGroovy.java @@ -241,6 +241,31 @@ public void testReadRecordsWithRecordPath() throws Exception { ff.assertContentEquals("48\n47\n14\n"); } + /** + * Tests a script that has a Groovy Processor that implements its own onPrimaryNodeStateChange + * + * @throws Exception Any error encountered while testing + */ + @Test + public void testOnPrimaryNodeStateChange() { + runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy"); + runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, "target/test/resources/groovy/test_OnPrimaryStateChange.groovy"); + runner.setProperty(ScriptingComponentUtils.MODULES, "target/test/resources/groovy"); + InvokeScriptedProcessor invokeScriptedProcessor = ((InvokeScriptedProcessor) scriptingComponent); + invokeScriptedProcessor.setup(runner.getProcessContext()); + runner.setIsConfiguredForClustering(true); + runner.run(1, false, true); + runner.setPrimaryNode(true); + runner.clearTransferState(); + runner.run(1, true, false); + runner.assertAllFlowFilesTransferred("success"); + List flowFiles = runner.getFlowFilesForRelationship("success"); + assertNotNull(flowFiles); + assertEquals(1, flowFiles.size()); + MockFlowFile flowFile = flowFiles.get(0); + flowFile.assertAttributeEquals("isPrimaryNode", "true"); + } + private static class OverrideInvokeScriptedProcessor extends InvokeScriptedProcessor { private int numTimesModifiedCalled = 0; @@ -258,4 +283,8 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String numTimesModifiedCalled++; } } + + private static class OnPrimaryNodeStateChangeMethodWasCalledException extends RuntimeException { + + } } diff --git a/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy new file mode 100644 index 000000000000..bb2b82ff6be7 --- /dev/null +++ b/nifi-extension-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_OnPrimaryStateChange.groovy @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange +import org.apache.nifi.annotation.notification.PrimaryNodeState +import org.apache.nifi.processor.AbstractProcessor +import org.apache.nifi.processor.ProcessContext +import org.apache.nifi.processor.ProcessSession +import org.apache.nifi.processor.Relationship + + +class MyRecordProcessor extends AbstractProcessor { + + def REL_SUCCESS = new Relationship.Builder().name("success").description('FlowFiles that were successfully processed are routed here').build() + def REL_FAILURE = new Relationship.Builder().name("failure").description('FlowFiles are routed here if an error occurs during processing').build() + + static boolean primaryNode = false + + @OnPrimaryNodeStateChange + void onPrimaryNodeStateChange(final PrimaryNodeState newState) { + primaryNode = true + } + + @Override + Set getRelationships() { + [REL_SUCCESS, REL_FAILURE] as Set + } + + @Override + void onTrigger(ProcessContext context, ProcessSession session) { + def flowFile = session.create() + session.putAttribute(flowFile, 'isPrimaryNode', primaryNode.toString()) + session.transfer(flowFile, REL_SUCCESS) + } +} + +processor = new MyRecordProcessor() \ No newline at end of file diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 7f1d84807531..292c7a9e3730 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.DescribedValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; @@ -979,6 +981,14 @@ public void setIsConfiguredForClustering(final boolean isConfiguredForClustering @Override public void setPrimaryNode(boolean primaryNode) { + if (context.isPrimary() != primaryNode) { + try { + ReflectionUtils.invokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, processor, + primaryNode ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED); + } catch (final Exception e) { + Assertions.fail("Could not invoke methods annotated with @OnPrimaryNodeStateChange annotation due to: " + e); + } + } context.setPrimaryNode(primaryNode); }