Skip to content

Commit 8064ad8

Browse files
authored
NIFI-15156 Add support for discovering Listen Ports (#10476)
Signed-off-by: David Handermann <[email protected]>
1 parent f47759c commit 8064ad8

File tree

19 files changed

+1056
-6
lines changed

19 files changed

+1056
-6
lines changed

c2/c2-protocol/c2-protocol-component-api/src/main/java/org/apache/nifi/c2/protocol/component/api/PropertyDescriptor.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@
1616
*/
1717
package org.apache.nifi.c2.protocol.component.api;
1818

19+
import io.swagger.v3.oas.annotations.media.Schema;
20+
import org.apache.nifi.expression.ExpressionLanguageScope;
21+
1922
import java.io.Serializable;
2023
import java.util.Collections;
2124
import java.util.List;
2225

23-
import io.swagger.v3.oas.annotations.media.Schema;
24-
import org.apache.nifi.expression.ExpressionLanguageScope;
25-
2626
public class PropertyDescriptor implements Serializable {
2727
private static final long serialVersionUID = 1L;
2828

@@ -41,6 +41,7 @@ public class PropertyDescriptor implements Serializable {
4141
private String validator;
4242
private boolean dynamic;
4343
private PropertyResourceDefinition resourceDefinition;
44+
private PropertyListenPortDefinition listenPortDefinition;
4445
private List<PropertyDependency> dependencies;
4546

4647
@Schema(description = "The name of the property key")
@@ -166,6 +167,15 @@ public void setResourceDefinition(PropertyResourceDefinition resourceDefinition)
166167
this.resourceDefinition = resourceDefinition;
167168
}
168169

170+
@Schema(description = "Indicates that this property defines a listen port")
171+
public PropertyListenPortDefinition getListenPortDefinition() {
172+
return listenPortDefinition;
173+
}
174+
175+
public void setListenPortDefinition(final PropertyListenPortDefinition listenPortDefinition) {
176+
this.listenPortDefinition = listenPortDefinition;
177+
}
178+
169179
@Schema(description = "The dependencies that this property has on other properties")
170180
public List<PropertyDependency> getDependencies() {
171181
return dependencies;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.nifi.c2.protocol.component.api;
18+
19+
import io.swagger.v3.oas.annotations.media.Schema;
20+
21+
import java.io.Serializable;
22+
import java.util.List;
23+
24+
public class PropertyListenPortDefinition implements Serializable {
25+
private static final long serialVersionUID = 1L;
26+
27+
private TransportProtocol transportProtocol;
28+
private List<String> applicationProtocols;
29+
30+
@Schema(description = "The transport protocol used by this listen port")
31+
public TransportProtocol getTransportProtocol() {
32+
return transportProtocol;
33+
}
34+
35+
public void setTransportProtocol(final TransportProtocol transportProtocol) {
36+
this.transportProtocol = transportProtocol;
37+
}
38+
39+
@Schema(description = "The application protocols that this listen port could support (if any)")
40+
public List<String> getApplicationProtocols() {
41+
return applicationProtocols;
42+
}
43+
44+
public void setApplicationProtocols(final List<String> applicationProtocols) {
45+
this.applicationProtocols = applicationProtocols;
46+
}
47+
48+
public enum TransportProtocol {
49+
TCP,
50+
UDP
51+
}
52+
53+
}

nifi-extension-bundles/nifi-extension-utils/nifi-git-flow-registry/src/test/java/org/apache/nifi/registry/flow/git/serialize/JacksonFlowSnapshotSerializerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.fasterxml.jackson.databind.JsonNode;
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.fasterxml.jackson.databind.node.ArrayNode;
22+
import org.apache.nifi.flow.VersionedListenPortDefinition;
2223
import org.apache.nifi.flow.VersionedParameter;
2324
import org.apache.nifi.flow.VersionedParameterContext;
2425
import org.apache.nifi.flow.VersionedProcessGroup;
@@ -64,6 +65,10 @@ public void testOrdering() throws IOException {
6465
final VersionedResourceDefinition resourceDefinition = new VersionedResourceDefinition();
6566
resourceDefinition.setResourceTypes(Set.of(VersionedResourceType.TEXT, VersionedResourceType.URL, VersionedResourceType.FILE));
6667
descriptor.setResourceDefinition(resourceDefinition);
68+
final VersionedListenPortDefinition listenPortDefinition = new VersionedListenPortDefinition();
69+
listenPortDefinition.setTransportProtocol(VersionedListenPortDefinition.TransportProtocol.TCP);
70+
listenPortDefinition.setApplicationProtocols(List.of("http/1.1", "h2"));
71+
descriptor.setListenPortDefinition(listenPortDefinition);
6772

6873
final VersionedProcessor processor1 = new VersionedProcessor();
6974
processor1.setIdentifier("proc1");
@@ -96,6 +101,8 @@ public void testOrdering() throws IOException {
96101
assertEquals("proc1", processors.get(0).get("identifier").asText());
97102
assertEquals("[ \"failure\", \"success\" ]", processors.get(0).get("autoTerminatedRelationships").toPrettyString());
98103
assertEquals("[ \"FILE\", \"TEXT\", \"URL\" ]", processors.get(0).get("propertyDescriptors").get("prop1").get("resourceDefinition").get("resourceTypes").toPrettyString());
104+
assertEquals("TCP", processors.get(0).get("propertyDescriptors").get("prop1").get("listenPortDefinition").get("transportProtocol").asText());
105+
assertEquals("[ \"h2\", \"http/1.1\" ]", processors.get(0).get("propertyDescriptors").get("prop1").get("listenPortDefinition").get("applicationProtocols").toPrettyString());
99106

100107
assertEquals("proc2", processors.get(1).get("identifier").asText());
101108
assertEquals("proc3", processors.get(2).get("identifier").asText());

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
import org.apache.nifi.components.PropertyValue;
3838
import org.apache.nifi.components.ValidationContext;
3939
import org.apache.nifi.components.ValidationResult;
40+
import org.apache.nifi.components.listen.ListenComponent;
41+
import org.apache.nifi.components.listen.ListenPort;
42+
import org.apache.nifi.components.listen.StandardListenPort;
43+
import org.apache.nifi.components.listen.TransportProtocol;
44+
import org.apache.nifi.controller.ConfigurationContext;
4045
import org.apache.nifi.expression.ExpressionLanguageScope;
4146
import org.apache.nifi.flowfile.FlowFile;
4247
import org.apache.nifi.jetty.configuration.connector.StandardServerConnectorFactory;
@@ -140,7 +145,7 @@ and will be automatically unpacked. This will output the original FlowFile(s) fr
140145
""")
141146
}
142147
)
143-
public class ListenHTTP extends AbstractSessionFactoryProcessor {
148+
public class ListenHTTP extends AbstractSessionFactoryProcessor implements ListenComponent {
144149
private static final String MATCH_ALL = ".*";
145150

146151
private final AtomicBoolean initialized = new AtomicBoolean(false);
@@ -183,6 +188,7 @@ public AllowableValue getAllowableValue() {
183188
.name("Listening Port")
184189
.description("The Port to listen on for incoming connections")
185190
.required(true)
191+
.identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2")
186192
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
187193
.addValidator(StandardValidators.PORT_VALIDATOR)
188194
.build();
@@ -197,6 +203,7 @@ public AllowableValue getAllowableValue() {
197203
"If the processor is set to use one-way SSL, one-way SSL will be used on this port. " +
198204
"If the processor is set to use two-way SSL, one-way SSL will be used on this port (client authentication not required).")
199205
.required(false)
206+
.identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2")
200207
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
201208
.addValidator(StandardValidators.PORT_VALIDATOR)
202209
.build();
@@ -422,6 +429,47 @@ public void shutdownHttpServer() {
422429
shutdownHttpServer(toShutdown);
423430
}
424431

432+
@Override
433+
public List<ListenPort> getListenPorts(final ConfigurationContext context) {
434+
435+
final List<ListenPort> ports = new ArrayList<>();
436+
437+
final Integer primaryPortNumber = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
438+
final Integer healthCheckPortNumber = context.getProperty(HEALTH_CHECK_PORT).evaluateAttributeExpressions().asInteger();
439+
final SSLContextProvider sslContextProvider = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
440+
final HttpProtocolStrategy httpProtocolStrategy = sslContextProvider == null
441+
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
442+
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
443+
final List<String> applicationProtocols = switch (httpProtocolStrategy) {
444+
case H2 -> List.of("h2");
445+
case HTTP_1_1 -> List.of("http/1.1");
446+
case H2_HTTP_1_1 -> List.of("h2", "http/1.1");
447+
case null -> List.of("h2", "http/1.1");
448+
};
449+
450+
if (primaryPortNumber != null) {
451+
final ListenPort primaryPort = StandardListenPort.builder()
452+
.portNumber(primaryPortNumber)
453+
.portName(PORT.getDisplayName())
454+
.transportProtocol(TransportProtocol.TCP)
455+
.applicationProtocols(applicationProtocols)
456+
.build();
457+
ports.add(primaryPort);
458+
}
459+
460+
if (healthCheckPortNumber != null) {
461+
final ListenPort healthCheckPort = StandardListenPort.builder()
462+
.portNumber(healthCheckPortNumber)
463+
.portName(HEALTH_CHECK_PORT.getDisplayName())
464+
.transportProtocol(TransportProtocol.TCP)
465+
.applicationProtocols(applicationProtocols)
466+
.build();
467+
ports.add(healthCheckPort);
468+
}
469+
470+
return ports;
471+
}
472+
425473
Server getServer() {
426474
return this.server;
427475
}
@@ -462,8 +510,8 @@ synchronized private void createHttpServerFromService(final ProcessContext conte
462510
// get the configured port
463511
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
464512
final HttpProtocolStrategy httpProtocolStrategy = sslContextProvider == null
465-
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
466-
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
513+
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
514+
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
467515
final ServerConnector connector = createServerConnector(server,
468516
port,
469517
requestHeaderSize,
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
package org.apache.nifi.web.api.dto;
17+
18+
import io.swagger.v3.oas.annotations.media.Schema;
19+
import jakarta.xml.bind.annotation.XmlType;
20+
21+
import java.util.List;
22+
23+
@XmlType(name = "listenPort")
24+
public class ListenPortDTO {
25+
26+
// Port definition
27+
private String portName;
28+
private int portNumber;
29+
private String transportProtocol;
30+
private List<String> applicationProtocols;
31+
32+
// Contextual information about the component providing the port, and the PG containing the component
33+
private String componentType;
34+
private String componentId;
35+
private String componentName;
36+
private String componentClass;
37+
private String parentGroupId;
38+
private String parentGroupName;
39+
40+
@Schema(description = "The name of the the listen port. Useful context for components that provide multiple ports.")
41+
public String getPortName() {
42+
return portName;
43+
}
44+
45+
public void setPortName(final String portName) {
46+
this.portName = portName;
47+
}
48+
49+
@Schema(description = "The ingress port number")
50+
public int getPortNumber() {
51+
return portNumber;
52+
}
53+
54+
public void setPortNumber(final int portNumber) {
55+
this.portNumber = portNumber;
56+
}
57+
58+
@Schema(description = "The ingress transport protocol (TCP or UDP)")
59+
public String getTransportProtocol() {
60+
return transportProtocol;
61+
}
62+
63+
public void setTransportProtocol(final String transportProtocol) {
64+
this.transportProtocol = transportProtocol;
65+
}
66+
67+
@Schema(description = "Supported application protocols, if applicable")
68+
public List<String> getApplicationProtocols() {
69+
return applicationProtocols;
70+
}
71+
72+
public void setApplicationProtocols(final List<String> applicationProtocols) {
73+
this.applicationProtocols = applicationProtocols;
74+
}
75+
76+
@Schema(description = "The type of component providing the listen port (e.g., Processor, ControllerService)")
77+
public String getComponentType() {
78+
return componentType;
79+
}
80+
81+
public void setComponentType(final String componentType) {
82+
this.componentType = componentType;
83+
}
84+
85+
@Schema(description = "The id of the component providing the listen port")
86+
public String getComponentId() {
87+
return componentId;
88+
}
89+
90+
public void setComponentId(final String componentId) {
91+
this.componentId = componentId;
92+
}
93+
94+
@Schema(description = "The name of the component providing the listen port")
95+
public String getComponentName() {
96+
return componentName;
97+
}
98+
99+
public void setComponentName(final String componentName) {
100+
this.componentName = componentName;
101+
}
102+
103+
@Schema(description = "The class type of the component providing the listen port")
104+
public String getComponentClass() {
105+
return componentClass;
106+
}
107+
108+
public void setComponentClass(final String componentClass) {
109+
this.componentClass = componentClass;
110+
}
111+
112+
@Schema(description = "The id of the process group containing the component providing the listen port, if applicable")
113+
public String getParentGroupId() {
114+
return parentGroupId;
115+
}
116+
117+
public void setParentGroupId(final String parentGroupId) {
118+
this.parentGroupId = parentGroupId;
119+
}
120+
121+
@Schema(description = "The name of the process group containing the component providing the listen port, if applicable")
122+
public String getParentGroupName() {
123+
return parentGroupName;
124+
}
125+
126+
public void setParentGroupName(final String parentGroupName) {
127+
this.parentGroupName = parentGroupName;
128+
}
129+
130+
@Override
131+
public String toString() {
132+
return ("ListenPortDTO[portName= %s, portNumber=%s, transportProtocol=%s, applicationProtocols=%s, " +
133+
"componentType=%s, componentId=%s, componentName=%s, componentClass=%s, parentGroupId=%s, parentGroupName=%s]").formatted(
134+
portName, portNumber, transportProtocol, applicationProtocols, componentType, componentId, componentName, componentClass, parentGroupId, parentGroupName);
135+
}
136+
}

0 commit comments

Comments
 (0)