Skip to content

Commit 6d875a0

Browse files
authored
Fixing event pipeline provider type (#127)
* Fixing type so custom event pipeline providers can be used and adding test to validate they're getting used * Thoroughly test the sync event pipeline, couple tweaks * Fixing ubuntu version and PR comments
1 parent 534acf2 commit 6d875a0

File tree

5 files changed

+152
-7
lines changed

5 files changed

+152
-7
lines changed

.github/workflows/build.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@ on:
1010
jobs:
1111
cancel_previous:
1212
permissions: write-all
13-
runs-on: ubuntu-latest
13+
runs-on: ubuntu-22.04
1414
steps:
1515
- uses: styfle/[email protected]
1616
with:
1717
workflow_id: ${{ github.event.workflow.id }}
1818

1919
build:
2020
needs: cancel_previous
21-
runs-on: ubuntu-latest
21+
runs-on: ubuntu-22.04
2222

2323
steps:
2424
- uses: actions/checkout@v3

.github/workflows/release.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ on:
99
jobs:
1010
release:
1111
permissions: write-all
12-
runs-on: ubuntu-latest
12+
runs-on: ubuntu-22.04
1313
environment: deployment
1414

1515
steps:

Analytics-CSharp/Segment/Analytics/Configuration.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public Configuration(string writeKey,
8585
IStorageProvider storageProvider = default,
8686
IHTTPClientProvider httpClientProvider = default,
8787
IList<IFlushPolicy> flushPolicies = default,
88-
EventPipelineProvider eventPipelineProvider = default)
88+
IEventPipelineProvider eventPipelineProvider = default)
8989
{
9090
WriteKey = writeKey;
9191
FlushAt = flushAt;

Analytics-CSharp/Segment/Analytics/Utilities/SyncEventPipeline.cs

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Collections.Generic;
22
using System.Threading;
3+
using System.Threading.Tasks;
34
using global::System;
45
using global::System.Linq;
56
using Segment.Analytics.Policies;
@@ -70,9 +71,12 @@ public SyncEventPipeline(
7071
public void Put(RawEvent @event) => _writeChannel.Send(@event);
7172

7273
public void Flush() {
73-
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1,1));
74-
_writeChannel.Send(flushEvent);
75-
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
74+
if (Running && !_uploadChannel.isCancelled)
75+
{
76+
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(0));
77+
_writeChannel.Send(flushEvent);
78+
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
79+
}
7680
}
7781

7882
public void Start()

Tests/Utilities/EventPipelineTest.cs

+141
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Segment.Serialization;
1111
using Tests.Utils;
1212
using Xunit;
13+
using System.Linq;
1314

