Skip to content

Commit 7364263

Browse files
committed
NIFI-15156 Connect Listen Ports REST API endpoint to framework
1 parent a495bbf commit 7364263

File tree

5 files changed

+110
-0
lines changed

5 files changed

+110
-0
lines changed

nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.apache.nifi.authorization.resource.Authorizable;
2222
import org.apache.nifi.bundle.BundleCoordinate;
2323
import org.apache.nifi.components.PropertyDescriptor;
24+
import org.apache.nifi.components.listen.ListenComponent;
2425
import org.apache.nifi.connectable.Connectable;
2526
import org.apache.nifi.connectable.Connection;
2627
import org.apache.nifi.connectable.Funnel;
2728
import org.apache.nifi.connectable.Port;
29+
import org.apache.nifi.controller.ComponentNode;
2830
import org.apache.nifi.controller.FlowAnalysisRuleNode;
2931
import org.apache.nifi.controller.ParameterProviderNode;
3032
import org.apache.nifi.controller.ProcessScheduler;
@@ -732,4 +734,24 @@ private void removeRuleViolationsForSubject(String identifier) {
732734
ruleViolationsManager.removeRuleViolationsForSubject(identifier);
733735
}
734736
}
737+
738+
@Override
739+
public Set<ComponentNode> getAllListenComponents() {
740+
741+
final Set<ComponentNode> allListenComponents = new HashSet<>();
742+
743+
// TODO improve performance by adding hooks to create/delete methods to keep track of listen components in a new hash map
744+
745+
// Search Processors
746+
allProcessors.values().stream()
747+
.filter(processorNode -> processorNode.getComponent() instanceof ListenComponent)
748+
.forEach(allListenComponents::add);
749+
750+
// Search Controller Services
751+
getAllControllerServices().stream()
752+
.filter(csNode -> csNode.getComponent() instanceof ListenComponent)
753+
.forEach(allListenComponents::add);
754+
755+
return allListenComponents;
756+
}
735757
}

nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/flow/FlowManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.nifi.connectable.Connection;
2222
import org.apache.nifi.connectable.Funnel;
2323
import org.apache.nifi.connectable.Port;
24+
import org.apache.nifi.controller.ComponentNode;
2425
import org.apache.nifi.controller.FlowAnalysisRuleNode;
2526
import org.apache.nifi.controller.ParameterProviderNode;
2627
import org.apache.nifi.controller.ProcessorNode;
@@ -439,4 +440,11 @@ FlowAnalysisRuleNode createFlowAnalysisRule(
439440
Optional<FlowAnalyzer> getFlowAnalyzer();
440441

441442
Optional<RuleViolationsManager> getRuleViolationsManager();
443+
444+
/**
445+
* Returns all components (processors, controller services, etc.) that are a {@link org.apache.nifi.components.listen.ListenComponent}
446+
*
447+
* @return A set of listen components.
448+
*/
449+
Set<ComponentNode> getAllListenComponents();
442450
}

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.apache.nifi.web.api.dto.FlowRegistryClientDTO;
7171
import org.apache.nifi.web.api.dto.FunnelDTO;
7272
import org.apache.nifi.web.api.dto.LabelDTO;
73+
import org.apache.nifi.web.api.dto.ListenPortDTO;
7374
import org.apache.nifi.web.api.dto.ListingRequestDTO;
7475
import org.apache.nifi.web.api.dto.NodeDTO;
7576
import org.apache.nifi.web.api.dto.ParameterContextDTO;
@@ -3090,4 +3091,17 @@ ControllerServiceReferencingComponentsEntity updateControllerServiceReferencingC
30903091
*/
30913092
AssetEntity deleteAsset(String parameterContextId, String assetId);
30923093

3094+
// ----------------------------------------
3095+
// Listen Port methods
3096+
// ----------------------------------------
3097+
3098+
/**
3099+
* Get all dynamically defined data ingress ports provided by Listen Components (e.g., Processors and Controller Services)
3100+
*
3101+
* @param user the user performing the lookup
3102+
* @return the list of listen Ports accessible to the current user
3103+
*/
3104+
Set<ListenPortDTO> getListenPorts(NiFiUser user);
3105+
3106+
30933107
}

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@
263263
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
264264
import org.apache.nifi.web.api.dto.FunnelDTO;
265265
import org.apache.nifi.web.api.dto.LabelDTO;
266+
import org.apache.nifi.web.api.dto.ListenPortDTO;
266267
import org.apache.nifi.web.api.dto.ListingRequestDTO;
267268
import org.apache.nifi.web.api.dto.NarCoordinateDTO;
268269
import org.apache.nifi.web.api.dto.NarSummaryDTO;
@@ -7248,6 +7249,11 @@ protected NiFiRegistryFlowMapper makeNiFiRegistryFlowMapper(final ExtensionManag
72487249
return new NiFiRegistryFlowMapper(extensionManager, options);
72497250
}
72507251

