Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'corretto'
distribution: 'temurin'

- name: Set up Gradle
uses: gradle/actions/setup-gradle@v3
Expand Down
8 changes: 4 additions & 4 deletions android/src/main/java/io/ably/lib/push/PushChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void subscribeClient() throws AblyException {
* @throws AblyException
*/
public void subscribeClientAsync(CompletionListener listener) {
subscribeClientImpl().async(new CompletionListener.ToCallback(listener));
subscribeClientImpl().async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> subscribeClientImpl() {
Expand Down Expand Up @@ -83,7 +83,7 @@ public void subscribeDevice() throws AblyException {
* @throws AblyException
*/
public void subscribeDeviceAsync(CompletionListener listener) {
subscribeDeviceImpl().async(new CompletionListener.ToCallback(listener));
subscribeDeviceImpl().async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> subscribeDeviceImpl() {
Expand Down Expand Up @@ -131,7 +131,7 @@ public void unsubscribeClient() throws AblyException {
* @throws AblyException
*/
public void unsubscribeClientAsync(CompletionListener listener) {
unsubscribeClientImpl().async(new CompletionListener.ToCallback(listener));
unsubscribeClientImpl().async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> unsubscribeClientImpl() {
Expand Down Expand Up @@ -163,7 +163,7 @@ public void unsubscribeDevice() throws AblyException {
* @throws AblyException
*/
public void unsubscribeDeviceAsync(CompletionListener listener) {
unsubscribeDeviceImpl().async(new CompletionListener.ToCallback(listener));
unsubscribeDeviceImpl().async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> unsubscribeDeviceImpl() {
Expand Down
2 changes: 1 addition & 1 deletion java/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ tasks.register<Test>("testRealtimeSuite") {
}
retry {
maxRetries.set(3)
maxFailures.set(8)
maxFailures.set(15)
failOnPassedAfterRetry.set(false)
failOnSkippedAfterRetry.set(false)
}
Expand Down
10 changes: 5 additions & 5 deletions lib/src/main/java/io/ably/lib/push/PushBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void publish(Param[] recipient, JsonObject payload) throws AblyException
* @throws AblyException
*/
public void publishAsync(Param[] recipient, JsonObject payload, final CompletionListener listener) {
publishImpl(recipient, payload).async(new CompletionListener.ToCallback(listener));
publishImpl(recipient, payload).async(new CompletionListener.ToCallback<>(listener));
}

private Http.Request<Void> publishImpl(final Param[] recipient, final JsonObject payload) {
Expand Down Expand Up @@ -275,7 +275,7 @@ public void remove(String deviceId) throws AblyException {
* @param listener A listener to be notified of success or failure.
*/
public void removeAsync(String deviceId, CompletionListener listener) {
removeImpl(deviceId).async(new CompletionListener.ToCallback(listener));
removeImpl(deviceId).async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> removeImpl(final String deviceId) {
Expand Down Expand Up @@ -310,7 +310,7 @@ public void removeWhere(Param[] params) throws AblyException {
* @param listener A listener to be notified of success or failure.
*/
public void removeWhereAsync(Param[] params, CompletionListener listener) {
removeWhereImpl(params).async(new CompletionListener.ToCallback(listener));
removeWhereImpl(params).async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> removeWhereImpl(Param[] params) {
Expand Down Expand Up @@ -435,7 +435,7 @@ public void remove(ChannelSubscription subscription) throws AblyException {
* @throws AblyException
*/
public void removeAsync(ChannelSubscription subscription, CompletionListener listener) {
removeImpl(subscription).async(new CompletionListener.ToCallback(listener));
removeImpl(subscription).async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> removeImpl(ChannelSubscription subscription) {
Expand Down Expand Up @@ -476,7 +476,7 @@ public void removeWhere(Param[] params) throws AblyException {
* @throws AblyException
*/
public void removeWhereAsync(Param[] params, CompletionListener listener) {
removeWhereImpl(params).async(new CompletionListener.ToCallback(listener));
removeWhereImpl(params).async(new CompletionListener.ToCallback<>(listener));
}

protected Http.Request<Void> removeWhereImpl(Param[] params) {
Expand Down
75 changes: 71 additions & 4 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.ably.lib.types.ProtocolMessage;
import io.ably.lib.types.ProtocolMessage.Action;
import io.ably.lib.types.ProtocolMessage.Flag;
import io.ably.lib.types.PublishResult;
import io.ably.lib.types.Summary;
import io.ably.lib.types.UpdateDeleteResult;
import io.ably.lib.util.CollectionUtils;
Expand Down Expand Up @@ -445,6 +446,16 @@ private static void callCompletionListenerError(CompletionListener listener, Err
}
}

private static void callCompletionListenerError(Callback<PublishResult> listener, ErrorInfo err) {
if(listener != null) {
try {
listener.onError(err);
} catch(Throwable t) {
Log.e(TAG, "Unexpected exception calling CompletionListener", t);
}
}
}

private void setAttached(ProtocolMessage message) {
clearAttachTimers();
properties.attachSerial = message.channelSerial;
Expand Down Expand Up @@ -1026,8 +1037,9 @@ private void unsubscribeImpl(String name, MessageListener listener) {
* @param data the message payload
* @throws AblyException
*/
@NonBlocking
public void publish(String name, Object data) throws AblyException {
publish(name, data, null);
publish(name, data, (Callback<PublishResult>) null);
}

/**
Expand All @@ -1038,8 +1050,9 @@ public void publish(String name, Object data) throws AblyException {
* @param message A {@link Message} object.
* @throws AblyException
*/
@NonBlocking
public void publish(Message message) throws AblyException {
publish(message, null);
publish(message, (Callback<PublishResult>) null);
}

/**
Expand All @@ -1050,8 +1063,9 @@ public void publish(Message message) throws AblyException {
* @param messages An array of {@link Message} objects.
* @throws AblyException
*/
@NonBlocking
public void publish(Message[] messages) throws AblyException {
publish(messages, null);
publish(messages, (Callback<PublishResult>) null);
}

/**
Expand All @@ -1067,12 +1081,36 @@ public void publish(Message[] messages) throws AblyException {
* <p>
* This listener is invoked on a background thread.
* @throws AblyException
* @deprecated Use {@link #publish(String, Object, Callback)} instead.
*/
@Deprecated
@NonBlocking
public void publish(String name, Object data, CompletionListener listener) throws AblyException {
Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name);
publish(new Message[] {new Message(name, data)}, listener);
}

/**
* Publishes a single message to the channel with the given event name and payload.
* When publish is called with this client library, it won't attempt to implicitly attach to the channel,
* so long as <a href="https://ably.com/docs/realtime/channels#transient-publish">transient publishing</a> is available in the library.
* Otherwise, the client will implicitly attach.
* <p>
* Spec: RTL6i
* @param name the event name
* @param data the message payload
* @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation,
* receiving a {@link PublishResult} with message serial(s) on success.
* <p>
* This callback is invoked on a background thread.
* @throws AblyException
*/
@NonBlocking
public void publish(String name, Object data, Callback<PublishResult> callback) throws AblyException {
Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name);
publish(new Message[] {new Message(name, data)}, callback);
}

/**
* Publishes a message to the channel.
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
Expand All @@ -1083,12 +1121,33 @@ public void publish(String name, Object data, CompletionListener listener) throw
* <p>
* This listener is invoked on a background thread.
* @throws AblyException
* @deprecated Use {@link #publish(Message, Callback)} instead.
*/
@Deprecated
@NonBlocking
public void publish(Message message, CompletionListener listener) throws AblyException {
Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name);
publish(new Message[] {message}, listener);
}

/**
* Publishes a message to the channel.
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
* <p>
* Spec: RTL6i
* @param message A {@link Message} object.
* @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation,
* receiving a {@link PublishResult} with message serial(s) on success.
* <p>
* This callback is invoked on a background thread.
* @throws AblyException
*/
@NonBlocking
public void publish(Message message, Callback<PublishResult> callback) throws AblyException {
Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name);
publish(new Message[] {message}, callback);
}

/**
* Publishes an array of messages to the channel.
* When publish is called with this client library, it won't attempt to implicitly attach to the channel.
Expand All @@ -1099,8 +1158,16 @@ public void publish(Message message, CompletionListener listener) throws AblyExc
* <p>
* This listener is invoked on a background thread.
* @throws AblyException
* @deprecated Use {@link #publish(Message[], Callback)} instead.
*/
@Deprecated
@NonBlocking
public synchronized void publish(Message[] messages, CompletionListener listener) throws AblyException {
publish(messages, Listeners.fromCompletionListener(listener));
}

@NonBlocking
public synchronized void publish(Message[] messages, Callback<PublishResult> listener) throws AblyException {
Log.v(TAG, "publish(Message[]); channel = " + this.name);
ConnectionManager connectionManager = ably.connection.connectionManager;
ConnectionManager.State connectionState = connectionManager.getConnectionState();
Expand All @@ -1127,7 +1194,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener
case suspended:
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000));
default:
connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener));
connectionManager.send(msg, queueMessages, listener);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ public void onError(ErrorInfo reason) {
}
}

class ToCallback implements Callback<Void> {
class ToCallback<T> implements Callback<T> {
private CompletionListener listener;
public ToCallback(CompletionListener listener) {
this.listener = listener;
}

@Override
public void onSuccess(Void v) {
public void onSuccess(T v) {
listener.onSuccess();
}

Expand Down
Loading
Loading