Skip to content

Commit 58496b0

Browse files
committed
Fix Partitioned SessionWindow bug, where restoring from checkpoint triggers an exception.
Cause of the issue: PartitionedSessionWindowPipe keeps several dictionaries as state. One of these dictionaries does not have a [DataMember] attribute because its value type is a LinkedList, which does not support serialization. On checkpoint and subsequent restore, this dictionary is re-created from the other data members in the UpdatePointers callback. During that reconstruction, the case of an empty LinkedList value was missed, so the entry was not restored. When the next data event arrives for that partition, the dictionary is indexed by the partition key, resulting in a KeyNotFoundException. Regression: No, this bug has existed for a long time. The customer hit it now because of a workaround suggested (for another bug) to use the 'timestamp by .. over' clause. Repro steps: Added as a test case. Fix: Added code to restore the key with an empty LinkedList, and verified that this was the exact state just before the checkpoint. Added a test case that triggers this bug; with the fix, the test case passes.
1 parent dc69f56 commit 58496b0

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

Sources/Core/Microsoft.StreamProcessing/Operators/SessionWindow/PartitionedSessionWindowPipe.cs

+12-1
Original file line numberDiff line numberDiff line change
@@ -249,15 +249,26 @@ public override int CurrentlyBufferedInputCount
249249