7252+
@Override
7253+
public Set<ListenPortDTO> getListenPorts(final NiFiUser user) {
7254+
return controllerFacade.getListenPorts(user);
7255+
}
7256+
72517257
@Autowired
72527258
public void setProperties(final NiFiProperties properties) {
72537259
this.properties = properties;

nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@
4343
import org.apache.nifi.cluster.protocol.NodeIdentifier;
4444
import org.apache.nifi.components.ConfigurableComponent;
4545
import org.apache.nifi.components.RequiredPermission;
46+
import org.apache.nifi.components.listen.ListenComponent;
4647
import org.apache.nifi.connectable.Connectable;
4748
import org.apache.nifi.connectable.Connection;
4849
import org.apache.nifi.connectable.Port;
50+
import org.apache.nifi.controller.ComponentNode;
51+
import org.apache.nifi.controller.ConfigurationContext;
4952
import org.apache.nifi.controller.ContentAvailability;
5053
import org.apache.nifi.controller.ControllerService;
5154
import org.apache.nifi.controller.Counter;
@@ -64,6 +67,7 @@
6467
import org.apache.nifi.controller.service.ControllerServiceNode;
6568
import org.apache.nifi.controller.service.ControllerServiceProvider;
6669
import org.apache.nifi.controller.service.ControllerServiceResolver;
70+
import org.apache.nifi.controller.service.StandardConfigurationContext;
6771
import org.apache.nifi.controller.status.ConnectionStatus;
6872
import org.apache.nifi.controller.status.PortStatus;
6973
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -113,6 +117,7 @@
113117
import org.apache.nifi.web.api.dto.BundleDTO;
114118
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
115119
import org.apache.nifi.web.api.dto.DtoFactory;
120+
import org.apache.nifi.web.api.dto.ListenPortDTO;
116121
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
117122
import org.apache.nifi.web.api.dto.provenance.AttributeDTO;
118123
import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO;
@@ -143,6 +148,7 @@
143148
import java.util.ArrayList;
144149
import java.util.Arrays;
145150
import java.util.Collection;
151+
import java.util.Collections;
146152
import java.util.Comparator;
147153
import java.util.Date;
148154
import java.util.HashMap;
@@ -1853,6 +1859,60 @@ public SearchResultsDTO search(final String searchLiteral, final String activeGr
18531859
return results;
18541860
}
18551861

1862+
/**
1863+
* Get all user-defined data ingress ports provided by Listen Components (e.g., Processors and Controller Services)
1864+
*
1865+
* @param user the user performing the lookup
1866+
* @return the set of listen Ports accessible to the current user
1867+
*/
1868+
public Set<ListenPortDTO> getListenPorts(final NiFiUser user) {
1869+
1870+
// Get all listen components for which the requesting user is authorized
1871+
final Set<ComponentNode> listenComponentNodes = flowController.getFlowManager().getAllListenComponents().stream()
1872+
.filter(componentNode -> componentNode.isAuthorized(authorizer, RequestAction.READ, user))
1873+
.collect(Collectors.toSet());
1874+
1875+
// If the current user doesn't have access to any listen components, return an empty result for ports
1876+
if (listenComponentNodes.isEmpty()) {
1877+
return Collections.emptySet();
1878+
}
1879+
1880+
// Now find all Listen Ports provided by the Listen Components. A listen component can provide multiple Listen Ports (e.g., ListenHTTP can have a data port and a health check port).
1881+
// The current Listen Ports for a component depend on configuration (e.g., port property value), so create a configuration context to provide ListenComponent.getListenPorts(context).
1882+
final Set<ListenPortDTO> listenPorts = new HashSet<>();
1883+
final ControllerServiceProvider controllerServiceProvider = flowController.getControllerServiceProvider();
1884+
for (final ComponentNode componentNode : listenComponentNodes) {
1885+
final ConfigurationContext configurationContext = new StandardConfigurationContext(componentNode, controllerServiceProvider, null);
1886+
final ConfigurableComponent component = componentNode.getComponent();
1887+
if (component instanceof ListenComponent listenComponent /* should always be true */) {
1888+
listenComponent.getListenPorts(configurationContext).forEach(listenPort -> {
1889+
final ListenPortDTO listenPortDTO = new ListenPortDTO();
1890+
listenPortDTO.setPortNumber(listenPort.getPortNumber());
1891+
listenPortDTO.setTransportProtocol(listenPort.getTransportProtocol().name());
1892+
listenPortDTO.setApplicationProtocols(listenPort.getApplicationProtocols());
1893+
listenPortDTO.setParentGroupId(componentNode.getParentProcessGroup().map(ProcessGroup::getIdentifier).orElse(null));
1894+
listenPortDTO.setParentGroupName(componentNode.getParentProcessGroup().map(ProcessGroup::getName).orElse(null));
1895+
listenPortDTO.setComponentId(componentNode.getIdentifier());
1896+
listenPortDTO.setComponentName(componentNode.getName());
1897+
listenPortDTO.setComponentClass(componentNode.getCanonicalClassName());
1898+
// TODO this next bit could be refined
1899+
if (componentNode instanceof ProcessorNode) {
1900+
listenPortDTO.setComponentType("Processor");
1901+
} else if (componentNode instanceof ControllerServiceNode) {
1902+
listenPortDTO.setComponentType("ControllerService");
1903+
} else {
1904+
// Would we ever have anything other than a Processor or Controller Service providing a Listen Port?
1905+
logger.warn("Unexpected listen component type {}", componentNode.getClass().getCanonicalName());
1906+
listenPortDTO.setComponentType(null);
1907+
}
1908+
listenPorts.add(listenPortDTO);
1909+
});
1910+
}
1911+
}
1912+
1913+
return listenPorts;
1914+
}
1915+
18561916
public void verifyComponentTypes(VersionedProcessGroup versionedFlow) {
18571917
flowController.verifyComponentTypesInSnippet(versionedFlow);
18581918
}

0 commit comments

Comments
 (0)