Skip to content

Commit bead6b3

Browse files
CSHARP-2343: Add documentation sample for change stream.
1 parent d24d706 commit bead6b3

File tree

2 files changed

+64
-2
lines changed

2 files changed

+64
-2
lines changed

src/MongoDB.Driver.Core/Core/Misc/Feature.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public class Feature
2424
{
2525
#region static
2626
private static readonly Feature __aggregate = new Feature("Aggregate", new SemanticVersion(2, 2, 0));
27+
private static readonly Feature __aggregateAddFields = new Feature("AggregateAddFields", new SemanticVersion(3, 4, 0));
2728
private static readonly Feature __aggregateAllowDiskUse = new Feature("AggregateAllowDiskUse", new SemanticVersion(2, 6, 0));
2829
private static readonly Feature __aggregateBucketStage = new Feature("AggregateBucketStage", new SemanticVersion(3, 3, 11));
2930
private static readonly Feature __aggregateComment = new Feature("AggregateComment", new SemanticVersion(3, 6, 0, "rc0"));
@@ -75,6 +76,11 @@ public class Feature
7576
/// </summary>
7677
public static Feature Aggregate => __aggregate;
7778

79+
/// <summary>
80+
/// Gets the aggregate AddFields feature.
81+
/// </summary>
82+
public static Feature AggregateAddFields => __aggregateAddFields;
83+
7884
/// <summary>
7985
/// Gets the aggregate allow disk use feature.
8086
/// </summary>

tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
* limitations under the License.
1414
*/
1515

16-
using System;
17-
using System.Threading;
1816
using FluentAssertions;
1917
using MongoDB.Bson;
18+
using MongoDB.Driver.Core.Misc;
19+
using MongoDB.Driver.Core.TestHelpers.XunitExtensions;
2020
using MongoDB.Driver.Tests;
21+
using System;
22+
using System.Threading;
23+
using System.Threading.Tasks;
2124
using Xunit;
2225

2326
namespace MongoDB.Driver.Examples
@@ -122,5 +125,58 @@ public void ChangeStreamExample3()
122125
next.FullDocument.Should().Be(documents[1]);
123126
}
124127
}
128+
129+
[Fact]
130+
public void ChangestreamExample4()
131+
{
132+
RequireServer.Check().Supports(Feature.AggregateAddFields);
133+
134+
var client = DriverTestConfiguration.Client;
135+
var database = client.GetDatabase("ChangeStreamExamples");
136+
database.DropCollection("inventory");
137+
138+
var cancelationTokenSource = new CancellationTokenSource();
139+
try
140+
{
141+
var document = new BsonDocument("username", "alice");
142+
143+
Task.Run(() =>
144+
{
145+
var inventoryCollection = database.GetCollection<BsonDocument>("inventory");
146+
147+
while (!cancelationTokenSource.IsCancellationRequested)
148+
{
149+
Thread.Sleep(TimeSpan.FromMilliseconds(100));
150+
document["_id"] = ObjectId.GenerateNewId();
151+
inventoryCollection.InsertOne(document);
152+
}
153+
}, cancelationTokenSource.Token);
154+
155+
// Start Changestream Example 4
156+
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
157+
.Match(change =>
158+
change.FullDocument["username"] == "alice" ||
159+
change.OperationType == ChangeStreamOperationType.Delete)
160+
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
161+
"{ $addFields : { newField : 'this is an added field!' } }");
162+
163+
var collection = database.GetCollection<BsonDocument>("inventory");
164+
using (var changeStream = collection.Watch(pipeline))
165+
{
166+
using (var enumerator = changeStream.ToEnumerable().GetEnumerator())
167+
{
168+
if (enumerator.MoveNext())
169+
{
170+
var next = enumerator.Current;
171+
}
172+
}
173+
}
174+
// End Changestream Example 4
175+
}
176+
finally
177+
{
178+
cancelationTokenSource.Cancel();
179+
}
180+
}
125181
}
126182
}

0 commit comments

Comments
 (0)