Skip to content

Commit bc28c21

Browse files
committed
Thoroughly test the sync event pipeline, couple tweaks
1 parent 8eb6dca commit bc28c21

File tree

2 files changed

+92
-7
lines changed

2 files changed

+92
-7
lines changed

Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Generic;
22
using System.Threading;
3+
using System.Threading.Tasks;
34
using global::System;
45
using global::System.Linq;
56
using Segment.Analytics.Policies;
@@ -70,9 +71,15 @@ public SyncEventPipeline(
7071
public void Put(RawEvent @event) => _writeChannel.Send(@event);
7172

7273
public void Flush() {
73-
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1,1));
74+
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1));
75+
// Set it up to be released by Upload
76+
flushEvent._semaphore.Wait();
7477
_writeChannel.Send(flushEvent);
75-
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
78+
// Wait until the single slot in the semaphore is free. (if it's running!)
79+
if (Running && !_uploadChannel.isCancelled)
80+
{
81+
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
82+
}
7683
}
7784

7885
public void Start()
@@ -148,6 +155,7 @@ private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODi
148155
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " performing flush");
149156

150157
await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover());
158+
await Task.Delay(1); // The rollover might not quite be done, even 1 ms prevents an empty URL list
151159

152160
string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(',');
153161
foreach (string url in fileUrlList)

Tests/Utilities/EventPipelineTest.cs

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Segment.Serialization;
1111
using Tests.Utils;
1212
using Xunit;
13+
using System.Linq;
1314

1415
namespace Tests.Utilities
1516
{
@@ -225,6 +226,86 @@ public void TestConfigWithEventPipelineProviders(IEventPipelineProvider provider
225226
analytics.Track("test");
226227
}
227228

229+
[Fact]
230+
public void TestSyncEventPipelineProviderWaits()
231+
{
232+
const int iterations = 100;
233+
const int newAnalyticsEvery = 10;
234+
const int eventCount = 10;
235+
236+
int totalTracks = 0;
237+
int totalUploads = 0;
238+
239+
_mockHttpClient
240+
.Setup(client => client.Upload(It.IsAny<byte[]>()))
241+
.Callback<byte[]>(bytes =>
242+
{
243+
string content = System.Text.Encoding.UTF8.GetString(bytes);
244+
int count = content.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1;
245+
totalUploads += count;
246+
})
247+
.ReturnsAsync(true);
248+
249+
var config = new Configuration(
250+
writeKey: "123",
251+
useSynchronizeDispatcher: true,
252+
flushInterval: 100000,
253+
flushAt: eventCount * 2,
254+
httpClientProvider: new MockHttpClientProvider(_mockHttpClient),
255+
storageProvider: new InMemoryStorageProvider(),
256+
eventPipelineProvider: new SyncEventPipelineProvider()
257+
);
258+
259+
var analytics = new Analytics(config);
260+
for (int j = 0; j < iterations; j++)
261+
{
262+
if (j % newAnalyticsEvery == 0)
263+
{
264+
analytics = new Analytics(config);
265+
}
266+
_mockHttpClient.Invocations.Clear();
267+
for (int i = 0; i < eventCount; i++)
268+
{
269+
analytics.Track($"test {i}");
270+
totalTracks++;
271+
}
272+
analytics.Flush();
273+
274+
#pragma warning disable CS4014 // Silly compiler, this isn't an invocation so it doesn't need to be awaited
275+
_mockHttpClient.Verify(client => client.Upload(It.IsAny<byte[]>()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}");
276+
#pragma warning restore CS4014
277+
IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "Upload");
278+
int testsUploaded = System.Text.Encoding.UTF8
279+
.GetString((byte[])lastUploadInvocation.Arguments[0])
280+
.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1;
281+
Assert.Equal(eventCount, testsUploaded);
282+
}
283+
Assert.Equal(totalTracks, totalUploads);
284+
}
285+
286+
[Fact]
287+
public void TestRepeatedFlushesDontHang()
288+
{
289+
var config = new Configuration(
290+
writeKey: "123",
291+
useSynchronizeDispatcher: true,
292+
flushInterval: 0,
293+
flushAt: 1,
294+
httpClientProvider: new MockHttpClientProvider(_mockHttpClient),
295+
storageProvider: new MockStorageProvider(_storage),
296+
eventPipelineProvider: new SyncEventPipelineProvider(5000)
297+
);
298+
var analytics = new Analytics(config);
299+
analytics.Track("test");
300+
DateTime startTime = DateTime.Now;
301+
analytics.Flush();
302+
analytics.Flush();
303+
analytics.Flush();
304+
analytics.Flush();
305+
analytics.Flush();
306+
Assert.True(DateTime.Now - startTime < TimeSpan.FromMilliseconds(100));
307+
}
308+
228309
[Fact]
229310
public void TestConfigWithCustomEventPipelineProvider()
230311
{
@@ -248,13 +329,9 @@ public void TestConfigWithCustomEventPipelineProvider()
248329

249330
public class CustomEventPipelineProvider : IEventPipelineProvider
250331
{
251-
public CustomEventPipelineProvider()
252-
{
253-
}
254-
332+
public CustomEventPipelineProvider() {}
255333
public IEventPipeline Create(Analytics analytics, string key)
256334
{
257-
// Custom implementation
258335
return new CustomEventPipeline(analytics, key);
259336
}
260337

0 commit comments

Comments
 (0)