Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.Header;
import com.azure.core.util.UserAgentUtil;
import com.azure.core.util.logging.LoggingEventBuilder;
import org.apache.qpid.proton.Proton;
Expand Down Expand Up @@ -162,6 +163,15 @@ public int getMaxFrameSize() {
return MAX_FRAME_SIZE;
}

/**
* Gets the headers from the client options associated with this connection.
*
* @return An {@link Iterable} of {@link Header} from the client options.
*/
protected Iterable<Header> getHeaders() {
return connectionOptions.getClientOptions().getHeaders();
}

/**
* Configures the SSL transport layer for the connection based on the {@link ConnectionOptions#getSslVerifyMode()}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@

import com.azure.core.amqp.implementation.AmqpMetricsProvider;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.util.Header;
import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.SslPeerDetails;
import org.apache.qpid.proton.engine.impl.TransportInternal;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static com.azure.core.amqp.implementation.ClientConstants.HOSTNAME_KEY;

/**
Expand Down Expand Up @@ -59,7 +64,15 @@ public WebSocketsConnectionHandler(String connectionId, ConnectionOptions connec
protected void addTransportLayers(final Event event, final TransportInternal transport) {
logger.info("Adding web socket layer");
final WebSocketImpl webSocket = new WebSocketImpl();
webSocket.configure(hostname, SOCKET_PATH, "", 0, PROTOCOL, null, null);

final Map<String, String> headers = StreamSupport.stream(getHeaders().spliterator(), false)
.collect(Collectors.collectingAndThen(
Collectors.toMap(Header::getName, Header::getValue, (a, b) -> a + "," + b,
() -> new java.util.TreeMap<>(String.CASE_INSENSITIVE_ORDER)),
m -> m.isEmpty() ? null : m));

webSocket.configure(hostname, SOCKET_PATH, "", 0, PROTOCOL, headers, null);

Comment thread
thorbenheins marked this conversation as resolved.

transport.addTransportLayer(webSocket);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.azure.core.test.utils.metrics.TestMeasurement;
import com.azure.core.test.utils.metrics.TestMeter;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.Header;
import com.microsoft.azure.proton.transport.ws.impl.WebSocketImpl;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
Expand All @@ -39,6 +40,7 @@
import reactor.core.scheduler.Scheduler;
import reactor.test.StepVerifier;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -270,6 +272,45 @@ AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(), ProxyOptions.SYSTEM_
}
}

@SuppressWarnings("unchecked")
@Test
public void websocketConfigurePassesClientOptionsHeaders() {
// Arrange
final ClientOptions clientOptionsWithHeaders = new ClientOptions()
.setHeaders(Arrays.asList(
new Header("X-Custom-Header", "custom-value"),
new Header("X-Another-Header", "another-value")));

final ConnectionOptions connectionOptionsWithHeaders
= new ConnectionOptions(HOSTNAME, tokenCredential, CbsAuthorizationType.SHARED_ACCESS_SIGNATURE,
"authorization-scope", AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(),
ProxyOptions.SYSTEM_DEFAULTS, scheduler, clientOptionsWithHeaders, VERIFY_MODE, PRODUCT,
CLIENT_VERSION);

try (WebSocketsConnectionHandler handlerWithHeaders = new WebSocketsConnectionHandler(CONNECTION_ID,
connectionOptionsWithHeaders, peerDetails, AmqpMetricsProvider.noop())) {
try (MockedConstruction<WebSocketImpl> mockConstruction = mockConstruction(WebSocketImpl.class)) {
// Act
handlerWithHeaders.addTransportLayers(mock(Event.class, Mockito.CALLS_REAL_METHODS),
mock(TransportImpl.class, Mockito.CALLS_REAL_METHODS));

// Assert
final List<WebSocketImpl> constructed = mockConstruction.constructed();
assertEquals(1, constructed.size());

final WebSocketImpl webSocketImpl = constructed.get(0);
ArgumentCaptor<Map<String, String>> headersCaptor = ArgumentCaptor.forClass(Map.class);
verify(webSocketImpl).configure(eq(HOSTNAME), eq("/$servicebus/websocket"), eq(""), eq(0),
eq("AMQPWSB10"), headersCaptor.capture(), eq(null));

final Map<String, String> capturedHeaders = headersCaptor.getValue();
Assertions.assertNotNull(capturedHeaders, "Headers map should not be null.");
assertEquals("custom-value", capturedHeaders.get("X-Custom-Header"));
assertEquals("another-value", capturedHeaders.get("X-Another-Header"));
}
}
}

@Test
void onConnectionCloseMetrics() {
// Arrange
Expand Down
Loading