Skip to content

Commit 944cb60

Browse files
committed
NIFI-15156 Update all ListenComponents to adopt new APIs
- HandleHttpRequest - ListenOTLP - ListenFTP - ListenTCP - ListenUDP - ListenUDPRecord - ListenSyslog - ListenTrapSNMP - JettyWebSocketServer - MapCacheServer - SetCacheServer
1 parent a0679e2 commit 944cb60

File tree

20 files changed

+453
-157
lines changed

20 files changed

+453
-157
lines changed

nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ public abstract class AbstractListenEventBatchingProcessor<E extends Event> exte
4545

4646
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
4747
NETWORK_INTF_NAME,
48-
PORT,
4948
RECV_BUFFER_SIZE,
5049
MAX_MESSAGE_QUEUE_SIZE,
5150
MAX_SOCKET_BUFFER_SIZE,

nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.nifi.annotation.lifecycle.OnScheduled;
2020
import org.apache.nifi.annotation.lifecycle.OnStopped;
2121
import org.apache.nifi.components.PropertyDescriptor;
22-
import org.apache.nifi.expression.ExpressionLanguageScope;
2322
import org.apache.nifi.processor.AbstractProcessor;
2423
import org.apache.nifi.processor.DataUnit;
2524
import org.apache.nifi.processor.ProcessContext;
@@ -58,13 +57,6 @@
5857
*/
5958
public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor {
6059

61-
public static final PropertyDescriptor PORT = new PropertyDescriptor
62-
.Builder().name("Port")
63-
.description("The port to listen on for communication.")
64-
.required(true)
65-
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
66-
.addValidator(StandardValidators.PORT_VALIDATOR)
67-
.build();
6860
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
6961
.name("Character Set")
7062
.description("Specifies the character set of the received data.")
@@ -111,7 +103,6 @@ public abstract class AbstractListenEventProcessor<E extends Event> extends Abst
111103

112104
private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
113105
NETWORK_INTF_NAME,
114-
PORT,
115106
RECV_BUFFER_SIZE,
116107
MAX_MESSAGE_QUEUE_SIZE,
117108
MAX_SOCKET_BUFFER_SIZE,
@@ -162,6 +153,13 @@ protected List<PropertyDescriptor> getAdditionalProperties() {
162153
return Collections.emptyList();
163154
}
164155

156+
/**
157+
* Processors that extend this abstract class must implement this method to provide the user-configured port number to this base class.
158+
*
159+
* @return the port to listen on for communication.
160+
*/
161+
protected abstract int getConfiguredPort(final ProcessContext context);
162+
165163
@Override
166164
public final Set<Relationship> getRelationships() {
167165
return this.relationships;
@@ -175,7 +173,7 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
175173
@OnScheduled
176174
public void onScheduled(final ProcessContext context) throws IOException {
177175
charset = Charset.forName(context.getProperty(CHARSET).getValue());
178-
final int specifiedPort = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
176+
final int specifiedPort = getConfiguredPort(context);
179177
eventsCapacity = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
180178
events = new TrackingLinkedBlockingQueue<>(eventsCapacity);
181179
final String interfaceName = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();

nifi-extension-bundles/nifi-extension-utils/nifi-event-listen/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,6 @@ public class ListenerProperties {
8383
})
8484
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
8585
.build();
86-
87-
public static final PropertyDescriptor PORT = new PropertyDescriptor
88-
.Builder().name("Port")
89-
.description("The port to listen on for communication.")
90-
.required(true)
91-
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
92-
.addValidator(StandardValidators.PORT_VALIDATOR)
93-
.build();
9486
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
9587
.name("Character Set")
9688
.description("Specifies the character set of the received data.")

nifi-extension-bundles/nifi-opentelemetry-bundle/nifi-opentelemetry-processors/src/main/java/org/apache/nifi/processors/opentelemetry/ListenOTLP.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,35 @@
2525
import org.apache.nifi.annotation.documentation.Tags;
2626
import org.apache.nifi.annotation.lifecycle.OnScheduled;
2727
import org.apache.nifi.annotation.lifecycle.OnStopped;
28+
import org.apache.nifi.components.listen.ListenComponent;
29+
import org.apache.nifi.components.listen.ListenPort;
30+
import org.apache.nifi.components.listen.StandardListenPort;
31+
import org.apache.nifi.components.listen.TransportProtocol;
2832
import org.apache.nifi.components.PropertyDescriptor;
33+
import org.apache.nifi.controller.ConfigurationContext;
2934
import org.apache.nifi.event.transport.EventServer;
3035
import org.apache.nifi.event.transport.EventServerFactory;
3136
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
3237
import org.apache.nifi.expression.ExpressionLanguageScope;
3338
import org.apache.nifi.flowfile.FlowFile;
34-
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
35-
import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
36-
import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider;
37-
import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory;
3839
import org.apache.nifi.processor.AbstractProcessor;
3940
import org.apache.nifi.processor.ProcessContext;
4041
import org.apache.nifi.processor.ProcessSession;
4142
import org.apache.nifi.processor.Relationship;
4243
import org.apache.nifi.processor.exception.ProcessException;
4344
import org.apache.nifi.processor.util.StandardValidators;
45+
import org.apache.nifi.processors.opentelemetry.io.RequestCallback;
46+
import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider;
47+
import org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName;
48+
import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory;
4449
import org.apache.nifi.security.util.ClientAuth;
4550
import org.apache.nifi.ssl.SSLContextProvider;
4651

4752
import javax.net.ssl.SSLContext;
4853
import java.net.InetAddress;
4954
import java.net.URI;
5055
import java.net.UnknownHostException;
56+
import java.util.ArrayList;
5157
import java.util.Collections;
5258
import java.util.Iterator;
5359
import java.util.List;
@@ -69,7 +75,7 @@
6975
@WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_TYPE, description = "OpenTelemetry Resource Type: LOGS, METRICS, or TRACES"),
7076
@WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_COUNT, description = "Count of resource elements included in messages"),
7177
})
72-
public class ListenOTLP extends AbstractProcessor {
78+
public class ListenOTLP extends AbstractProcessor implements ListenComponent {
7379

7480
static final PropertyDescriptor ADDRESS = new PropertyDescriptor.Builder()
7581
.name("Address")
@@ -85,6 +91,7 @@ public class ListenOTLP extends AbstractProcessor {
8591
.description("TCP port number on which to listen for OTLP Export Service Requests over HTTP and gRPC")
8692
.required(true)
8793
.defaultValue("4317")
94+
.identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2", "grpc", "otlp")
8895
.addValidator(StandardValidators.PORT_VALIDATOR)
8996
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
9097
.build();
@@ -188,6 +195,21 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
188195
}
189196
}
190197

198+
@Override
199+
public List<ListenPort> getListenPorts(final ConfigurationContext context) {
200+
final List<ListenPort> ports = new ArrayList<>();
201+
final Integer portNumber = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
202+
if (portNumber != null) {
203+
final ListenPort port = StandardListenPort.builder()
204+
.portNumber(portNumber)
205+
.transportProtocol(org.apache.nifi.components.listen.TransportProtocol.TCP)
206+
.applicationProtocols(List.of("http/1.1", "h2", "grpc", "otlp"))
207+
.build();
208+
ports.add(port);
209+
}
210+
return ports;
211+
}
212+
191213
int getPort() {
192214
return server.getListeningPort();
193215
}

nifi-extension-bundles/nifi-snmp-bundle/nifi-snmp-processors/src/main/java/org/apache/nifi/snmp/processors/ListenTrapSNMP.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@
2424
import org.apache.nifi.annotation.lifecycle.OnScheduled;
2525
import org.apache.nifi.annotation.lifecycle.OnStopped;
2626
import org.apache.nifi.components.ConfigVerificationResult;
27+
import org.apache.nifi.components.listen.ListenComponent;
28+
import org.apache.nifi.components.listen.ListenPort;
29+
import org.apache.nifi.components.listen.StandardListenPort;
30+
import org.apache.nifi.components.listen.TransportProtocol;
2731
import org.apache.nifi.components.PropertyDescriptor;
2832
import org.apache.nifi.components.resource.ResourceCardinality;
2933
import org.apache.nifi.components.resource.ResourceType;
34+
import org.apache.nifi.controller.ConfigurationContext;
3035
import org.apache.nifi.flowfile.FlowFile;
3136
import org.apache.nifi.logging.ComponentLog;
3237
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
@@ -69,13 +74,14 @@
6974
@WritesAttribute(attribute = SNMPUtils.SNMP_PROP_PREFIX + "*", description = "Attributes retrieved from the SNMP response. It may include:"
7075
+ " snmp$errorIndex, snmp$errorStatus, snmp$errorStatusText, snmp$nonRepeaters, snmp$requestID, snmp$type, snmp$variableBindings")
7176
@RequiresInstanceClassLoading
72-
public class ListenTrapSNMP extends AbstractSessionFactoryProcessor implements VerifiableProcessor {
77+
public class ListenTrapSNMP extends AbstractSessionFactoryProcessor implements VerifiableProcessor, ListenComponent {
7378

7479
public static final PropertyDescriptor SNMP_MANAGER_PORT = new PropertyDescriptor.Builder()
7580
.name("snmp-manager-port")
7681
.displayName("SNMP Manager Port")
7782
.description("The port where the SNMP Manager listens to the incoming traps.")
7883
.required(true)
84+
.identifiesListenPort(TransportProtocol.UDP, "snmptrap")
7985
.addValidator(StandardValidators.PORT_VALIDATOR)
8086
.build();
8187

@@ -195,7 +201,22 @@ public void initSnmpManager(ProcessContext context) {
195201
snmpTrapReceiverHandler = new SNMPTrapReceiverHandler(configuration, usmUsers);
196202
}
197203

198-
public int getListeningPort() {
204+
@Override
205+
public List<ListenPort> getListenPorts(final ConfigurationContext context) {
206+
final List<ListenPort> ports = new ArrayList<>();
207+
final Integer portNumber = context.getProperty(SNMP_MANAGER_PORT).evaluateAttributeExpressions().asInteger();
208+
if (portNumber != null) {
209+
final ListenPort port = StandardListenPort.builder()
210+
.portNumber(portNumber)
211+
.transportProtocol(TransportProtocol.UDP)
212+
.applicationProtocols(List.of("snmptrap"))
213+
.build();
214+
ports.add(port);
215+
}
216+
return ports;
217+
}
218+
219+
int getListeningPort() {
199220
if (snmpTrapReceiverHandler == null || !snmpTrapReceiverHandler.isStarted()) {
200221
return 0;
201222
}

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractSyslogProcessor.java

Lines changed: 0 additions & 66 deletions
This file was deleted.

nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040
import org.apache.nifi.annotation.notification.PrimaryNodeState;
4141
import org.apache.nifi.components.AllowableValue;
4242
import org.apache.nifi.components.PropertyDescriptor;
43+
import org.apache.nifi.components.listen.ListenComponent;
44+
import org.apache.nifi.components.listen.ListenPort;
45+
import org.apache.nifi.components.listen.StandardListenPort;
46+
import org.apache.nifi.components.listen.TransportProtocol;
47+
import org.apache.nifi.controller.ConfigurationContext;
4348
import org.apache.nifi.expression.ExpressionLanguageScope;
4449
import org.apache.nifi.flowfile.FlowFile;
4550
import org.apache.nifi.http.HttpContextMap;
@@ -149,7 +154,7 @@
149154
@WritesAttribute(attribute = "http.multipart.fragments.total.number",
150155
description = "For requests with Content-Type \"multipart/form-data\", the count of all parts is recorded into this attribute.")})
151156
@SeeAlso(value = {HandleHttpResponse.class})
152-
public class HandleHttpRequest extends AbstractProcessor {
157+
public class HandleHttpRequest extends AbstractProcessor implements ListenComponent {
153158

154159
private static final String MIME_TYPE__MULTIPART_FORM_DATA = "multipart/form-data";
155160

@@ -169,6 +174,7 @@ public class HandleHttpRequest extends AbstractProcessor {
169174
.description("The Port to listen on for incoming HTTP requests")
170175
.required(true)
171176
.addValidator(StandardValidators.PORT_VALIDATOR)
177+
.identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2")
172178
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
173179
.defaultValue("80")
174180
.build();
@@ -400,9 +406,7 @@ synchronized void initializeServer(final ProcessContext context) throws Exceptio
400406
serverConnectorFactory.setWantClientAuth(wantClientAuth);
401407
final SSLContext sslContext = sslContextProvider == null ? null : sslContextProvider.createContext();
402408
serverConnectorFactory.setSslContext(sslContext);
403-
final HttpProtocolStrategy httpProtocolStrategy = sslContext == null
404-
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
405-
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
409+
final HttpProtocolStrategy httpProtocolStrategy = getHttpProtocolStrategy(context, sslContextProvider);
406410
serverConnectorFactory.setApplicationLayerProtocols(httpProtocolStrategy.getApplicationLayerProtocols());
407411

408412
final ServerConnector serverConnector = serverConnectorFactory.getServerConnector();
@@ -512,6 +516,21 @@ protected void service(final HttpServletRequest request, final HttpServletRespon
512516
ready = true;
513517
}
514518

519+
private static HttpProtocolStrategy getHttpProtocolStrategy(final ConfigurationContext context, final SSLContextProvider sslContextProvider) {
520+
return getHttpProtocolStrategy(context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class), sslContextProvider);
521+
}
522+
523+
private static HttpProtocolStrategy getHttpProtocolStrategy(final ProcessContext context, final SSLContextProvider sslContextProvider) {
524+
return getHttpProtocolStrategy(context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class), sslContextProvider);
525+
}
526+
527+
private static HttpProtocolStrategy getHttpProtocolStrategy(final HttpProtocolStrategy configuredProtocolStrategy, final SSLContextProvider sslContextProvider) {
528+
final HttpProtocolStrategy httpProtocolStrategy = sslContextProvider == null
529+
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
530+
: configuredProtocolStrategy;
531+
return httpProtocolStrategy;
532+
}
533+
515534
protected int getPort() {
516535
for (final Connector connector : server.getConnectors()) {
517536
if (connector instanceof ServerConnector) {
@@ -895,4 +914,31 @@ public AsyncContext getContext() {
895914
return context;
896915
}
897916
}
917+
918+
@Override
919+
public List<ListenPort> getListenPorts(final ConfigurationContext context) {
920+
921+
final List<ListenPort> ports = new ArrayList<>();
922+
923+
final Integer portNumber = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
924+
final SSLContextProvider sslContextProvider = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextProvider.class);
925+
final HttpProtocolStrategy httpProtocolStrategy = getHttpProtocolStrategy(context, sslContextProvider);
926+
final List<String> applicationProtocols = switch (httpProtocolStrategy) {
927+
case H2 -> List.of("h2");
928+
case HTTP_1_1 -> List.of("http/1.1");
929+
case H2_HTTP_1_1 -> List.of("h2", "http/1.1");
930+
case null -> List.of("h2", "http/1.1");
931+
};
932+
933+
if (portNumber != null) {
934+
final ListenPort port = StandardListenPort.builder()
935+
.portNumber(portNumber)
936+
.transportProtocol(TransportProtocol.TCP)
937+
.applicationProtocols(applicationProtocols)
938+
.build();
939+
ports.add(port);
940+
}
941+
942+
return ports;
943+
}
898944
}

0 commit comments

Comments
 (0)