Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLSession;

import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.Header;
Expand Down Expand Up @@ -650,14 +649,10 @@ private void executeRequest(final RequestExecutionCommand requestExecutionComman
final int initOutputWindow = stream.getOutputWindow().get();
streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
}

requestExecutionCommand.initiated(stream);
if (stream.isOutputReady()) {
stream.produceOutput();
}
final CancellableDependency cancellableDependency = requestExecutionCommand.getCancellableDependency();
if (cancellableDependency != null) {
cancellableDependency.setDependency(stream::abort);
}
}

private void executePush(final PushResponseCommand pushResponseCommand) throws IOException, HttpException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.StreamControl;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;

class H2Stream {

enum State { RESERVED, OPEN, CLOSED }
class H2Stream implements StreamControl {

private static final long LINGER_TIME = 1000; // 1 second

Expand All @@ -70,10 +69,16 @@ enum State { RESERVED, OPEN, CLOSED }
this.cancelled = new AtomicBoolean();
}

int getId() {
@Override
public int getId() {
return channel.getId();
}

@Override
public State getState() {
return transitionRef.get();
}

boolean isReserved() {
return reserved;
}
Expand Down Expand Up @@ -276,6 +281,11 @@ void releaseResources() {
}
}

@Override
public boolean cancel() {
return abort();
}

@Override
public String toString() {
final StringBuilder buf = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private void execute(

@Override
public void completed(final IOSession ioSession) {
ioSession.enqueue(new RequestExecutionCommand(new AsyncClientExchangeHandler() {
final AsyncClientExchangeHandler handlerProxy = new AsyncClientExchangeHandler() {

@Override
public void releaseResources() {
Expand Down Expand Up @@ -244,7 +244,13 @@ public void failed(final Exception cause) {
exchangeHandler.failed(cause);
}

}, pushHandlerFactory, cancellableDependency, context), Command.Priority.NORMAL);
};
ioSession.enqueue(new RequestExecutionCommand(
handlerProxy,
pushHandlerFactory,
context,
cancellableDependency::setDependency),
Command.Priority.NORMAL);
if (!ioSession.isOpen()) {
exchangeHandler.failed(new ConnectionClosedException());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void execute(
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
Asserts.check(!closed.get(), "Connection is already closed");
final Command executionCommand = new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context);
final Command executionCommand = new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context);
ioSession.enqueue(executionCommand, Command.Priority.NORMAL);
if (!ioSession.isOpen()) {
exchangeHandler.failed(new ConnectionClosedException());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.hc.core5.http;

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.Cancellable;

/**
* Represents a message stream control interface.
*
* @since 5.5
*/
@Internal
public interface StreamControl extends Cancellable {

enum State { RESERVED, OPEN, CLOSED }

int getId();

State getState();

}
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,9 @@ public void execute(
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
final IOSession ioSession = getIOSession();
ioSession.enqueue(new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, null, context), Command.Priority.NORMAL);
ioSession.enqueue(
new RequestExecutionCommand(exchangeHandler, pushHandlerFactory, context),
Command.Priority.NORMAL);
if (!ioSession.isOpen()) {
try {
exchangeHandler.failed(new ConnectionClosedException());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@

/**
* Abstract executable command that may need to do some cleaning up
* in case of an failure and also optionally may want to cancel
* the associated HTTP message exchange through {@link CancellableDependency}.
* in case of an failure.
*
* @since 5.0
*/
@Internal
public abstract class ExecutableCommand implements Command {

/**
* @deprecated Not used.
*/
@Deprecated
public abstract CancellableDependency getCancellableDependency();

public abstract void failed(Exception ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@

import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.RequestNotExecutedException;
import org.apache.hc.core5.http.StreamControl;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
Expand All @@ -50,15 +52,37 @@ public final class RequestExecutionCommand extends ExecutableCommand {
private final HandlerFactory<AsyncPushConsumer> pushHandlerFactory;
private final CancellableDependency cancellableDependency;
private final HttpContext context;
private final Callback<StreamControl> initiationCallback;
private final AtomicBoolean failed;

/**
* @since 5.5
*/
public RequestExecutionCommand(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final Callback<StreamControl> initiationCallback) {
this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
this.pushHandlerFactory = pushHandlerFactory;
this.initiationCallback = initiationCallback;
this.cancellableDependency = null;
this.context = context;
this.failed = new AtomicBoolean();
}

/**
* @deprecated Not used.
*/
@Deprecated
public RequestExecutionCommand(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final CancellableDependency cancellableDependency,
final HttpContext context) {
this.exchangeHandler = Args.notNull(exchangeHandler, "Handler");
this.pushHandlerFactory = pushHandlerFactory;
this.initiationCallback = null;
this.cancellableDependency = cancellableDependency;
this.context = context;
this.failed = new AtomicBoolean();
Expand All @@ -68,13 +92,13 @@ public RequestExecutionCommand(
final AsyncClientExchangeHandler exchangeHandler,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context) {
this(exchangeHandler, pushHandlerFactory, null, context);
this(exchangeHandler, pushHandlerFactory, context, null);
}

public RequestExecutionCommand(
final AsyncClientExchangeHandler exchangeHandler,
final HttpContext context) {
this(exchangeHandler, null, null, context);
this(exchangeHandler, null, context);
}

public AsyncClientExchangeHandler getExchangeHandler() {
Expand All @@ -85,6 +109,10 @@ public HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
return pushHandlerFactory;
}

/**
* @deprecated no used.
*/
@Deprecated
@Override
public CancellableDependency getCancellableDependency() {
return cancellableDependency;
Expand All @@ -94,6 +122,19 @@ public HttpContext getContext() {
return context;
}

/**
* @since 5.5
*/
@SuppressWarnings("deprecated")
public void initiated(final StreamControl streamControl) {
if (initiationCallback != null) {
initiationCallback.execute(streamControl);
}
if (cancellableDependency != null) {
cancellableDependency.setDependency(streamControl);
}
}

@Override
public void failed(final Exception ex) {
if (failed.compareAndSet(false, true)) {
Expand Down