Skip to content

Commit 537ce94

Browse files
authored
NIFI-15011 - FlowDifferenceFilters - handle splitRelationship() (#10341)
1 parent 5765c73 commit 537ce94

File tree

2 files changed

+318
-1
lines changed

2 files changed

+318
-1
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/util/FlowDifferenceFilters.java

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818

1919
import org.apache.nifi.annotation.behavior.DynamicProperties;
2020
import org.apache.nifi.annotation.behavior.DynamicProperty;
21+
import org.apache.nifi.annotation.behavior.DynamicRelationship;
2122
import org.apache.nifi.components.ConfigurableComponent;
2223
import org.apache.nifi.components.PropertyDescriptor;
24+
import org.apache.nifi.connectable.Connectable;
25+
import org.apache.nifi.connectable.ConnectableType;
26+
import org.apache.nifi.connectable.Connection;
2327
import org.apache.nifi.controller.ComponentNode;
2428
import org.apache.nifi.controller.ProcessorNode;
2529
import org.apache.nifi.controller.flow.FlowManager;
@@ -36,10 +40,13 @@
3640
import org.apache.nifi.flow.VersionedProcessGroup;
3741
import org.apache.nifi.flow.VersionedProcessor;
3842
import org.apache.nifi.flow.VersionedReportingTask;
43+
import org.apache.nifi.groups.ProcessGroup;
44+
import org.apache.nifi.processor.Processor;
3945
import org.apache.nifi.processor.Relationship;
4046
import org.apache.nifi.registry.flow.diff.DifferenceType;
4147
import org.apache.nifi.registry.flow.diff.FlowDifference;
4248
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedComponent;
49+
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedConnection;
4350
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedControllerService;
4451
import org.apache.nifi.registry.flow.mapping.InstantiatedVersionedProcessor;
4552

@@ -96,7 +103,8 @@ public static boolean isEnvironmentalChange(final FlowDifference difference, fin
96103
|| isStaticPropertyRemoved(difference, flowManager)
97104
|| isControllerServiceCreatedForNewProperty(difference, evaluatedContext)
98105
|| isPropertyParameterizationRename(difference, evaluatedContext)
99-
|| isPropertyRenameWithMatchingValue(difference, evaluatedContext);
106+
|| isPropertyRenameWithMatchingValue(difference, evaluatedContext)
107+
|| isSelectedRelationshipChangeForNewRelationship(difference, flowManager);
100108
}
101109

102110
private static boolean isSensitivePropertyDueToGhosting(final FlowDifference difference, final FlowManager flowManager) {
@@ -441,6 +449,97 @@ public static boolean isNewRelationshipAutoTerminatedAndDefaulted(final FlowDiff
441449
return true;
442450
}
443451

452+
private static boolean isSelectedRelationshipChangeForNewRelationship(final FlowDifference difference, final FlowManager flowManager) {
453+
if (difference.getDifferenceType() != DifferenceType.SELECTED_RELATIONSHIPS_CHANGED) {
454+
return false;
455+
}
456+
457+
if (!(difference.getComponentA() instanceof VersionedConnection connectionA)) {
458+
return false;
459+
}
460+
461+
if (!(difference.getComponentB() instanceof InstantiatedVersionedConnection connectionB)) {
462+
return false;
463+
}
464+
465+
final Set<String> selectedA = new HashSet<>(replaceNull(connectionA.getSelectedRelationships(), Collections.emptySet()));
466+
final Set<String> selectedB = new HashSet<>(replaceNull(connectionB.getSelectedRelationships(), Collections.emptySet()));
467+
468+
final Set<String> newlySelected = new HashSet<>(selectedB);
469+
newlySelected.removeAll(selectedA);
470+
if (newlySelected.isEmpty()) {
471+
return false;
472+
}
473+
474+
final Set<String> removedRelationships = new HashSet<>(selectedA);
475+
removedRelationships.removeAll(selectedB);
476+
477+
if (flowManager == null) {
478+
return false;
479+
}
480+
481+
final String connectionInstanceId = connectionB.getInstanceIdentifier();
482+
final String connectionGroupId = connectionB.getInstanceGroupId();
483+
if (connectionInstanceId == null || connectionGroupId == null) {
484+
return false;
485+
}
486+
487+
final ProcessGroup processGroup = flowManager.getGroup(connectionGroupId);
488+
if (processGroup == null) {
489+
return false;
490+
}
491+
492+
final Connection connection = processGroup.getConnection(connectionInstanceId);
493+
if (connection == null) {
494+
return false;
495+
}
496+
497+
final Connectable source = connection.getSource();
498+
if (source == null || source.getConnectableType() != ConnectableType.PROCESSOR) {
499+
return false;
500+
}
501+
502+
final ProcessorNode processorNode = flowManager.getProcessorNode(source.getIdentifier());
503+
if (processorNode == null) {
504+
return false;
505+
}
506+
507+
final Processor processor = processorNode.getProcessor();
508+
if (processor != null) {
509+
final Class<?> processorClass = processor.getClass();
510+
if (processorClass.isAnnotationPresent(DynamicRelationship.class)) {
511+
return false;
512+
}
513+
}
514+
515+
for (final String relationshipName : newlySelected) {
516+
final Relationship relationship = processorNode.getRelationship(relationshipName);
517+
if (relationship == null) {
518+
return false;
519+
}
520+
521+
if (processorNode.isAutoTerminated(relationship)) {
522+
return false;
523+
}
524+
525+
final Set<Connection> relationshipConnections = replaceNull(processorNode.getConnections(relationship), Collections.emptySet());
526+
for (final Connection relationshipConnection : relationshipConnections) {
527+
if (!relationshipConnection.getIdentifier().equals(connection.getIdentifier())) {
528+
return false;
529+
}
530+
}
531+
}
532+
533+
for (final String removedRelationshipName : removedRelationships) {
534+
final Relationship removedRelationship = processorNode.getRelationship(removedRelationshipName);
535+
if (removedRelationship != null) {
536+
return false;
537+
}
538+
}
539+
540+
return true;
541+
}
542+
444543
private static <T> T replaceNull(final T value, final T replacement) {
445544
return value == null ? replacement : value;
446545
}

0 commit comments

Comments
 (0)