Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ services:
- OAUTH_PROTOCOL=http
- CONFIG_FORCE_ditto_gateway_authentication_oauth_openid__connect__issuers_ditto_issuer=oauth:9900/ditto
- CONFIG_FORCE_ditto_gateway_authentication_oauth_openid__connect__issuers_dummy_issuer=oauth:9900/dummy
- CONFIG_FORCE_ditto_gateway_authentication_oauth_protocol=http
- GATEWAY_WEBSOCKET_THROTTLING_ENABLED=true
- GATEWAY_WEBSOCKET_THROTTLING_INTERVAL=1s
- GATEWAY_WEBSOCKET_THROTTLING_LIMIT=100
Expand Down
4 changes: 2 additions & 2 deletions system/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
<source>21</source>
<target>21</target>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ protected static List<ConnectionId> getAllConnectionIds(final String connectionN
.collect(Collectors.toList());
}

protected static List<Connection> getAllConnectionsById(final String connectionIdPrefixToMatch) {
final Response response = connectionsClient().getConnections()
.withDevopsAuth()
.fire();

return JsonFactory.newArray(response.body().asString()).stream()
.filter(JsonValue::isObject)
.map(JsonValue::asObject)
.filter(connectionJson -> connectionJson.getValue(Connection.JsonFields.ID)
.filter(id -> id.startsWith(connectionIdPrefixToMatch))
.isPresent()
)
.map(ConnectivityModelFactory::connectionFromJson)
.collect(Collectors.toList());
}

protected static List<Connection> getAllConnections(final String connectionNamePrefixToMatch) {
final Response response = connectionsClient().getConnections()
.withDevopsAuth()
Expand All @@ -179,9 +195,27 @@ protected static List<Connection> getAllConnections(final String connectionNameP
.collect(Collectors.toList());
}

/**
* Getting an existing connection by its name for a specific user.
* Prefer this over {@link #getConnectionExistingByName(String)} as getting a connection just by name
* in a testing context can lead to unexpected results, as the connection name is not unique,
* and stale connections might be left over from previous tests especially when running locally and debugging.
* @param connectionName
* @param username
* @return the connection with the given name for the given user
*/
protected static Connection getConnectionExistingByNameForUser(final String connectionName, final String username) {
final List<Connection> allConnections = getAllConnectionsById(username);
return getConnectionByNameFrom(connectionName, allConnections);
}

protected static Connection getConnectionExistingByName(final String connectionName) {
final List<Connection> allConnections = getAllConnections(connectionName);
return getConnectionByNameFrom(connectionName, allConnections);
}

private static Connection getConnectionByNameFrom(final String connectionName,
final List<Connection> allConnections) {
return allConnections.stream()
.filter(conn -> connectionName.equals(conn.getName().orElse(null)))
.findAny()
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.awaitility.Awaitility;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.common.HttpStatusCodeOutOfRangeException;
import org.eclipse.ditto.base.model.common.ResponseType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
Expand Down Expand Up @@ -764,6 +765,54 @@ public CompletableFuture<Response> asyncCreateConnection(final Solution solution
});
}

public CompletableFuture<Response> asyncModifyConnection(final JsonObject connectionJson) {
final String connectionName = getConnectionName(connectionJson);
final ConnectivityStatus desiredStatus = getDesiredConnectionStatus(connectionJson);
final String connectionId = JsonFactory.readFrom(connectionJson.toString())
.asObject()
.getValue(Connection.JsonFields.ID)
.orElseThrow();
return CompletableFuture.supplyAsync(() ->
{
try {
return ConnectionsClient.getInstance()
.putConnection(connectionJson.getValue(Connection.JsonFields.ID).orElse(connectionName),
connectionJson)
.withDevopsAuth()
.withHeader(HttpHeader.TIMEOUT, 60)
.expectingHttpStatus(HttpStatus.getInstance(204))
.fire();
} catch (HttpStatusCodeOutOfRangeException e) {
throw new RuntimeException(e);
}
})
.thenApply(response -> {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("modify connection <{}> resulted in response: {} headers: {}", connectionName,
response.statusCode(), response.headers());
response.prettyPrint();
} else {
LOGGER.info("modify connection <{}> resulted in: {}", connectionName, response.statusLine());
}
return response;
})
.thenCompose(response -> awaitLiveStatusOrTimeout(solutionSupplier.getSolution(), connectionId, desiredStatus)
.thenApply(v -> response))
.exceptionally(error -> {
LOGGER.error("modify connection <{}> failed", connectionName, error);
if (CONNECTION_RETRY_COUNTER.incrementAndGet() <= 3) {
try {
TimeUnit.SECONDS.sleep(5L);
asyncModifyConnection(connectionJson);
} catch (InterruptedException e) {
LOGGER.error("Error during sleep", e);
}
}
LOGGER.error("Couldn't modify connection after 3 attempts", error);
return null;
});
}

