Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
*/
package org.apache.nifi.c2.protocol.component.api;

import io.swagger.v3.oas.annotations.media.Schema;
import org.apache.nifi.expression.ExpressionLanguageScope;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

import io.swagger.v3.oas.annotations.media.Schema;
import org.apache.nifi.expression.ExpressionLanguageScope;

public class PropertyDescriptor implements Serializable {
private static final long serialVersionUID = 1L;

Expand All @@ -41,6 +41,7 @@ public class PropertyDescriptor implements Serializable {
private String validator;
private boolean dynamic;
private PropertyResourceDefinition resourceDefinition;
private PropertyListenPortDefinition listenPortDefinition;
private List<PropertyDependency> dependencies;

@Schema(description = "The name of the property key")
Expand Down Expand Up @@ -166,6 +167,15 @@ public void setResourceDefinition(PropertyResourceDefinition resourceDefinition)
this.resourceDefinition = resourceDefinition;
}

@Schema(description = "Indicates that this property defines a listen port")
public PropertyListenPortDefinition getListenPortDefinition() {
return listenPortDefinition;
}

public void setListenPortDefinition(final PropertyListenPortDefinition listenPortDefinition) {
this.listenPortDefinition = listenPortDefinition;
}

@Schema(description = "The dependencies that this property has on other properties")
public List<PropertyDependency> getDependencies() {
return dependencies;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.c2.protocol.component.api;

import io.swagger.v3.oas.annotations.media.Schema;

import java.io.Serializable;
import java.util.List;

public class PropertyListenPortDefinition implements Serializable {
private static final long serialVersionUID = 1L;

private TransportProtocol transportProtocol;
private List<String> applicationProtocols;

@Schema(description = "The transport protocol used by this listen port")
public TransportProtocol getTransportProtocol() {
return transportProtocol;
}

public void setTransportProtocol(final TransportProtocol transportProtocol) {
this.transportProtocol = transportProtocol;
}

@Schema(description = "The application protocols that this listen port could support (if any)")
public List<String> getApplicationProtocols() {
return applicationProtocols;
}

public void setApplicationProtocols(final List<String> applicationProtocols) {
this.applicationProtocols = applicationProtocols;
}

public enum TransportProtocol {
TCP,
UDP
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.nifi.flow.VersionedListenPortDefinition;
import org.apache.nifi.flow.VersionedParameter;
import org.apache.nifi.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedProcessGroup;
Expand Down Expand Up @@ -64,6 +65,10 @@ public void testOrdering() throws IOException {
final VersionedResourceDefinition resourceDefinition = new VersionedResourceDefinition();
resourceDefinition.setResourceTypes(Set.of(VersionedResourceType.TEXT, VersionedResourceType.URL, VersionedResourceType.FILE));
descriptor.setResourceDefinition(resourceDefinition);
final VersionedListenPortDefinition listenPortDefinition = new VersionedListenPortDefinition();
listenPortDefinition.setTransportProtocol(VersionedListenPortDefinition.TransportProtocol.TCP);
listenPortDefinition.setApplicationProtocols(List.of("http/1.1", "h2"));
descriptor.setListenPortDefinition(listenPortDefinition);

final VersionedProcessor processor1 = new VersionedProcessor();
processor1.setIdentifier("proc1");
Expand Down Expand Up @@ -96,6 +101,8 @@ public void testOrdering() throws IOException {
assertEquals("proc1", processors.get(0).get("identifier").asText());
assertEquals("[ \"failure\", \"success\" ]", processors.get(0).get("autoTerminatedRelationships").toPrettyString());
assertEquals("[ \"FILE\", \"TEXT\", \"URL\" ]", processors.get(0).get("propertyDescriptors").get("prop1").get("resourceDefinition").get("resourceTypes").toPrettyString());
assertEquals("TCP", processors.get(0).get("propertyDescriptors").get("prop1").get("listenPortDefinition").get("transportProtocol").asText());
assertEquals("[ \"h2\", \"http/1.1\" ]", processors.get(0).get("propertyDescriptors").get("prop1").get("listenPortDefinition").get("applicationProtocols").toPrettyString());

assertEquals("proc2", processors.get(1).get("identifier").asText());
assertEquals("proc3", processors.get(2).get("identifier").asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.listen.ListenComponent;
import org.apache.nifi.components.listen.ListenPort;
import org.apache.nifi.components.listen.StandardListenPort;
import org.apache.nifi.components.listen.TransportProtocol;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jetty.configuration.connector.StandardServerConnectorFactory;
Expand Down Expand Up @@ -140,7 +145,7 @@ and will be automatically unpacked. This will output the original FlowFile(s) fr
""")
}
)
public class ListenHTTP extends AbstractSessionFactoryProcessor {
public class ListenHTTP extends AbstractSessionFactoryProcessor implements ListenComponent {
private static final String MATCH_ALL = ".*";

private final AtomicBoolean initialized = new AtomicBoolean(false);
Expand Down Expand Up @@ -183,6 +188,7 @@ public AllowableValue getAllowableValue() {
.name("Listening Port")
.description("The Port to listen on for incoming connections")
.required(true)
.identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
Expand All @@ -197,6 +203,7 @@ public AllowableValue getAllowableValue() {
"If the processor is set to use one-way SSL, one-way SSL will be used on this port. " +
"If the processor is set to use two-way SSL, one-way SSL will be used on this port (client authentication not required).")
.required(false)
.identifiesListenPort(TransportProtocol.TCP, "http/1.1", "h2")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
Expand Down Expand Up @@ -422,6 +429,47 @@ public void shutdownHttpServer() {
shutdownHttpServer(toShutdown);
}

@Override
public List<ListenPort> getListenPorts(final ConfigurationContext context) {

final List<ListenPort> ports = new ArrayList<>();

final Integer primaryPortNumber = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final Integer healthCheckPortNumber = context.getProperty(HEALTH_CHECK_PORT).evaluateAttributeExpressions().asInteger();
final SSLContextProvider sslContextProvider = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
final HttpProtocolStrategy httpProtocolStrategy = sslContextProvider == null
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
final List<String> applicationProtocols = switch (httpProtocolStrategy) {
case H2 -> List.of("h2");
case HTTP_1_1 -> List.of("http/1.1");
case H2_HTTP_1_1 -> List.of("h2", "http/1.1");
case null -> List.of("h2", "http/1.1");
};

if (primaryPortNumber != null) {
final ListenPort primaryPort = StandardListenPort.builder()
.portNumber(primaryPortNumber)
.portName(PORT.getDisplayName())
.transportProtocol(TransportProtocol.TCP)
.applicationProtocols(applicationProtocols)
.build();
ports.add(primaryPort);
}

if (healthCheckPortNumber != null) {
final ListenPort healthCheckPort = StandardListenPort.builder()
.portNumber(healthCheckPortNumber)
.portName(HEALTH_CHECK_PORT.getDisplayName())
.transportProtocol(TransportProtocol.TCP)
.applicationProtocols(applicationProtocols)
.build();
ports.add(healthCheckPort);
}

return ports;
}

Server getServer() {
return this.server;
}
Expand Down Expand Up @@ -462,8 +510,8 @@ synchronized private void createHttpServerFromService(final ProcessContext conte
// get the configured port
final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
final HttpProtocolStrategy httpProtocolStrategy = sslContextProvider == null
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
? HttpProtocolStrategy.valueOf(HTTP_PROTOCOL_STRATEGY.getDefaultValue())
: context.getProperty(HTTP_PROTOCOL_STRATEGY).asAllowableValue(HttpProtocolStrategy.class);
final ServerConnector connector = createServerConnector(server,
port,
requestHeaderSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;

import java.util.List;

@XmlType(name = "listenPort")
public class ListenPortDTO {

// Port definition
private String portName;
private int portNumber;
private String transportProtocol;
private List<String> applicationProtocols;

// Contextual information about the component providing the port, and the PG containing the component
private String componentType;
private String componentId;
private String componentName;
private String componentClass;
private String parentGroupId;
private String parentGroupName;

@Schema(description = "The name of the the listen port. Useful context for components that provide multiple ports.")
public String getPortName() {
return portName;
}

public void setPortName(final String portName) {
this.portName = portName;
}

@Schema(description = "The ingress port number")
public int getPortNumber() {
return portNumber;
}

public void setPortNumber(final int portNumber) {
this.portNumber = portNumber;
}

@Schema(description = "The ingress transport protocol (TCP or UDP)")
public String getTransportProtocol() {
return transportProtocol;
}

public void setTransportProtocol(final String transportProtocol) {
this.transportProtocol = transportProtocol;
}

@Schema(description = "Supported application protocols, if applicable")
public List<String> getApplicationProtocols() {
return applicationProtocols;
}

public void setApplicationProtocols(final List<String> applicationProtocols) {
this.applicationProtocols = applicationProtocols;
}

@Schema(description = "The type of component providing the listen port (e.g., Processor, ControllerService)")
public String getComponentType() {
return componentType;
}

public void setComponentType(final String componentType) {
this.componentType = componentType;
}

@Schema(description = "The id of the component providing the listen port")
public String getComponentId() {
return componentId;
}

public void setComponentId(final String componentId) {
this.componentId = componentId;
}

@Schema(description = "The name of the component providing the listen port")
public String getComponentName() {
return componentName;
}

public void setComponentName(final String componentName) {
this.componentName = componentName;
}

@Schema(description = "The class type of the component providing the listen port")
public String getComponentClass() {
return componentClass;
}

public void setComponentClass(final String componentClass) {
this.componentClass = componentClass;
}

@Schema(description = "The id of the process group containing the component providing the listen port, if applicable")
public String getParentGroupId() {
return parentGroupId;
}

public void setParentGroupId(final String parentGroupId) {
this.parentGroupId = parentGroupId;
}

@Schema(description = "The name of the process group containing the component providing the listen port, if applicable")
public String getParentGroupName() {
return parentGroupName;
}

public void setParentGroupName(final String parentGroupName) {
this.parentGroupName = parentGroupName;
}

@Override
public String toString() {
return ("ListenPortDTO[portName= %s, portNumber=%s, transportProtocol=%s, applicationProtocols=%s, " +
"componentType=%s, componentId=%s, componentName=%s, componentClass=%s, parentGroupId=%s, parentGroupName=%s]").formatted(
portName, portNumber, transportProtocol, applicationProtocols, componentType, componentId, componentName, componentClass, parentGroupId, parentGroupName);
}
}
Loading