Skip to content

Fix issue where user-supplied enrichments were lost during the startu… #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Analytics-CSharp/Segment/Analytics/Analytics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public void Process(RawEvent incomingEvent, Func<RawEvent, RawEvent> enrichment
{
if (!Enable) return;

incomingEvent.ApplyRawEventData(_userInfo);
incomingEvent.ApplyRawEventData(_userInfo, enrichment);
AnalyticsScope.Launch(AnalyticsDispatcher, () =>
{
Timeline.Process(incomingEvent, enrichment);
Timeline.Process(incomingEvent);
});
}

Expand Down
2 changes: 1 addition & 1 deletion Analytics-CSharp/Segment/Analytics/Plugins/StartupQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private void ReplayEvents()
{
if (_queuedEvents.TryDequeue(out RawEvent e))
{
Analytics.Process(e);
Analytics.Process(e, e.Enrichment);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions Analytics-CSharp/Segment/Analytics/Timeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public class Timeline
/// <param name="incomingEvent">event to be processed</param>
/// <param name="enrichment">a closure that enables enrichment on the generated event</param>
/// <returns>event after processing</returns>
internal RawEvent Process(RawEvent incomingEvent, Func<RawEvent, RawEvent> enrichment = default)
internal RawEvent Process(RawEvent incomingEvent)
{
// Apply before and enrichment types first to start the timeline processing.
RawEvent beforeResult = ApplyPlugins(PluginType.Before, incomingEvent);
// Enrichment is like middleware, a chance to update the event across the board before going to destinations.
RawEvent enrichmentResult = ApplyPlugins(PluginType.Enrichment, beforeResult);
if (enrichment != null)
if (enrichmentResult != null && enrichmentResult.Enrichment != null)
{
enrichmentResult = enrichment(enrichmentResult);
enrichmentResult = enrichmentResult.Enrichment(enrichmentResult);
}

// Make sure not to update the events during this next cycle. Since each destination may want different
Expand Down
5 changes: 4 additions & 1 deletion Analytics-CSharp/Segment/Analytics/Types.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public abstract class RawEvent
public virtual string UserId { get; set; }
public virtual string Timestamp { get; set; }

public Func<RawEvent, RawEvent> Enrichment { get; set; }

// JSON types
public JsonObject Context { get; set; }
public JsonObject Integrations { get; set; }
Expand All @@ -36,8 +38,9 @@ internal void ApplyRawEventData(RawEvent rawEvent)
Integrations = rawEvent.Integrations;
}

internal void ApplyRawEventData(UserInfo userInfo)
internal void ApplyRawEventData(UserInfo userInfo, Func<RawEvent, RawEvent> enrichment)
{
Enrichment = enrichment;
MessageId = Guid.NewGuid().ToString();
Context = new JsonObject();
Timestamp = DateTime.UtcNow.ToString("o"); // iso8601
Expand Down
259 changes: 258 additions & 1 deletion Tests/EventsTest.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Threading;
using Moq;
using Segment.Analytics;
using Segment.Analytics.Utilities;
Expand Down Expand Up @@ -690,4 +691,260 @@ public void TestAliasEnrichment()
Assert.Equal("test", actual[0].AnonymousId);
}
}

public class DelayedEventsTest
{
private readonly Analytics _analytics;

private Settings? _settings;

private readonly Mock<StubEventPlugin> _plugin;

private readonly Mock<StubAfterEventPlugin> _afterPlugin;

private readonly SemaphoreSlim _httpSemaphore;
private readonly SemaphoreSlim _assertSemaphore;
private readonly List<RawEvent> _actual;

public DelayedEventsTest()
{
_httpSemaphore = new SemaphoreSlim(0);
_assertSemaphore = new SemaphoreSlim(0);
_settings = JsonUtility.FromJson<Settings?>(
"{\"integrations\":{\"Segment.io\":{\"apiKey\":\"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ\"}},\"plan\":{},\"edgeFunction\":{}}");

var mockHttpClient = new Mock<HTTPClient>(null, null, null);
mockHttpClient
.Setup(httpClient => httpClient.Settings())
.Returns(async () =>
{
// suspend http calls until we tracked events
// this will force events get into startup queue
await _httpSemaphore.WaitAsync();
return _settings;
});

_plugin = new Mock<StubEventPlugin>
{
CallBase = true
};

_afterPlugin = new Mock<StubAfterEventPlugin> { CallBase = true };
_actual = new List<RawEvent>();
_afterPlugin.Setup(o => o.Execute(Capture.In(_actual)))
.Returns((RawEvent e) =>
{
// since this is an after plugin, when its execute function is called,
// it is guaranteed that the enrichment closure has been called.
// so we can release the semaphore on assertions.
_assertSemaphore.Release();
return e;
});

var config = new Configuration(
writeKey: "123",
storageProvider: new DefaultStorageProvider("tests"),
autoAddSegmentDestination: false,
useSynchronizeDispatcher: false, // we need async analytics to buildup events on start queue
httpClientProvider: new MockHttpClientProvider(mockHttpClient)
);
_analytics = new Analytics(config);
}

[Fact]
public void TestTrackEnrichment()
{
string expectedEvent = "foo";
string expectedAnonymousId = "bar";

_analytics.Add(_afterPlugin.Object);
_analytics.Track(expectedEvent, enrichment: @event =>
{
@event.AnonymousId = expectedAnonymousId;
return @event;
});

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
_httpSemaphore.Release();
// now we need to wait for events being fully replayed before making assertions
_assertSemaphore.Wait();

Assert.NotEmpty(_actual);
Assert.IsType<TrackEvent>(_actual[0]);
var actual = _actual[0] as TrackEvent;
Debug.Assert(actual != null, nameof(actual) + " != null");
Assert.True(actual.Properties.Count == 0);
Assert.Equal(expectedEvent, actual.Event);
Assert.Equal(expectedAnonymousId, actual.AnonymousId);
}

[Fact]
public void TestIdentifyEnrichment()
{
var expected = new JsonObject
{
["foo"] = "bar"
};
string expectedUserId = "newUserId";

_analytics.Add(_afterPlugin.Object);
_analytics.Identify(expectedUserId, expected, @event =>
{
if (@event is IdentifyEvent identifyEvent)
{
identifyEvent.Traits["foo"] = "baz";
}

return @event;
});

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
_httpSemaphore.Release();
// now we need to wait for events being fully replayed before making assertions
_assertSemaphore.Wait();

string actualUserId = _analytics.UserId();

Assert.NotEmpty(_actual);
var actual = _actual[0] as IdentifyEvent;
Debug.Assert(actual != null, nameof(actual) + " != null");
Assert.Equal(expected, actual.Traits);
Assert.Equal(expectedUserId, actualUserId);
}

[Fact]
public void TestScreenEnrichment()
{
var expected = new JsonObject
{
["foo"] = "bar"
};
string expectedTitle = "foo";
string expectedCategory = "bar";

_analytics.Add(_afterPlugin.Object);
_analytics.Screen(expectedTitle, expected, expectedCategory, @event =>
{
if (@event is ScreenEvent screenEvent)
{
screenEvent.Properties["foo"] = "baz";
}

return @event;
});

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
_httpSemaphore.Release();
// now we need to wait for events being fully replayed before making assertions
_assertSemaphore.Wait();

Assert.NotEmpty(_actual);
var actual = _actual[0] as ScreenEvent;
Debug.Assert(actual != null, nameof(actual) + " != null");
Assert.Equal(expected, actual.Properties);
Assert.Equal(expectedTitle, actual.Name);
Assert.Equal(expectedCategory, actual.Category);
}

[Fact]
public void TestPageEnrichment()
{
var expected = new JsonObject
{
["foo"] = "bar"
};
string expectedTitle = "foo";
string expectedCategory = "bar";

_analytics.Add(_afterPlugin.Object);
_analytics.Page(expectedTitle, expected, expectedCategory, @event =>
{
if (@event is PageEvent pageEvent)
{
pageEvent.Properties["foo"] = "baz";
}

return @event;
});

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
_httpSemaphore.Release();
// now we need to wait for events being fully replayed before making assertions
_assertSemaphore.Wait();

Assert.NotEmpty(_actual);
var actual = _actual[0] as PageEvent;
Debug.Assert(actual != null, nameof(actual) + " != null");
Assert.Equal(expected, actual.Properties);
Assert.Equal(expectedTitle, actual.Name);
Assert.Equal(expectedCategory, actual.Category);
Assert.Equal("page", actual.Type);
}

[Fact]
public void TestGroupEnrichment()
{
var expected = new JsonObject
{
["foo"] = "bar"
};
string expectedGroupId = "foo";

_analytics.Add(_afterPlugin.Object);
_analytics.Group(expectedGroupId, expected, @event =>
{
if (@event is GroupEvent groupEvent)
{
groupEvent.Traits["foo"] = "baz";
}

return @event;
});

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
_httpSemaphore.Release();
// now we need to wait for events being fully replayed before making assertions
_assertSemaphore.Wait();

Assert.NotEmpty(_actual);
var actual = _actual[0] as GroupEvent;
Debug.Assert(actual != null, nameof(actual) + " != null");
Assert.Equal(expected, actual.Traits);
Assert.Equal(expectedGroupId, actual.GroupId);
}

[Fact]
public void TestAliasEnrichment()
{
string expected = "bar";

_analytics.Add(_afterPlugin.Object);
_analytics.Alias(expected, @event =>
{
if (@event is AliasEvent aliasEvent)
{
aliasEvent.AnonymousId = "test";
}

return @event;
});

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
_httpSemaphore.Release();
// now we need to wait for events being fully replayed before making assertions
_assertSemaphore.Wait();

Assert.NotEmpty(_actual);
var actual = _actual.Find(o => o is AliasEvent) as AliasEvent;
Debug.Assert(actual != null, nameof(actual) + " != null");
Assert.Equal(expected, actual.UserId);
Assert.Equal("test", actual.AnonymousId);
}
}
}
Loading