Skip to content

Commit 73267a4

Browse files
committed
NIFI-15016 - Ensure nested versioned groups detect scheduled-state changes
1 parent 6033076 commit 73267a4

File tree

2 files changed

+82
-2
lines changed

2 files changed

+82
-2
lines changed

nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/main/java/org/apache/nifi/registry/flow/diff/StandardFlowComparator.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -564,8 +564,8 @@ private void compare(final VersionedProcessGroup groupA, final VersionedProcessG
564564
// - explicitly requested comparison for embedded versioned groups
565565
final boolean shouldCompareVersioned = flowCoordinateDifferences.stream()
566566
.anyMatch(diff -> !diff.getFieldName().isPresent() || !diff.getFieldName().get().equals(FLOW_VERSION)) || flowComparatorVersionedStrategy == FlowComparatorVersionedStrategy.DEEP;
567-
final boolean compareGroupContents = (groupACoordinates == null && groupBCoordinates == null)
568-
|| (groupACoordinates != null && groupBCoordinates != null && shouldCompareVersioned);
567+
final boolean bothGroupsVersioned = groupACoordinates != null && groupBCoordinates != null;
568+
final boolean compareGroupContents = !bothGroupsVersioned || shouldCompareVersioned || hasProcessGroupContents(groupA) || hasProcessGroupContents(groupB);
569569

570570

571571
if (compareGroupContents) {
@@ -620,6 +620,22 @@ private void extractPGComponentsDifferences(final VersionedProcessGroup groupA,
620620
this::compare));
621621
}
622622

623+
private boolean hasProcessGroupContents(final VersionedProcessGroup group) {
624+
if (group == null) {
625+
return false;
626+
}
627+
628+
return !group.getConnections().isEmpty()
629+
|| !group.getProcessors().isEmpty()
630+
|| !group.getControllerServices().isEmpty()
631+
|| !group.getFunnels().isEmpty()
632+
|| !group.getInputPorts().isEmpty()
633+
|| !group.getLabels().isEmpty()
634+
|| !group.getOutputPorts().isEmpty()
635+
|| !group.getProcessGroups().isEmpty()
636+
|| !group.getRemoteProcessGroups().isEmpty();
637+
}
638+
623639

624640
private void compareFlowCoordinates(final VersionedProcessGroup groupA, final VersionedProcessGroup groupB, final Set<FlowDifference> differences) {
625641
final VersionedFlowCoordinates coordinatesA = groupA.getVersionedFlowCoordinates();

nifi-registry/nifi-registry-core/nifi-registry-flow-diff/src/test/java/org/apache/nifi/registry/flow/diff/TestStandardFlowComparator.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.nifi.flow.VersionedAsset;
2424
import org.apache.nifi.flow.VersionedComponent;
2525
import org.apache.nifi.flow.VersionedControllerService;
26+
import org.apache.nifi.flow.VersionedFlowCoordinates;
2627
import org.apache.nifi.flow.VersionedParameter;
2728
import org.apache.nifi.flow.VersionedParameterContext;
2829
import org.apache.nifi.flow.VersionedProcessGroup;
@@ -285,6 +286,60 @@ public void testDeepStrategyWithChildPGs() {
285286
&& difference.getComponentA().getComponentType() == ComponentType.CONTROLLER_SERVICE));
286287
}
287288

289+
@Test
290+
public void testScheduledStateChangeDetectedForProcessorInNestedVersionedGroup() {
291+
final VersionedProcessGroup registryRoot = new VersionedProcessGroup();
292+
registryRoot.setIdentifier("rootPG");
293+
294+
final VersionedProcessGroup localRoot = new VersionedProcessGroup();
295+
localRoot.setIdentifier("rootPG");
296+
297+
final VersionedProcessGroup registryNested = new VersionedProcessGroup();
298+
registryNested.setIdentifier("nestedPG");
299+
registryNested.setVersionedFlowCoordinates(createVersionedFlowCoordinates());
300+
registryRoot.getProcessGroups().add(registryNested);
301+
302+
final VersionedProcessGroup localNested = new VersionedProcessGroup();
303+
localNested.setIdentifier("nestedPG");
304+
localNested.setVersionedFlowCoordinates(createVersionedFlowCoordinates());
305+
localRoot.getProcessGroups().add(localNested);
306+
307+
final VersionedProcessor registryProcessor = new VersionedProcessor();
308+
registryProcessor.setIdentifier("processorZ");
309+
registryProcessor.setScheduledState(ScheduledState.ENABLED);
310+
registryProcessor.setProperties(Collections.emptyMap());
311+
registryProcessor.setPropertyDescriptors(Collections.emptyMap());
312+
registryNested.getProcessors().add(registryProcessor);
313+
314+
final VersionedProcessor localProcessor = new VersionedProcessor();
315+
localProcessor.setIdentifier("processorZ");
316+
localProcessor.setScheduledState(ScheduledState.DISABLED);
317+
localProcessor.setProperties(Collections.emptyMap());
318+
localProcessor.setPropertyDescriptors(Collections.emptyMap());
319+
localNested.getProcessors().add(localProcessor);
320+
321+
final ComparableDataFlow registryFlow = new StandardComparableDataFlow("registry", registryRoot);
322+
final ComparableDataFlow localFlow = new StandardComparableDataFlow("local", localRoot);
323+
324+
final StandardFlowComparator testComparator = new StandardFlowComparator(
325+
registryFlow,
326+
localFlow,
327+
Collections.emptySet(),
328+
new StaticDifferenceDescriptor(),
329+
Function.identity(),
330+
VersionedComponent::getIdentifier,
331+
FlowComparatorVersionedStrategy.SHALLOW);
332+
333+
final Set<FlowDifference> differences = testComparator.compare().getDifferences();
334+
335+
final boolean scheduledStateDiffFound = differences.stream()
336+
.anyMatch(diff -> diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED
337+
&& diff.getComponentB() != null
338+
&& "processorZ".equals(diff.getComponentB().getIdentifier()));
339+
340+
assertTrue(scheduledStateDiffFound, "Expected scheduled state change for processorZ inside nested process group to be detected");
341+
}
342+
288343
private VersionedParameter createParameter(final String name, final String value, final boolean sensitive) {
289344
return createParameter(name, value, sensitive, null);
290345
}
@@ -304,4 +359,13 @@ private VersionedAsset createAsset(final String id, final String name) {
304359
asset.setName(name);
305360
return asset;
306361
}
362+
363+
private VersionedFlowCoordinates createVersionedFlowCoordinates() {
364+
final VersionedFlowCoordinates coordinates = new VersionedFlowCoordinates();
365+
coordinates.setRegistryId("registry");
366+
coordinates.setBucketId("bucketId");
367+
coordinates.setFlowId("flowId");
368+
coordinates.setVersion("1");
369+
return coordinates;
370+
}
307371
}

0 commit comments

Comments
 (0)