private CompletableFuture<Void> awaitLiveStatusOrTimeout(final Solution solution,
final CharSequence connectionId, final ConnectivityStatus expectedStatus) {
return CompletableFuture.runAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.testing.common.conditions.DockerEnvironment;
import org.eclipse.ditto.testing.common.conditions.RunIf;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityResponseDiversionITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityWorker;
import org.eclipse.ditto.testing.system.connectivity.ConnectionCategory;
import org.eclipse.ditto.testing.system.connectivity.Connections;
Expand All @@ -64,7 +64,8 @@
*/
@RunIf(DockerEnvironment.class)
@NotThreadSafe
public class Amqp10ConnectivityIT extends AbstractConnectivityITestCases<BlockingQueue<Message>, Message> {
public class Amqp10ConnectivityIT extends
AbstractConnectivityResponseDiversionITestCases<BlockingQueue<Message>, Message> {

private static final Logger LOGGER = LoggerFactory.getLogger(Amqp10ConnectivityIT.class);
private static final long WAIT_TIMEOUT_MS = 10_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityITBase;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityResponseDiversionITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityWorker;
import org.eclipse.ditto.testing.system.connectivity.Connections;
import org.eclipse.ditto.testing.system.connectivity.ConnectivityFactory;
Expand All @@ -58,7 +58,7 @@
* Failsafe won't run this directly because it does not end in *IT.java.
*/
public final class KafkaConnectivitySuite extends
AbstractConnectivityITestCases<BlockingQueue<ConsumerRecord<String, byte[]>>, ConsumerRecord<String, byte[]>> {
AbstractConnectivityResponseDiversionITestCases<BlockingQueue<ConsumerRecord<String, byte[]>>, ConsumerRecord<String, byte[]>> {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConnectivitySuite.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.eclipse.ditto.testing.common.TestConstants;
import org.eclipse.ditto.testing.common.client.ConnectionsClient;
import org.eclipse.ditto.testing.common.matcher.StatusCodeSuccessfulMatcher;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityResponseDiversionITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityWorker;
import org.eclipse.ditto.testing.system.connectivity.ConnectionCategory;
import org.eclipse.ditto.testing.system.connectivity.Connections;
Expand Down Expand Up @@ -100,7 +100,7 @@
* </p>
*/
public final class Mqtt3ConnectivitySuite
extends AbstractConnectivityITestCases<BlockingQueue<Mqtt3Publish>, Mqtt3Publish> {
extends AbstractConnectivityResponseDiversionITestCases<BlockingQueue<Mqtt3Publish>, Mqtt3Publish> {

private static final Logger LOGGER = LoggerFactory.getLogger(Mqtt3ConnectivitySuite.class);

Expand All @@ -119,13 +119,13 @@ public final class Mqtt3ConnectivitySuite
"{{entity:id | fn:substring-before(':') | fn:substring-before('not-matching') | fn:default(entity:namespace)}}/"
+ "{{ entity:id | fn:substring-after(\":\") | fn:default('should-not-happen') }}");

private static final String ADD_MQTT_TARGET_HEADER_MAPPING = "ADD_MQTT_TARGET_HEADER_MAPPING";
private static final String ADD_MQTT_SOURCE_HEADER_MAPPING = "ADD_MQTT_SOURCE_HEADER_MAPPING";
private static final String REMOVE_MQTT_HEADER_MAPPINGS = "REMOVE_MQTT_MAPPINGS";
private static final String ADD_MQTT_3_TARGET_HEADER_MAPPING = "ADD_MQTT3_TARGET_HEADER_MAPPING";
private static final String ADD_MQTT_3_SOURCE_HEADER_MAPPING = "ADD_MQTT3_SOURCE_HEADER_MAPPING";
private static final String REMOVE_MQTT_3_MAPPINGS = "REMOVE_MQTT3_MAPPINGS";

static {
// adds mqtt source header mapping
addMod(ADD_MQTT_SOURCE_HEADER_MAPPING, connection -> {
addMod(ADD_MQTT_3_SOURCE_HEADER_MAPPING, connection -> {
final HeaderMapping headerMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"custom.topic", "{{ header:mqtt.topic }}",
"custom.qos", "{{ header:mqtt.qos }}",
Expand All @@ -140,7 +140,7 @@ public final class Mqtt3ConnectivitySuite
}
);
// adds mqtt target header mapping
addMod(ADD_MQTT_TARGET_HEADER_MAPPING, connection -> {
addMod(ADD_MQTT_3_TARGET_HEADER_MAPPING, connection -> {
final HeaderMapping headerMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"mqtt.topic", "{{ header:custom.topic }}",
"mqtt.qos", "{{ header:custom.qos }}",
Expand All @@ -155,7 +155,7 @@ public final class Mqtt3ConnectivitySuite
}
);
// removes mqtt header mappings
addMod(REMOVE_MQTT_HEADER_MAPPINGS, connection -> connection.toBuilder()
addMod(REMOVE_MQTT_3_MAPPINGS, connection -> connection.toBuilder()
.setSources(connection.getSources().stream()
.map(ConnectivityModelFactory::newSourceBuilder)
.map(sb -> sb.headerMapping(null))
Expand Down Expand Up @@ -340,7 +340,7 @@ public void creatingMqttConnectionWithHeaderMappingShouldFail() {
}

@Test
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_SOURCE_HEADER_MAPPING)
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_3_SOURCE_HEADER_MAPPING)
public void allowedHeadersAreMappedFromInboundMqttMessage() {

final ConnectivityTestWebsocketClient websocketClient =
Expand Down Expand Up @@ -372,7 +372,7 @@ public void allowedHeadersAreMappedFromInboundMqttMessage() {
}

@Test
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_TARGET_HEADER_MAPPING)
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_3_TARGET_HEADER_MAPPING)
public void allowedHeadersAreMappedToOutboundMqttMessage() throws InterruptedException {

final ConnectivityTestWebsocketClient websocketClient =
Expand Down Expand Up @@ -422,6 +422,14 @@ private void testSslFailure(@Nullable final String caCrt,
testConnectionWithErrorExpected(connectionStr, ConnectionFailedException.ERROR_CODE);
}

@Override
protected String getTargetAddress(final Connection targetConnection) {
final String address = super.getTargetAddress(targetConnection);
if (address.contains("{{") && address.contains("connection:id") && address.contains("}}")) {
return address.replaceAll("\\{\\{.*}}", targetConnection.getId().toString());
}
return address;
}

@Override
protected String targetAddressForTargetPlaceHolderSubstitution() {
Expand Down Expand Up @@ -450,7 +458,7 @@ public void createThingWithMultipleSlashesInFeatureProperty() {

@Override
@Test
@UseConnection(category = ConnectionCategory.CONNECTION_WITH_CONNECTION_ANNOUNCEMENTS, mod = REMOVE_MQTT_HEADER_MAPPINGS)
@UseConnection(category = ConnectionCategory.CONNECTION_WITH_CONNECTION_ANNOUNCEMENTS, mod = REMOVE_MQTT_3_MAPPINGS)
public void sendsConnectionAnnouncements() {
LOGGER.info("Running sendsConnectionAnnouncements without header mappings for MQTT");
super.sendsConnectionAnnouncements();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected void sendAsJsonString(
final JsonObject objectWithHeaders = packHeaders(JsonFactory.newObject(stringMessage), extraHeaders);
final ByteString bytePayload = ByteString.fromString(objectWithHeaders.toString());
rethrow(() -> {
logger.info("mqttClient: publishing on topic <{}>: {}", mqttTopic, bytePayload);
logger.info("[{}] mqttClient: publishing on topic <{}>: {}", correlationId, mqttTopic, bytePayload);
mqttClientSupplier.get()
.publishWith()
.topic(mqttTopic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.eclipse.ditto.testing.common.TestConstants;
import org.eclipse.ditto.testing.common.client.ConnectionsClient;
import org.eclipse.ditto.testing.common.matcher.StatusCodeSuccessfulMatcher;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityResponseDiversionITestCases;
import org.eclipse.ditto.testing.system.connectivity.AbstractConnectivityWorker;
import org.eclipse.ditto.testing.system.connectivity.ConnectionCategory;
import org.eclipse.ditto.testing.system.connectivity.Connections;
Expand Down Expand Up @@ -99,7 +99,7 @@
* </p>
*/
public final class Mqtt5ConnectivitySuite
extends AbstractConnectivityITestCases<BlockingQueue<Mqtt5Publish>, Mqtt5Publish> {
extends AbstractConnectivityResponseDiversionITestCases<BlockingQueue<Mqtt5Publish>, Mqtt5Publish> {

private static final Logger LOGGER = LoggerFactory.getLogger(Mqtt5ConnectivitySuite.class);

Expand All @@ -118,13 +118,13 @@ public final class Mqtt5ConnectivitySuite
"{{entity:id | fn:substring-before(':') | fn:substring-before('not-matching') | fn:default(entity:namespace)}}/"
+ "{{ entity:id | fn:substring-after(\":\") | fn:default('should-not-happen') }}");

private static final String ADD_MQTT_TARGET_HEADER_MAPPING = "ADD_MQTT_TARGET_HEADER_MAPPING";
private static final String ADD_MQTT_SOURCE_HEADER_MAPPING = "ADD_MQTT_SOURCE_HEADER_MAPPING";
private static final String REMOVE_MQTT_HEADER_MAPPINGS = "REMOVE_MQTT_MAPPINGS";
private static final String ADD_MQTT_5_TARGET_HEADER_MAPPING = "ADD_MQTT5_TARGET_HEADER_MAPPING";
private static final String ADD_MQTT_5_SOURCE_HEADER_MAPPING = "ADD_MQTT5_SOURCE_HEADER_MAPPING";
private static final String REMOVE_MQTT_5_MAPPINGS = "REMOVE_MQTT5_MAPPINGS";

static {
// adds mqtt source header mapping
addMod(ADD_MQTT_SOURCE_HEADER_MAPPING, connection -> {
addMod(ADD_MQTT_5_SOURCE_HEADER_MAPPING, connection -> {
final HeaderMapping headerMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"custom.topic", "{{ header:mqtt.topic }}",
"custom.qos", "{{ header:mqtt.qos }}",
Expand All @@ -139,7 +139,7 @@ public final class Mqtt5ConnectivitySuite
}
);
// adds mqtt target header mapping
addMod(ADD_MQTT_TARGET_HEADER_MAPPING, connection -> {
addMod(ADD_MQTT_5_TARGET_HEADER_MAPPING, connection -> {
final HeaderMapping headerMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"mqtt.topic", "{{ header:custom.topic }}",
"mqtt.qos", "{{ header:custom.qos }}",
Expand All @@ -154,7 +154,7 @@ public final class Mqtt5ConnectivitySuite
}
);
// removes mqtt header mappings
addMod(REMOVE_MQTT_HEADER_MAPPINGS, connection -> connection.toBuilder()
addMod(REMOVE_MQTT_5_MAPPINGS, connection -> connection.toBuilder()
.setSources(connection.getSources().stream()
.map(ConnectivityModelFactory::newSourceBuilder)
.map(sb -> sb.headerMapping(null))
Expand Down Expand Up @@ -334,7 +334,7 @@ public void creatingMqtt5ConnectionWithHeaderMappingShouldSucceed() {
}

@Test
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_SOURCE_HEADER_MAPPING)
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_5_SOURCE_HEADER_MAPPING)
public void allowedHeadersAreMappedFromInboundMqttMessage() {

final ConnectivityTestWebsocketClient websocketClient = ConnectivityTestWebsocketClient
Expand Down Expand Up @@ -366,7 +366,7 @@ public void allowedHeadersAreMappedFromInboundMqttMessage() {
}

@Test
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_TARGET_HEADER_MAPPING)
@UseConnection(category = ConnectionCategory.CONNECTION1, mod = ADD_MQTT_5_TARGET_HEADER_MAPPING)
public void allowedHeadersAreMappedToOutboundMqttMessage() throws InterruptedException {

final ConnectivityTestWebsocketClient websocketClient = ConnectivityTestWebsocketClient
Expand Down