Skip to content

Commit 690f8b4

Browse files
authored
Provider pattern for EventPipleline and new SyncEventPipeline for synchronous flush (#101)
* Provider pattern for EventPipleline and new SyncEventPipeline for synchronous flush * Modifying EventPipelineTest to test multiple types
1 parent 9963e25 commit 690f8b4

File tree

9 files changed

+337
-58
lines changed

9 files changed

+337
-58
lines changed

Analytics-CSharp/Segment/Analytics/Configuration.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ private set
4545

4646
public IList<IFlushPolicy> FlushPolicies { get; }
4747

48+
public IEventPipelineProvider EventPipelineProvider { get; }
49+
4850
/// <summary>
4951
/// Configuration that analytics can use
5052
/// </summary>
@@ -82,7 +84,8 @@ public Configuration(string writeKey,
8284
IAnalyticsErrorHandler analyticsErrorHandler = null,
8385
IStorageProvider storageProvider = default,
8486
IHTTPClientProvider httpClientProvider = default,
85-
IList<IFlushPolicy> flushPolicies = default)
87+
IList<IFlushPolicy> flushPolicies = default,
88+
EventPipelineProvider eventPipelineProvider = default)
8689
{
8790
WriteKey = writeKey;
8891
FlushAt = flushAt;
@@ -98,6 +101,7 @@ public Configuration(string writeKey,
98101
FlushPolicies = flushPolicies == null ? new ConcurrentList<IFlushPolicy>() : new ConcurrentList<IFlushPolicy>(flushPolicies);
99102
FlushPolicies.Add(new CountFlushPolicy(flushAt));
100103
FlushPolicies.Add(new FrequencyFlushPolicy(flushInterval * 1000L));
104+
EventPipelineProvider = eventPipelineProvider ?? new EventPipelineProvider();
101105
}
102106

103107
public Configuration(string writeKey,

Analytics-CSharp/Segment/Analytics/Plugins/SegmentDestination.cs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace Segment.Analytics.Plugins
1515
/// </summary>
1616
public class SegmentDestination : DestinationPlugin, ISubscriber
1717
{
18-
private EventPipeline _pipeline = null;
18+
private IEventPipeline _pipeline = null;
1919

2020
public override string Key => "Segment.io";
2121

@@ -64,13 +64,7 @@ public override void Configure(Analytics analytics)
6464
// Add DestinationMetadata enrichment plugin
6565
Add(new DestinationMetadataPlugin());
6666

67-
_pipeline = new EventPipeline(
68-
analytics,
69-
Key,
70-
analytics.Configuration.WriteKey,
71-
analytics.Configuration.FlushPolicies,
72-
analytics.Configuration.ApiHost
73-
);
67+
_pipeline = analytics.Configuration.EventPipelineProvider.Create(analytics, Key);
7468

7569
analytics.AnalyticsScope.Launch(analytics.AnalyticsDispatcher, async () =>
7670
{

Analytics-CSharp/Segment/Analytics/Utilities/EventPipeline.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace Segment.Analytics.Utilities
99
{
10-
internal class EventPipeline
10+
public class EventPipeline: IEventPipeline
1111
{
1212
private readonly Analytics _analytics;
1313

@@ -23,7 +23,7 @@ internal class EventPipeline
2323

2424
private readonly IStorage _storage;
2525

26-
internal string ApiHost { get; set; }
26+
public string ApiHost { get; set; }
2727

2828
public bool Running { get; private set; }
2929

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace Segment.Analytics.Utilities
2+
{
3+
public class EventPipelineProvider:IEventPipelineProvider
4+
{
5+
public EventPipelineProvider()
6+
{
7+
}
8+
9+
public IEventPipeline Create(Analytics analytics, string key)
10+
{
11+
return new EventPipeline(analytics, key,
12+
analytics.Configuration.WriteKey,
13+
analytics.Configuration.FlushPolicies,
14+
analytics.Configuration.ApiHost);
15+
}
16+
}
17+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace Segment.Analytics.Utilities
2+
{
3+
public interface IEventPipeline
4+
{
5+
bool Running { get; }
6+
string ApiHost { get; set; }
7+
8+
void Put(RawEvent @event);
9+
void Flush();
10+
void Start();
11+
void Stop();
12+
}
13+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using System.Collections.Generic;
2+
3+
namespace Segment.Analytics.Utilities
4+
{
5+
public interface IEventPipelineProvider
6+
{
7+
IEventPipeline Create(Analytics analytics, string key);
8+
}
9+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using global::System;
4+
using global::System.Linq;
5+
using Segment.Analytics.Policies;
6+
using Segment.Concurrent;
7+
using Segment.Serialization;
8+
9+
namespace Segment.Analytics.Utilities
10+
{
11+
internal sealed class FlushEvent : RawEvent
12+
{
13+
public override string Type => "flush";
14+
public readonly SemaphoreSlim _semaphore;
15+
16+
internal FlushEvent(SemaphoreSlim semaphore)
17+
{
18+
_semaphore = semaphore;
19+
}
20+
}
21+
22+
23+
public class SyncEventPipeline: IEventPipeline
24+
{
25+
private readonly Analytics _analytics;
26+
27+
private readonly string _logTag;
28+
29+
private readonly IList<IFlushPolicy> _flushPolicies;
30+
31+
private Channel<RawEvent> _writeChannel;
32+
33+
private Channel<FlushEvent> _uploadChannel;
34+
35+
private readonly HTTPClient _httpClient;
36+
37+
private readonly IStorage _storage;
38+
39+
public string ApiHost { get; set; }
40+
41+
public bool Running { get; private set; }
42+
43+
internal int _flushTimeout = -1;
44+
internal CancellationToken _flushCancellationToken = CancellationToken.None;
45+
46+
public SyncEventPipeline(
47+
Analytics analytics,
48+
string logTag,
49+
string apiKey,
50+
IList<IFlushPolicy> flushPolicies,
51+
string apiHost = HTTPClient.DefaultAPIHost,
52+
int flushTimeout = -1,
53+
CancellationToken? flushCancellationToken = null)
54+
{
55+
_analytics = analytics;
56+
_logTag = logTag;
57+
_flushPolicies = flushPolicies;
58+
ApiHost = apiHost;
59+
60+
_writeChannel = new Channel<RawEvent>();
61+
_uploadChannel = new Channel<FlushEvent>();
62+
_httpClient = analytics.Configuration.HttpClientProvider.CreateHTTPClient(apiKey, apiHost: apiHost);
63+
_httpClient.AnalyticsRef = analytics;
64+
_storage = analytics.Storage;
65+
Running = false;
66+
_flushTimeout = flushTimeout;
67+
_flushCancellationToken = flushCancellationToken ?? CancellationToken.None;
68+
}
69+
70+
public void Put(RawEvent @event) => _writeChannel.Send(@event);
71+
72+
public void Flush() {
73+
FlushEvent flushEvent = new FlushEvent(new SemaphoreSlim(1,1));
74+
_writeChannel.Send(flushEvent);
75+
flushEvent._semaphore.Wait(_flushTimeout, _flushCancellationToken);
76+
}
77+
78+
public void Start()
79+
{
80+
if (Running) return;
81+
82+
// avoid to re-establish a channel if the pipeline just gets created
83+
if (_writeChannel.isCancelled)
84+
{
85+
_writeChannel = new Channel<RawEvent>();
86+
_uploadChannel = new Channel<FlushEvent>();
87+
}
88+
89+
Running = true;
90+
Schedule();
91+
Write();
92+
Upload();
93+
}
94+
95+
public void Stop()
96+
{
97+
if (!Running) return;
98+
Running = false;
99+
100+
_uploadChannel.Cancel();
101+
_writeChannel.Cancel();
102+
Unschedule();
103+
}
104+
105+
private void Write() => _analytics.AnalyticsScope.Launch(_analytics.FileIODispatcher, async () =>
106+
{
107+
while (!_writeChannel.isCancelled)
108+
{
109+
RawEvent e = await _writeChannel.Receive();
110+
bool isPoison = e is FlushEvent;
111+
112+
if (!isPoison)
113+
{
114+
try
115+
{
116+
string str = JsonUtility.ToJson(e);
117+
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " running " + str);
118+
await _storage.Write(StorageConstants.Events, str);
119+
120+
foreach (IFlushPolicy flushPolicy in _flushPolicies)
121+
{
122+
flushPolicy.UpdateState(e);
123+
}
124+
}
125+
catch (Exception exception)
126+
{
127+
Analytics.Logger.Log(LogLevel.Error, exception, _logTag + ": Error writing events to storage.");
128+
}
129+
}
130+
131+
if (isPoison || _flushPolicies.Any(o => o.ShouldFlush()))
132+
{
133+
FlushEvent flushEvent = e as FlushEvent ?? new FlushEvent(null);
134+
_uploadChannel.Send(flushEvent);
135+
foreach (IFlushPolicy flushPolicy in _flushPolicies)
136+
{
137+
flushPolicy.Reset();
138+
}
139+
}
140+
}
141+
});
142+
143+
private void Upload() => _analytics.AnalyticsScope.Launch(_analytics.NetworkIODispatcher, async () =>
144+
{
145+
while (!_uploadChannel.isCancelled)
146+
{
147+
FlushEvent flushEvent = await _uploadChannel.Receive();
148+
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " performing flush");
149+
150+
await Scope.WithContext(_analytics.FileIODispatcher, async () => await _storage.Rollover());
151+
152+
string[] fileUrlList = _storage.Read(StorageConstants.Events).Split(',');
153+
foreach (string url in fileUrlList)
154+
{
155+
if (string.IsNullOrEmpty(url))
156+
{
157+
continue;
158+
}
159+
160+
byte[] data = _storage.ReadAsBytes(url);
161+
if (data == null)
162+
{
163+
continue;
164+
}
165+
166+
bool shouldCleanup = true;
167+
try
168+
{
169+
shouldCleanup = await _httpClient.Upload(data);
170+
Analytics.Logger.Log(LogLevel.Debug, message: _logTag + " uploaded " + url);
171+
}
172+
catch (Exception e)
173+
{
174+
Analytics.Logger.Log(LogLevel.Error, e, _logTag + ": Error uploading to url");
175+
}
176+
177+
if (shouldCleanup)
178+
{
179+
_storage.RemoveFile(url);
180+
}
181+
}
182+
flushEvent._semaphore?.Release();
183+
}
184+
});
185+
186+
private void Schedule()
187+
{
188+
foreach (IFlushPolicy flushPolicy in _flushPolicies)
189+
{
190+
flushPolicy.Schedule(_analytics);
191+
}
192+
}
193+
194+
private void Unschedule()
195+
{
196+
foreach (IFlushPolicy flushPolicy in _flushPolicies)
197+
{
198+
flushPolicy.Unschedule();
199+
}
200+
}
201+
}
202+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Threading;
2+
3+
namespace Segment.Analytics.Utilities
4+
{
5+
public class SyncEventPipelineProvider: IEventPipelineProvider
6+
{
7+
internal int _flushTimeout = -1;
8+
internal CancellationToken? _flushCancellationToken = null;
9+
10+
public SyncEventPipelineProvider(
11+
int flushTimeout = -1,
12+
CancellationToken? flushCancellationToken = null)
13+
{
14+
_flushTimeout = flushTimeout;
15+
_flushCancellationToken = flushCancellationToken;
16+
}
17+
18+
public IEventPipeline Create(Analytics analytics, string key)
19+
{
20+
return new SyncEventPipeline(analytics, key,
21+
analytics.Configuration.WriteKey,
22+
analytics.Configuration.FlushPolicies,
23+
analytics.Configuration.ApiHost,
24+
_flushTimeout,
25+
_flushCancellationToken);
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)