1415
namespace Tests.Utilities
1516
{
@@ -205,5 +206,145 @@ public async Task TestFlushInterruptedWhenNoFileExist(IEventPipelineProvider pro
205206
_mockHttpClient.Verify(o => o.Upload(_bytes), Times.Exactly(0));
206207
_storage.Verify(o => o.RemoveFile(_file), Times.Exactly(0));
207208
}
209+
210+
[Theory]
211+
[MemberData(nameof(GetPipelineProvider))]
212+
public void TestConfigWithEventPipelineProviders(IEventPipelineProvider provider)
213+
{
214+
// Just validate that the provider is used in the configuration
215+
var config = new Configuration(
216+
writeKey: "123",
217+
autoAddSegmentDestination: false,
218+
useSynchronizeDispatcher: true,
219+
flushInterval: 0,
220+
flushAt: 2,
221+
httpClientProvider: new MockHttpClientProvider(_mockHttpClient),
222+
storageProvider: new MockStorageProvider(_storage),
223+
eventPipelineProvider: provider
224+
);
225+
var analytics = new Analytics(config);
226+
analytics.Track("test");
227+
}
228+
229+
[Fact]
230+
public void TestSyncEventPipelineProviderWaits()
231+
{
232+
const int iterations = 100;
233+
const int newAnalyticsEvery = 10;
234+
const int eventCount = 10;
235+
236+
int totalTracks = 0;
237+
int totalUploads = 0;
238+
239+
_mockHttpClient
240+
.Setup(client => client.Upload(It.IsAny<byte[]>()))
241+
.Callback<byte[]>(bytes =>
242+
{
243+
string content = System.Text.Encoding.UTF8.GetString(bytes);
244+
int count = content.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1;
245+
totalUploads += count;
246+
})
247+
.ReturnsAsync(true);
248+
249+
var config = new Configuration(
250+
writeKey: "123",
251+
useSynchronizeDispatcher: true,
252+
flushInterval: 100000,
253+
flushAt: eventCount * 2,
254+
httpClientProvider: new MockHttpClientProvider(_mockHttpClient),
255+
storageProvider: new InMemoryStorageProvider(),
256+
eventPipelineProvider: new SyncEventPipelineProvider()
257+
);
258+
259+
var analytics = new Analytics(config);
260+
for (int j = 0; j < iterations; j++)
261+
{
262+
if (j % newAnalyticsEvery == 0)
263+
{
264+
analytics = new Analytics(config);
265+
}
266+
_mockHttpClient.Invocations.Clear();
267+
for (int i = 0; i < eventCount; i++)
268+
{
269+
analytics.Track($"test {i}");
270+
totalTracks++;
271+
}
272+
analytics.Flush();
273+
274+
#pragma warning disable CS4014 // Silly compiler, this isn't an invocation so it doesn't need to be awaited
275+
_mockHttpClient.Verify(client => client.Upload(It.IsAny<byte[]>()), Times.AtLeastOnce, $"Iteration {j} of {eventCount}");
276+
#pragma warning restore CS4014
277+
IInvocation lastUploadInvocation = _mockHttpClient.Invocations.Last(invocation => invocation.Method.Name == "Upload");
278+
int testsUploaded = System.Text.Encoding.UTF8
279+
.GetString((byte[])lastUploadInvocation.Arguments[0])
280+
.Split(new string[] { "test" }, StringSplitOptions.None).Length - 1;
281+
Assert.Equal(eventCount, testsUploaded);
282+
}
283+
Assert.Equal(totalTracks, totalUploads);
284+
}
285+
286+
[Fact]
287+
public void TestRepeatedFlushesDontHang()
288+
{
289+
var config = new Configuration(
290+
writeKey: "123",
291+
useSynchronizeDispatcher: true,
292+
flushInterval: 0,
293+
flushAt: 1,
294+
httpClientProvider: new MockHttpClientProvider(_mockHttpClient),
295+
storageProvider: new MockStorageProvider(_storage),
296+
eventPipelineProvider: new SyncEventPipelineProvider(5000)
297+
);
298+
var analytics = new Analytics(config);
299+
analytics.Track("test");
300+
DateTime startTime = DateTime.Now;
301+
analytics.Flush();
302+
analytics.Flush();
303+
analytics.Flush();
304+
analytics.Flush();
305+
analytics.Flush();
306+
Assert.True(DateTime.Now - startTime < TimeSpan.FromMilliseconds(100));
307+
}
308+
309+
[Fact]
310+
public void TestConfigWithCustomEventPipelineProvider()
311+
{
312+
// Just validate that the provider is used in the configuration
313+
var config = new Configuration(
314+
writeKey: "123",
315+
useSynchronizeDispatcher: true,
316+
flushInterval: 0,
317+
flushAt: 1,
318+
httpClientProvider: new MockHttpClientProvider(_mockHttpClient),
319+
storageProvider: new MockStorageProvider(_storage),
320+
eventPipelineProvider: new CustomEventPipelineProvider()
321+
);
322+
Assert.Throws<NotImplementedException>(() => {
323+
var analytics = new Analytics(config);
324+
analytics.Track("test");
325+
analytics.Flush();
326+
});
327+
}
328+
329+
330+
public class CustomEventPipelineProvider : IEventPipelineProvider
331+
{
332+
public CustomEventPipelineProvider() {}
333+
public IEventPipeline Create(Analytics analytics, string key)
334+
{
335+
return new CustomEventPipeline(analytics, key);
336+
}
337+
338+
private class CustomEventPipeline : IEventPipeline
339+
{
340+
public CustomEventPipeline(Analytics analytics, string key) {}
341+
public bool Running => throw new NotImplementedException();
342+
public string ApiHost { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
343+
public void Flush() => throw new NotImplementedException();
344+
public void Put(RawEvent @event) => throw new NotImplementedException();
345+
public void Start() => throw new NotImplementedException();
346+
public void Stop() => throw new NotImplementedException();
347+
}
348+
}
208349
}
209350
}

0 commit comments

Comments
 (0)