Skip to content

Commit 13f8550

Browse files
committed
Report errors
1 parent 91cd811 commit 13f8550

File tree

3 files changed

+123
-17
lines changed

3 files changed

+123
-17
lines changed

crates/core/src/sync/streaming_sync.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -628,16 +628,16 @@ impl StreamingSyncIteration {
628628
let mut desc = String::new();
629629
let _ = write!(
630630
&mut desc,
631-
"Subscription to stream {}",
631+
"Subscription to stream {} ",
632632
local.local.stream_name
633633
);
634634
if let Some(params) = &local.local.local_params {
635635
let _ = write!(&mut desc, "(with parameters {params})");
636636
} else {
637-
desc.push_str(" (without parameters)");
637+
desc.push_str("(without parameters)");
638638
}
639639

640-
let _ = write!(&mut desc, "could not be resolved: {}", error.message);
640+
let _ = write!(&mut desc, " could not be resolved: {}", error.message);
641641
event.instructions.push(Instruction::LogLine {
642642
severity: LogSeverity::WARNING,
643643
line: Cow::Owned(desc),

dart/test/sync_stream_test.dart

Lines changed: 114 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ void main() {
7373
buckets: [
7474
bucketDescription('a',
7575
subscriptions: [
76-
{'def': 'my_default_stream'}
76+
{'default': 0}
7777
],
7878
priority: 1),
7979
],
80-
streams: [('my_default_stream', true)],
80+
streams: [stream('my_default_stream', true)],
8181
),
8282
),
8383
);
@@ -122,7 +122,7 @@ void main() {
122122
syncTest('are deleted', (_) {
123123
control('start', null);
124124

125-
for (final stream in ['s1', 's2']) {
125+
for (final name in ['s1', 's2']) {
126126
control(
127127
'line_text',
128128
json.encode(
@@ -131,11 +131,11 @@ void main() {
131131
buckets: [
132132
bucketDescription('a',
133133
subscriptions: [
134-
{'def': stream}
134+
{'default': 0}
135135
],
136136
priority: 1),
137137
],
138-
streams: [(stream, true)],
138+
streams: [stream(name, true)],
139139
),
140140
),
141141
);
@@ -164,11 +164,11 @@ void main() {
164164
buckets: [
165165
bucketDescription('a',
166166
subscriptions: [
167-
{'def': 'a'}
167+
{'default': 0}
168168
],
169169
priority: 1),
170170
],
171-
streams: [('a', true)],
171+
streams: [stream('a', true)],
172172
),
173173
),
174174
);
@@ -210,6 +210,41 @@ void main() {
210210
expect(stored, containsPair('is_default', 0));
211211
expect(stored, containsPair('ttl', isNotNull));
212212
});
213+
214+
syncTest('reports errors', (_) {
215+
control('start', null);
216+
final response = control(
217+
'line_text',
218+
json.encode(
219+
checkpoint(
220+
lastOpId: 1,
221+
buckets: [
222+
bucketDescription('a',
223+
subscriptions: [
224+
{'default': 0}
225+
],
226+
priority: 1),
227+
],
228+
streams: [
229+
stream('a', true, errors: [
230+
{'message': 'error message', 'subscription': 'default'}
231+
])
232+
],
233+
),
234+
),
235+
);
236+
237+
expect(
238+
response,
239+
contains(
240+
containsPair(
241+
'LogLine',
242+
containsPair(
243+
'line', 'Default subscription a has errors: error message'),
244+
),
245+
),
246+
);
247+
});
213248
});
214249

215250
group('explicit subscriptions', () {
@@ -300,7 +335,6 @@ void main() {
300335
'stream': 'my_stream',
301336
'parameters': null,
302337
'override_priority': null,
303-
'client_id': '1',
304338
}
305339
],
306340
},
@@ -348,7 +382,7 @@ void main() {
348382
checkpoint(
349383
lastOpId: 1,
350384
buckets: [],
351-
streams: [('a', true)],
385+
streams: [stream('a', true)],
352386
),
353387
),
354388
);
@@ -386,5 +420,76 @@ void main() {
386420
),
387421
);
388422
});
423+
424+
syncTest('reports errors', (_) {
425+
control(
426+
'subscriptions',
427+
json.encode({
428+
'subscribe': {'stream': 'a', 'params': 'invalid'}
429+
}),
430+
);
431+
control(
432+
'subscriptions',
433+
json.encode({
434+
'subscribe': {'stream': 'a', 'params': 'valid'}
435+
}),
436+
);
437+
438+
final start = control('start', null);
439+
expect(
440+
start,
441+
contains(
442+
containsPair(
443+
'EstablishSyncStream',
444+
containsPair(
445+
'request',
446+
containsPair(
447+
'streams',
448+
{
449+
'include_defaults': true,
450+
'subscriptions': [
451+
{
452+
'stream': 'a',
453+
'parameters': 'invalid',
454+
'override_priority': null
455+
},
456+
{
457+
'stream': 'a',
458+
'parameters': 'valid',
459+
'override_priority': null
460+
}
461+
]
462+
},
463+
),
464+
),
465+
),
466+
),
467+
);
468+
final response = control(
469+
'line_text',
470+
json.encode(
471+
checkpoint(
472+
lastOpId: 1,
473+
buckets: [],
474+
streams: [
475+
stream('a', true, errors: [
476+
{'message': 'error message', 'subscription': 0}
477+
])
478+
],
479+
),
480+
),
481+
);
482+
483+
expect(
484+
response,
485+
contains(
486+
containsPair(
487+
'LogLine',
488+
containsPair('line',
489+
'Subscription to stream a (with parameters "invalid") could not be resolved: error message'),
490+
),
491+
),
492+
);
493+
});
389494
});
390495
}

dart/test/utils/test_utils.dart

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,22 @@ Object checkpoint({
66
required int lastOpId,
77
List<Object> buckets = const [],
88
String? writeCheckpoint,
9-
List<(String, bool)> streams = const [],
9+
List<Object> streams = const [],
1010
}) {
1111
return {
1212
'checkpoint': {
1313
'last_op_id': '$lastOpId',
1414
'write_checkpoint': null,
1515
'buckets': buckets,
16-
'streams': [
17-
for (final (name, isDefault) in streams)
18-
{'name': name, 'is_default': isDefault},
19-
],
16+
'streams': streams,
2017
}
2118
};
2219
}
2320

21+
Object stream(String name, bool isDefault, {List<Object> errors = const []}) {
22+
return {'name': name, 'is_default': isDefault, 'errors': errors};
23+
}
24+
2425
/// Creates a `checkpoint_complete` or `partial_checkpoint_complete` line.
2526
Object checkpointComplete({int? priority, String lastOpId = '1'}) {
2627
return {

0 commit comments

Comments
 (0)