250250
protected override void UpdatePointers()
251251
{
252+
// This method restores the member 'this.orderedKeysDictionary'
253+
// The dictionary is not serializable because of the LinkedList value type,
254+
// and hence it does not have [DataMember] attribute
252255
int iter = FastDictionary<TKey, long>.IteratorStart;
253256
var temp = new List<Tuple<TKey, long, TPartitionKey>>();
254257
while (this.lastDataTimeDictionary.Iterate(ref iter))
255258
{
259+
var partitionKey = this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key);
260+
256261
if (this.stateDictionary.entries[iter].value.Any())
257262
{
258263
temp.Add(Tuple.Create(
259264
this.lastDataTimeDictionary.entries[iter].key,
260-
Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value), this.getPartitionKey(this.lastDataTimeDictionary.entries[iter].key)));
265+
Math.Min(this.lastDataTimeDictionary.entries[iter].value + this.sessionTimeout, this.windowEndTimeDictionary.entries[iter].value),
266+
partitionKey));
267+
}
268+
else if (!this.orderedKeysDictionary.ContainsKey(partitionKey))
269+
{
270+
// We still need to restore the empty list - as that was the case just before checkpoint
271+
this.orderedKeysDictionary.Add(partitionKey, new LinkedList<TKey>());
261272
}
262273
}
263274
foreach (var item in temp.OrderBy(o => o.Item2))
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// *********************************************************************
2+
// Copyright (c) Microsoft Corporation. All rights reserved.
3+
// Licensed under the MIT License
4+
// *********************************************************************
5+
using System;
6+
using System.Collections.Generic;
7+
using System.IO;
8+
using System.Linq;
9+
using System.Reactive.Linq;
10+
using System.Reactive.Subjects;
11+
using Microsoft.StreamProcessing;
12+
using Microsoft.VisualStudio.TestTools.UnitTesting;
13+
14+
namespace SimpleTesting
{
    /* This test case verifies the fix for a bug where PartitionedSessionWindowPipe is not restored
     * properly from a checkpoint, causing an exception.
     *
     * Cause of the bug:
     *   PartitionedSessionWindowPipe keeps several dictionaries as state.
     *   One of these dictionaries does not have a [DataMember] attribute, because its value type is a
     *   LinkedList, which does not support serialization.
     *   On checkpoint and subsequent restore, this dictionary is re-created from the other data members
     *   in the UpdatePointers callback.
     *   During that reconstruction, the case of an empty LinkedList value was missed and not restored.
     *   When the next data event arrives for that partition, the dictionary is indexed by the partition
     *   key, resulting in a KeyNotFoundException.
     */
    [TestClass]
    public class PartitionedStreamCheckpointTests : TestWithConfigSettingsWithoutMemoryLeakDetection
    {
        /// <summary>
        /// Feeds a partitioned stream through a session-timeout window, checkpoints and restores the
        /// query immediately before the second data point, and checks that the combined output of the
        /// original and the restored query equals the expected event sequence.
        /// </summary>
        [TestMethod, TestCategory("Gated")]
        public void CheckpointPartitionedSessionWindow()
        {
            // Config.DataBatchSize is assigned through a static member, so the change outlives this
            // test unless it is undone. Remember the previous value and restore it in 'finally' so
            // that other tests in the same run are not affected.
            int previousBatchSize = Config.DataBatchSize;
            Config.DataBatchSize = 1;

            try
            {
                var data = new PartitionedStreamEvent<int, double>[]
                {
                    PartitionedStreamEvent.CreatePoint(0, 5, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 8),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 11),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 14),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 17),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 21),
                    PartitionedStreamEvent.CreatePoint(0, 24, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 100),
                };

                // Each input point is expected to produce a start edge at its own time and an end
                // edge 4 ticks later (5 -> 9 and 24 -> 28); punctuations pass through unchanged.
                var expected = new PartitionedStreamEvent<int, double>[]
                {
                    PartitionedStreamEvent.CreateStart(0, 5, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 8),
                    PartitionedStreamEvent.CreateEnd(0, 9, 5, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 11),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 14),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 17),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 21),
                    PartitionedStreamEvent.CreateStart(0, 24, 1.0),
                    PartitionedStreamEvent.CreateEnd(0, 28, 24, 1.0),
                    PartitionedStreamEvent.CreatePunctuation<int, double>(0, 100),
                };

                // Index into 'data' at which the checkpoint/restore must happen to trigger the bug:
                // immediately before the second data point (time 24) is fed in.
                const int checkpointIndex = 6;

                var subject = new Subject<PartitionedStreamEvent<int, double>>();
                var output = new List<PartitionedStreamEvent<int, double>>();
                var process = CreateQueryContainerForPartitionedStream(subject, output);

                for (int i = 0; i < data.Length; i++)
                {
                    if (i == checkpointIndex)
                    {
                        using (var ms = new MemoryStream())
                        {
                            process.Checkpoint(ms);
                            ms.Seek(0, SeekOrigin.Begin);

                            // Rebuild the query from the checkpoint with a fresh input subject. The
                            // same 'output' list is reused so that it accumulates the events emitted
                            // both before and after the restore.
                            subject = new Subject<PartitionedStreamEvent<int, double>>();
                            process = CreateQueryContainerForPartitionedStream(subject, output, ms);
                        }
                    }

                    subject.OnNext(data[i]);
                }

                Assert.IsTrue(
                    expected.SequenceEqual(output),
                    "Output after checkpoint/restore does not match the expected event sequence. Actual output: "
                        + string.Join(", ", output));
            }
            finally
            {
                Config.DataBatchSize = previousBatchSize;
            }
        }

        /// <summary>
        /// Builds the query under test: registers <paramref name="subject"/> as input, applies
        /// SessionTimeoutWindow(4, 5) followed by Sum, and appends every output event to
        /// <paramref name="output"/>.
        /// </summary>
        /// <param name="subject">Source of the partitioned input events.</param>
        /// <param name="output">List that receives every event emitted by the query.</param>
        /// <param name="stream">
        /// Checkpoint stream passed to QueryContainer.Restore. When omitted, null is passed;
        /// presumably this starts the query with no prior state (confirm against QueryContainer.Restore).
        /// </param>
        /// <returns>The process returned by QueryContainer.Restore.</returns>
        private static Process CreateQueryContainerForPartitionedStream(
            Subject<PartitionedStreamEvent<int, double>> subject,
            List<PartitionedStreamEvent<int, double>> output,
            Stream stream = null)
        {
            var qc = new QueryContainer();
            var input = qc.RegisterInput(subject);
            var streamableOutput = input.SessionTimeoutWindow(4, 5).Sum(o => o);

            // The value returned by ForEachAsync is not needed by the test; the call is made only to
            // attach the subscription that copies output events into the list.
            qc.RegisterOutput(streamableOutput).ForEachAsync(o => output.Add(o));

            return qc.Restore(stream);
        }
    }
}

0 commit comments

Comments
 (0)