Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
51b0a17
Http modules scaffolding
gthea Feb 9, 2026
b168028
Move files to http modules
gthea Feb 9, 2026
ef7778b
Http client config
gthea Feb 9, 2026
b793199
Http modules scaffolding (#863)
gthea Feb 9, 2026
153daed
Fix instrumented test
gthea Feb 9, 2026
8d26f92
Populate HTTP module
gthea Feb 9, 2026
3317ea8
Update http README
gthea Feb 9, 2026
666e666
Remove unused classes; move helper to private method
gthea Feb 10, 2026
a92c45c
Fix Javadoc
gthea Feb 10, 2026
2d1e8f5
Move public HTTP classes to http-domain (#864)
gthea Feb 10, 2026
0c2358c
Move classes to http module (#865)
gthea Feb 10, 2026
6cbe55a
Rename http-domain to http-api (#866)
gthea Feb 13, 2026
1e604f7
WIP
gthea Jan 28, 2026
10fc972
Streaming without auth
gthea Feb 9, 2026
70d61fa
README and .gitignore
gthea Feb 9, 2026
b841953
Fix README
gthea Feb 9, 2026
c206c2d
Move backoff back to main
gthea Feb 9, 2026
b7a56ac
Update readme
gthea Feb 9, 2026
4564dbf
Update README
gthea Feb 16, 2026
ea770a3
Remove unnecessary utils class
gthea Feb 16, 2026
706b61b
Increase test coverage
gthea Feb 17, 2026
0f0a022
Merge branch 'development' into streaming-extract
gthea Feb 24, 2026
150030a
Merge branch 'development' into streaming-extract
gthea Mar 25, 2026
5f5184a
Streaming support module
gthea Mar 25, 2026
d9a59c9
fix main/build.gradle
gthea Mar 25, 2026
21244e8
Add streaming-support to fused library
gthea Mar 25, 2026
1fce725
Merge branch 'development' into streaming-extract
gthea Mar 26, 2026
08dc6ea
Add tests
gthea Mar 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ dependencies {
return candidates.find { findProject(it) != null }
}

['main', 'logger', 'executor', 'events', 'events-domain', 'api', 'http-api', 'http', 'fallback', 'backoff', 'tracker', 'submitter'].each { moduleName ->
['main', 'logger', 'events', 'events-domain', 'api', 'http-api', 'http', 'fallback', 'backoff', 'tracker', 'submitter', 'streaming', 'streaming-support', 'executor'].each { moduleName ->
def resolvedPath = resolveProjectPath(moduleName)
if (resolvedPath != null) {
include project(resolvedPath)
Expand Down
6 changes: 4 additions & 2 deletions main/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ dependencies {
api clientModuleProject('submitter')

// Internal module dependencies
implementation clientModuleProject('http')
implementation clientModuleProject('events-domain')
implementation clientModuleProject(':http')
implementation clientModuleProject(':events-domain')
implementation clientModuleProject(':streaming')
implementation clientModuleProject(':streaming-support')

// External dependencies
implementation libs.roomRuntime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.streaming.support.CompressionUtilProvider;
import io.split.android.client.events.EventsManagerCoordinator;
import io.split.android.client.events.SplitInternalEvent;
import io.split.android.client.lifecycle.SplitLifecycleManager;
Expand Down Expand Up @@ -54,13 +54,22 @@
import io.split.android.client.service.sseclient.reactor.MySegmentsUpdateWorkerRegistry;
import io.split.android.client.service.sseclient.reactor.SplitUpdatesWorker;
import io.split.android.client.service.sseclient.sseclient.BackoffCounterTimer;
import io.split.android.client.service.sseclient.sseclient.HttpFetcherStreamingAuthFetcher;
import io.split.android.client.service.sseclient.sseclient.NotificationProcessorUpdateListener;
import io.split.android.client.service.sseclient.sseclient.PushNotificationManager;
import io.split.android.client.service.sseclient.sseclient.SseAuthenticator;
import io.split.android.client.service.sseclient.sseclient.SseClient;
import io.split.android.client.service.sseclient.sseclient.SseClientImpl;
import io.split.android.client.service.sseclient.sseclient.HttpClientStreamingTransport;
import io.split.android.client.service.sseclient.sseclient.DefaultSseClient;
import io.split.android.client.service.sseclient.sseclient.EventSourceClientImpl;
import io.split.android.client.service.sseclient.sseclient.SseHandler;
import io.split.android.client.service.sseclient.sseclient.SseRefreshTokenTimer;
import io.split.android.client.service.sseclient.sseclient.SplitTaskExecutorStreamingScheduler;
import io.split.android.client.service.sseclient.sseclient.StreamingComponents;
import io.split.android.client.service.sseclient.sseclient.TelemetryRuntimeProducerStreamingTelemetry;
import io.split.android.client.service.sseclient.spi.StreamingScheduler;
import io.split.android.client.service.sseclient.spi.StreamingTelemetry;
import io.split.android.client.service.sseclient.spi.UpdateNotificationListener;
import io.split.android.client.service.synchronizer.RolloutCacheManager;
import io.split.android.client.service.synchronizer.RolloutCacheManagerImpl;
import io.split.android.client.service.synchronizer.SyncGuardian;
Expand Down Expand Up @@ -288,37 +297,41 @@ SyncManager buildSyncManager(SplitClientConfig config,
}

@NonNull
PushNotificationManager getPushNotificationManager(SplitTaskExecutor splitTaskExecutor,
PushNotificationManager getPushNotificationManager(StreamingScheduler scheduler,
SseAuthenticator sseAuthenticator,
PushManagerEventBroadcaster pushManagerEventBroadcaster,
SseClient sseClient,
TelemetryRuntimeProducer telemetryRuntimeProducer,
StreamingTelemetry telemetry,
long defaultSseConnectionDelayInSecs,
int sseDisconnectionDelayInSecs) {
return new PushNotificationManager(pushManagerEventBroadcaster,
sseAuthenticator,
sseClient,
new SseRefreshTokenTimer(splitTaskExecutor, pushManagerEventBroadcaster),
telemetryRuntimeProducer,
new SseRefreshTokenTimer(scheduler, pushManagerEventBroadcaster),
scheduler,
telemetry,
defaultSseConnectionDelayInSecs,
sseDisconnectionDelayInSecs,
null);
}

public SseClient getSseClient(String streamingServiceUrlString,
NotificationParser notificationParser,
NotificationProcessor notificationProcessor,
TelemetryRuntimeProducer telemetryRuntimeProducer,
UpdateNotificationListener updateListener,
StreamingTelemetry telemetry,
PushManagerEventBroadcaster pushManagerEventBroadcaster,
HttpClient httpClient) {
SseHandler sseHandler = new SseHandler(notificationParser,
notificationProcessor,
telemetryRuntimeProducer,
updateListener,
telemetry,
pushManagerEventBroadcaster);

return new SseClientImpl(URI.create(streamingServiceUrlString),
httpClient,
new EventStreamParser(),
EventSourceClientImpl eventSourceClient = new EventSourceClientImpl(
new HttpClientStreamingTransport(httpClient),
new EventStreamParser());

return new DefaultSseClient(URI.create(streamingServiceUrlString),
eventSourceClient,
sseHandler);
}

Expand Down Expand Up @@ -396,22 +409,25 @@ public StreamingComponents buildStreamingComponents(@NonNull SplitTaskExecutor s
notificationParser, splitsUpdateNotificationQueue);

PushManagerEventBroadcaster pushManagerEventBroadcaster = new PushManagerEventBroadcaster();
StreamingScheduler scheduler = new SplitTaskExecutorStreamingScheduler(splitTaskExecutor);
StreamingTelemetry streamingTelemetry = new TelemetryRuntimeProducerStreamingTelemetry(storageContainer.getTelemetryStorage());
UpdateNotificationListener updateListener = new NotificationProcessorUpdateListener(notificationProcessor);

SseClient sseClient = getSseClient(config.streamingServiceUrl(),
notificationParser,
notificationProcessor,
storageContainer.getTelemetryStorage(),
updateListener,
streamingTelemetry,
pushManagerEventBroadcaster,
defaultHttpClient);

SseAuthenticator sseAuthenticator = new SseAuthenticator(splitApiFacade.getSseAuthenticationFetcher(),
SseAuthenticator sseAuthenticator = new SseAuthenticator(new HttpFetcherStreamingAuthFetcher(splitApiFacade.getSseAuthenticationFetcher()),
new SseJwtParser(), flagsSpec);

PushNotificationManager pushNotificationManager = getPushNotificationManager(splitTaskExecutor,
PushNotificationManager pushNotificationManager = getPushNotificationManager(scheduler,
sseAuthenticator,
pushManagerEventBroadcaster,
sseClient,
storageContainer.getTelemetryStorage(),
streamingTelemetry,
config.defaultSSEConnectionDelay(),
config.sseDisconnectionDelay());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import io.split.android.client.main.BuildConfig;
import io.split.android.client.api.Key;
import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.streaming.support.CompressionUtilProvider;
import io.split.android.client.events.EventsManagerCoordinator;
import io.split.android.client.factory.FactoryMonitor;
import io.split.android.client.factory.FactoryMonitorImpl;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.split.android.client.service.sseclient;

/**
* Constants used by the streaming module.
*/
public final class StreamingConstants {

private StreamingConstants() {
// Utility class
}

/**
* Buffer size for segment data decompression.
*/
public static final int SEGMENT_DATA_BUFFER_SIZE = 1024 * 10; // 10KB

/**
* Query param for flags spec in streaming auth.
*/
public static final String FLAGS_SPEC_PARAM = "s";
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import com.google.gson.annotations.SerializedName;

import io.split.android.client.common.CompressionType;
import io.split.android.client.streaming.support.CompressionType;

public abstract class InstantUpdateChangeNotification extends IncomingNotification {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import java.util.Set;

import io.split.android.client.common.CompressionType;
import io.split.android.client.streaming.support.CompressionType;

public class MembershipNotification extends IncomingNotification {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import io.split.android.client.exceptions.MySegmentsParsingException;
import io.split.android.client.utils.Base64Util;
import io.split.android.client.utils.CompressionUtil;
import io.split.android.client.streaming.support.CompressionUtil;
import io.split.android.client.utils.MurmurHash3;
import io.split.android.client.utils.StringHelper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import io.split.android.client.common.CompressionType;
import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.streaming.support.CompressionType;
import io.split.android.client.streaming.support.CompressionUtilProvider;
import io.split.android.client.service.executor.SplitTaskExecutor;
import io.split.android.client.service.mysegments.MySegmentUpdateParams;
import io.split.android.client.service.mysegments.MySegmentsUpdateTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import androidx.annotation.NonNull;

import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.streaming.support.CompressionUtilProvider;
import io.split.android.client.service.executor.SplitTaskExecutor;
import io.split.android.client.service.sseclient.notifications.MySegmentsV2PayloadDecoder;
import io.split.android.client.service.sseclient.notifications.NotificationParser;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import java.util.concurrent.BlockingQueue;

import io.split.android.client.common.CompressionUtilProvider;
import io.split.android.client.streaming.support.CompressionUtilProvider;
import io.split.android.client.dtos.Helper;
import io.split.android.client.dtos.RuleBasedSegment;
import io.split.android.client.dtos.Split;
Expand All @@ -23,7 +23,7 @@
import io.split.android.client.storage.rbs.RuleBasedSegmentStorage;
import io.split.android.client.storage.splits.SplitsStorage;
import io.split.android.client.utils.Base64Util;
import io.split.android.client.utils.CompressionUtil;
import io.split.android.client.streaming.support.CompressionUtil;
import io.split.android.client.utils.Json;
import io.split.android.client.utils.logger.Logger;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.split.android.client.service.sseclient.spi;

import androidx.annotation.Nullable;

/**
* Exception thrown by streaming auth fetchers.
*/
public class StreamingAuthException extends Exception {

@Nullable
private final Integer mStatusCode;

public StreamingAuthException(String message) {
super(message);
mStatusCode = null;
}

public StreamingAuthException(String message, Throwable cause) {
super(message, cause);
mStatusCode = null;
}

public StreamingAuthException(String message, Throwable cause, Integer statusCode) {
super(message, cause);
mStatusCode = statusCode;
}

@Nullable
public Integer getStatusCode() {
return mStatusCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.split.android.client.service.sseclient.spi;

import androidx.annotation.NonNull;

import java.util.Map;

import io.split.android.client.service.sseclient.SseAuthenticationResponse;

/**
* Abstraction for fetching streaming authentication tokens.
*/
public interface StreamingAuthFetcher {

/**
* Executes the auth request with the provided parameters.
*
* @param params request parameters
* @return authentication response
* @throws StreamingAuthException when request fails
*/
@NonNull
SseAuthenticationResponse execute(@NonNull Map<String, Object> params) throws StreamingAuthException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.split.android.client.service.sseclient.spi;

import androidx.annotation.NonNull;
import androidx.annotation.Nullable;

/**
* Interface for scheduling delayed tasks within the streaming module.
* Implementations should provide timer/scheduling capabilities backed
* by the host application's task executor.
*/
public interface StreamingScheduler {

/**
* Schedules a task to run after the specified delay.
*
* @param task the runnable to execute
* @param delaySeconds delay before execution in seconds
* @param listener optional listener to be notified when task completes
* @return a unique task ID that can be used to cancel the task
*/
@NonNull
String schedule(@NonNull Runnable task, long delaySeconds, @Nullable TaskExecutionListener listener);

/**
* Cancels a previously scheduled task.
*
* @param taskId the ID returned by schedule()
*/
void cancel(@Nullable String taskId);

/**
* Listener interface for task completion notifications.
*/
interface TaskExecutionListener {
/**
* Called when a scheduled task has completed execution.
*/
void onTaskExecuted();
}
}
Loading
Loading