Skip to content

Commit dc0dd3a

Browse files
authored
NIFI-15016 Ensure nested versioned groups detect scheduled-state changes (#10356)
Signed-off-by: David Handermann <[email protected]>
1 parent 909c06d commit dc0dd3a

File tree

2 files changed

+85
-2
lines changed

2 files changed

+85
-2
lines changed

nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,8 @@ private void compare(final VersionedProcessGroup groupA, final VersionedProcessG
564564
// - explicitly requested comparison for embedded versioned groups
565565
final boolean shouldCompareVersioned = flowCoordinateDifferences.stream()
566566
.anyMatch(diff -> !diff.getFieldName().isPresent() || !diff.getFieldName().get().equals(FLOW_VERSION)) || flowComparatorVersionedStrategy == FlowComparatorVersionedStrategy.DEEP;
567-
final boolean compareGroupContents = (groupACoordinates == null && groupBCoordinates == null)
568-
|| (groupACoordinates != null && groupBCoordinates != null && shouldCompareVersioned);
567+
final boolean bothGroupsVersioned = groupACoordinates != null && groupBCoordinates != null;
568+
final boolean compareGroupContents = !bothGroupsVersioned || shouldCompareVersioned || hasProcessGroupContents(groupA) || hasProcessGroupContents(groupB);
569569

570570

571571
if (compareGroupContents) {
@@ -620,6 +620,22 @@ private void extractPGComponentsDifferences(final VersionedProcessGroup groupA,
620620
this::compare));
621621
}
622622

623+
private boolean hasProcessGroupContents(final VersionedProcessGroup group) {
624+
if (group == null) {
625+
return false;
626+
}
627+
628+
return !group.getConnections().isEmpty()
629+
|| !group.getProcessors().isEmpty()
630+
|| !group.getControllerServices().isEmpty()
631+
|| !group.getFunnels().isEmpty()
632+
|| !group.getInputPorts().isEmpty()
633+
|| !group.getLabels().isEmpty()
634+
|| !group.getOutputPorts().isEmpty()
635+
|| !group.getProcessGroups().isEmpty()
636+
|| !group.getRemoteProcessGroups().isEmpty();
637+
}
638+
623639

624640
private void compareFlowCoordinates(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB, final Set<FlowDifference> differences) {
625641
final VersionedFlowCoordinates coordinatesA = groupA.getVersionedFlowCoordinates();

nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.nifi.flow.VersionedAsset;
2424
import org.apache.nifi.flow.VersionedComponent;
2525
import org.apache.nifi.flow.VersionedControllerService;
26+
import org.apache.nifi.flow.VersionedFlowCoordinates;
2627
import org.apache.nifi.flow.VersionedParameter;
2728
import org.apache.nifi.flow.VersionedParameterContext;
2829
import org.apache.nifi.flow.VersionedProcessGroup;
@@ -285,6 +286,63 @@ public void testDeepStrategyWithChildPGs() {
285286
&& difference.getComponentA().getComponentType() == ComponentType.CONTROLLER_SERVICE));
286287
}
287288

289+
@Test
290+
public void testScheduledStateChangeDetectedForProcessorInNestedVersionedGroup() {
291+
final String rootPgIdentifier = "rootPG";
292+
final String nestedPgIdentifier = "nestedPG";
293+
final String procIdentifier = "processorZ";
294+
final VersionedProcessGroup registryRoot = new VersionedProcessGroup();
295+
registryRoot.setIdentifier(rootPgIdentifier);
296+
297+
final VersionedProcessGroup localRoot = new VersionedProcessGroup();
298+
localRoot.setIdentifier(rootPgIdentifier);
299+
300+
final VersionedProcessGroup registryNested = new VersionedProcessGroup();
301+
registryNested.setIdentifier(nestedPgIdentifier);
302+
registryNested.setVersionedFlowCoordinates(createVersionedFlowCoordinates());
303+
registryRoot.getProcessGroups().add(registryNested);
304+
305+
final VersionedProcessGroup localNested = new VersionedProcessGroup();
306+
localNested.setIdentifier(nestedPgIdentifier);
307+
localNested.setVersionedFlowCoordinates(createVersionedFlowCoordinates());
308+
localRoot.getProcessGroups().add(localNested);
309+
310+
final VersionedProcessor registryProcessor = new VersionedProcessor();
311+
registryProcessor.setIdentifier(procIdentifier);
312+
registryProcessor.setScheduledState(ScheduledState.ENABLED);
313+
registryProcessor.setProperties(Collections.emptyMap());
314+
registryProcessor.setPropertyDescriptors(Collections.emptyMap());
315+
registryNested.getProcessors().add(registryProcessor);
316+
317+
final VersionedProcessor localProcessor = new VersionedProcessor();
318+
localProcessor.setIdentifier(procIdentifier);
319+
localProcessor.setScheduledState(ScheduledState.DISABLED);
320+
localProcessor.setProperties(Collections.emptyMap());
321+
localProcessor.setPropertyDescriptors(Collections.emptyMap());
322+
localNested.getProcessors().add(localProcessor);
323+
324+
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("registry", registryRoot);
325+
final ComparableDataFlow localFlow = new StandardComparableDataFlow("local", localRoot);
326+
327+
final StandardFlowComparator testComparator = new StandardFlowComparator(
328+
registryFlow,
329+
localFlow,
330+
Collections.emptySet(),
331+
new StaticDifferenceDescriptor(),
332+
Function.identity(),
333+
VersionedComponent::getIdentifier,
334+
FlowComparatorVersionedStrategy.SHALLOW);
335+
336+
final Set<FlowDifference> differences = testComparator.compare().getDifferences();
337+
338+
final boolean scheduledStateDiffFound = differences.stream()
339+
.anyMatch(diff -> diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED
340+
&& diff.getComponentB() != null
341+
&& procIdentifier.equals(diff.getComponentB().getIdentifier()));
342+
343+
assertTrue(scheduledStateDiffFound, "Expected scheduled state change for processor inside nested process group to be detected");
344+
}
345+
288346
private VersionedParameter createParameter(final String name, final String value, final boolean sensitive) {
289347
return createParameter(name, value, sensitive, null);
290348
}
@@ -304,4 +362,13 @@ private VersionedAsset createAsset(final String id, final String name) {
304362
asset.setName(name);
305363
return asset;
306364
}
365+
366+
private VersionedFlowCoordinates createVersionedFlowCoordinates() {
367+
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
368+
coordinates.setRegistryId("registry");
369+
coordinates.setBucketId("bucketId");
370+
coordinates.setFlowId("flowId");
371+
coordinates.setVersion("1");
372+
return coordinates;
373+
}
307374
}

0 commit comments

Comments
 (0)