@@ -11,6 +11,13 @@ import io.getunleash.android.errors.ServerException
1111import io.getunleash.android.events.HeartbeatEvent
1212import io.getunleash.android.http.Throttler
1313import io.getunleash.android.unleashScope
14+ import java.io.Closeable
15+ import java.io.IOException
16+ import java.util.concurrent.TimeUnit
17+ import java.util.concurrent.atomic.AtomicReference
18+ import kotlin.coroutines.CoroutineContext
19+ import kotlin.coroutines.resume
20+ import kotlin.coroutines.resumeWithException
1421import kotlinx.coroutines.Dispatchers
1522import kotlinx.coroutines.channels.BufferOverflow
1623import kotlinx.coroutines.flow.MutableSharedFlow
@@ -30,54 +37,55 @@ import okhttp3.OkHttpClient
3037import okhttp3.Request
3138import okhttp3.Response
3239import okhttp3.internal.closeQuietly
33- import java.io.Closeable
34- import java.io.IOException
35- import java.util.concurrent.TimeUnit
36- import java.util.concurrent.atomic.AtomicReference
37- import kotlin.coroutines.CoroutineContext
38- import kotlin.coroutines.resume
39- import kotlin.coroutines.resumeWithException
4040
4141/* *
42- * Http Client for fetching data from Unleash Proxy.
43- * By default creates an OkHttpClient with readTimeout set to 2 seconds and a cache of 10 MBs
44- * @param httpClient - the http client to use for fetching toggles from Unleash proxy
42+ * Http Client for fetching data from Unleash Proxy. By default creates an OkHttpClient with
43+ * readTimeout set to 2 seconds and a cache of 10 MBs
44+ * @param httpClient
45+ * - the http client to use for fetching toggles from Unleash proxy
4546 */
4647open class UnleashFetcher (
47- unleashConfig : UnleashConfig ,
48- private val httpClient : OkHttpClient ,
49- private val unleashContext : StateFlow <UnleashContext >,
48+ unleashConfig : UnleashConfig ,
49+ private val httpClient : OkHttpClient ,
50+ private val unleashContext : StateFlow <UnleashContext >,
5051) : Closeable {
5152 companion object {
5253 private const val TAG = " UnleashFetcher"
5354 }
54-
55+ @Volatile private var contextForLastFetch : UnleashContext ? = null
5556 private val proxyUrl = unleashConfig.proxyUrl?.toHttpUrl()
56- private val applicationHeaders = unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
57+ private val applicationHeaders =
58+ unleashConfig.getApplicationHeaders(unleashConfig.pollingStrategy)
5759 private val appName = unleashConfig.appName
5860 private var etag: String? = null
59- private val featuresReceivedFlow = MutableSharedFlow <UnleashState >(
60- replay = 1 ,
61- onBufferOverflow = BufferOverflow .DROP_OLDEST
62- )
63- private val fetcherHeartbeatFlow = MutableSharedFlow <HeartbeatEvent >(
64- extraBufferCapacity = 5 ,
65- onBufferOverflow = BufferOverflow .DROP_OLDEST
66- )
61+ private val featuresReceivedFlow =
62+ MutableSharedFlow <UnleashState >(
63+ replay = 1 ,
64+ onBufferOverflow = BufferOverflow .DROP_OLDEST
65+ )
66+ private val fetcherHeartbeatFlow =
67+ MutableSharedFlow <HeartbeatEvent >(
68+ extraBufferCapacity = 5 ,
69+ onBufferOverflow = BufferOverflow .DROP_OLDEST
70+ )
6771 private val coroutineContextForContextChange: CoroutineContext = Dispatchers .IO
6872 private val currentCall = AtomicReference <Call ?>(null )
6973 private val throttler =
70- Throttler (
71- TimeUnit .MILLISECONDS .toSeconds(unleashConfig.pollingStrategy.interval),
72- longestAcceptableIntervalSeconds = 300 ,
73- proxyUrl.toString()
74- )
74+ Throttler (
75+ TimeUnit .MILLISECONDS .toSeconds(unleashConfig.pollingStrategy.interval),
76+ longestAcceptableIntervalSeconds = 300 ,
77+ proxyUrl.toString()
78+ )
7579
7680 fun getFeaturesReceivedFlow () = featuresReceivedFlow.asSharedFlow()
7781
7882 fun startWatchingContext () {
7983 unleashScope.launch {
80- unleashContext.distinctUntilChanged { old, new -> old != new }.collect {
84+ unleashContext.collect {
85+ if (it == contextForLastFetch) {
86+ Log .d(TAG , " Context unchanged, skipping refresh toggles" )
87+ return @collect
88+ }
8189 withContext(coroutineContextForContextChange) {
8290 Log .d(TAG , " Unleash context changed: $it " )
8391 refreshToggles()
@@ -89,7 +97,7 @@ open class UnleashFetcher(
8997 suspend fun refreshToggles (): ToggleResponse {
9098 if (throttler.performAction()) {
9199 Log .d(TAG , " Refreshing toggles" )
92- val response = refreshTogglesWithContext (unleashContext.value)
100+ val response = doFetchToggles (unleashContext.value)
93101 fetcherHeartbeatFlow.emit(HeartbeatEvent (response.status, response.error?.message))
94102 return response
95103 }
@@ -98,15 +106,28 @@ open class UnleashFetcher(
98106 return ToggleResponse (Status .THROTTLED )
99107 }
100108
101- internal suspend fun refreshTogglesWithContext (ctx : UnleashContext ): ToggleResponse {
109+ suspend fun refreshTogglesWithContext (ctx : UnleashContext ): ToggleResponse {
110+ if (throttler.performAction()) {
111+ Log .d(TAG , " Refreshing toggles" )
112+ val response = doFetchToggles(ctx)
113+ fetcherHeartbeatFlow.emit(HeartbeatEvent (response.status, response.error?.message))
114+ return response
115+ }
116+ Log .i(TAG , " Skipping refresh toggles due to throttling" )
117+ fetcherHeartbeatFlow.emit(HeartbeatEvent (Status .THROTTLED ))
118+ return ToggleResponse (Status .THROTTLED )
119+ }
120+
121+ internal suspend fun doFetchToggles (ctx : UnleashContext ): ToggleResponse {
122+ contextForLastFetch = ctx
102123 val response = fetchToggles(ctx)
103124 if (response.isSuccess()) {
104125
105- val toggles = response.config !! .toggles.groupBy { it.name }
106- .mapValues { (_, v) -> v.first() }
126+ val toggles =
127+ response.config !! .toggles.groupBy { it.name } .mapValues { (_, v) -> v.first() }
107128 Log .d(
108- TAG ,
109- " Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
129+ TAG ,
130+ " Fetched new state with ${toggles.size} toggles, emitting featuresReceivedFlow"
110131 )
111132 featuresReceivedFlow.emit(UnleashState (ctx, toggles))
112133 return ToggleResponse (response.status, toggles)
@@ -124,26 +145,31 @@ open class UnleashFetcher(
124145
125146 private suspend fun fetchToggles (ctx : UnleashContext ): FetchResponse {
126147 if (proxyUrl == null ) {
127- return FetchResponse (Status .FAILED , error = IllegalStateException (" Proxy URL is not set" ))
148+ return FetchResponse (
149+ Status .FAILED ,
150+ error = IllegalStateException (" Proxy URL is not set" )
151+ )
128152 }
129153 val contextUrl = buildContextUrl(ctx)
130154 try {
131- val request = Request .Builder ().url(contextUrl)
132- .headers(applicationHeaders.toHeaders())
155+ val request = Request .Builder ().url(contextUrl).headers(applicationHeaders.toHeaders())
133156 if (etag != null ) {
134157 request.header(" If-None-Match" , etag!! )
135158 }
136159 val call = this .httpClient.newCall(request.build())
137160 val inFlightCall = currentCall.get()
138161 if (! currentCall.compareAndSet(inFlightCall, call)) {
139162 return FetchResponse (
140- Status .FAILED ,
141- error = IllegalStateException (" Failed to set new call while ${inFlightCall?.request()?.url} is in flight" )
163+ Status .FAILED ,
164+ error =
165+ IllegalStateException (
166+ " Failed to set new call while ${inFlightCall?.request()?.url} is in flight"
167+ )
142168 )
143- } else if (inFlightCall != null && ! inFlightCall.isCanceled()) {
169+ } else if (inFlightCall != null && ! inFlightCall.isCanceled() && ! inFlightCall.isExecuted() ) {
144170 Log .d(
145- TAG ,
146- " Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url} "
171+ TAG ,
172+ " Cancelling previous ${inFlightCall.request().method} ${inFlightCall.request().url} "
147173 )
148174 inFlightCall.cancel()
149175 }
@@ -159,23 +185,21 @@ open class UnleashFetcher(
159185 res.body?.use { b ->
160186 try {
161187 val proxyResponse: ProxyResponse =
162- proxyResponseAdapter.fromJson(b.string())!!
188+ proxyResponseAdapter.fromJson(b.string())!!
163189 FetchResponse (Status .SUCCESS , proxyResponse)
164190 } catch (e: Exception ) {
165191 // If we fail to parse, just keep data
166192 FetchResponse (Status .FAILED , error = e)
167193 }
168- } ? : FetchResponse (Status .FAILED , error = NoBodyException ())
194+ }
195+ ? : FetchResponse (Status .FAILED , error = NoBodyException ())
169196 }
170-
171197 res.code == 304 -> {
172198 FetchResponse (Status .NOT_MODIFIED )
173199 }
174-
175200 res.code == 401 -> {
176201 FetchResponse (Status .FAILED , error = NotAuthorizedException ())
177202 }
178-
179203 else -> {
180204 FetchResponse (Status .FAILED , error = ServerException (res.code))
181205 }
@@ -188,31 +212,33 @@ open class UnleashFetcher(
188212
189213 private suspend fun Call.await (): Response {
190214 return suspendCancellableCoroutine { continuation ->
191- enqueue(object : Callback {
192- override fun onResponse (call : Call , response : Response ) {
193- continuation.resume(response)
194- }
215+ enqueue(
216+ object : Callback {
217+ override fun onResponse (call : Call , response : Response ) {
218+ continuation.resume(response)
219+ }
195220
196- override fun onFailure (call : Call , e : IOException ) {
197- // Don't bother with resuming the continuation if it is already cancelled.
198- if (continuation.isCancelled) return
199- continuation.resumeWithException(e)
200- }
201- })
221+ override fun onFailure (call : Call , e : IOException ) {
222+ // Don't bother with resuming the continuation if it is already
223+ // cancelled.
224+ if (continuation.isCancelled) return
225+ continuation.resumeWithException(e)
226+ }
227+ }
228+ )
202229
203230 continuation.invokeOnCancellation {
204231 try {
205232 cancel()
206233 } catch (ex: Throwable ) {
207- // Ignore cancel exception
234+ // Ignore cancel exception
208235 }
209236 }
210237 }
211238 }
212239
213240 private fun buildContextUrl (ctx : UnleashContext ): HttpUrl {
214- var contextUrl = proxyUrl!! .newBuilder()
215- .addQueryParameter(" appName" , appName)
241+ var contextUrl = proxyUrl!! .newBuilder().addQueryParameter(" appName" , appName)
216242 if (ctx.userId != null ) {
217243 contextUrl.addQueryParameter(" userId" , ctx.userId)
218244 }
0 commit comments