-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmerger.c
371 lines (289 loc) · 8.88 KB
/
merger.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include "merger.h"
#include <stdlib.h> /* testing only */
static mx_packet_t *_get_packet(mx_t *s, int station, uint32_t counter)
{
mx_packet_t *p;
/* Check for a valid station */
if(station < 0 || station >= _STATIONS) return(NULL);
if(s->station[station].sid[0] == '\0') return(NULL);
if(s->station[station].timestamp <= s->timestamp - _TIMEOUT_MS) return(NULL);
/* Fetch a pointer to the target packet */
p = &s->station[station].packet[counter & 0xFFFF];
/* Is it stale? */
if(p->station != station || p->counter != counter) return(NULL);
/* Return the pointer */
return(p);
}
static mx_packet_t *_next_pcr(mx_t *s, int station, uint32_t counter)
{
mx_station_t *st;
mx_packet_t *p;
/* Increments the counter until we find the first
* packet in the designated PID with a PCR timestamp.
* Returns a pointer to the packet on success. */
st = &s->station[station];
for(; counter != st->latest + 1; counter++)
{
/* Test for a valid packet */
p = &st->packet[counter & 0xFFFF];
/* Packet must exist */
if(p->station != station || p->counter != counter) continue;
/* Packet header must have been parsed */
if(p->error != TS_OK) continue;
/* Packet must be a part of the designated
* PID and have a timestamp */
if(p->header.pid != s->pcr_pid ||
p->header.pcr_flag == 0) continue;
/* Packet must be outside the guard period */
if(p->timestamp >= s->timestamp - _GUARD_MS) continue;
/* Found one */
return(p);
}
return(NULL);
}
static mx_packet_t *_next_segment(mx_t *s, int station, mx_packet_t **r)
{
mx_station_t *st;
mx_packet_t *left, *right;
/* Convenience pointer */
st = &s->station[station];
left = _get_packet(s, station, st->right);
if(st->left == st->right || left == NULL)
{
/* No segment is currently set. Search for the first usable packet */
left = _next_pcr(s, station, st->current);
if(left == NULL) return(NULL);
}
/* Scan for the right hand side of the new segment */
right = _next_pcr(s, station, left->counter + 1);
if(right == NULL) return(NULL);
/* A new segment was found. Update the station data */
st->left = left->counter;
st->right = right->counter;
/* Advance the current stream position */
st->current = right->counter + 1;
/* Set a pointer to the right hand packet */
if(r != NULL) *r = right;
/* Return a pointer to the left hand packet */
return(left);
}
static void _reset_station(mx_t *s, int id, char sid[10], uint32_t counter)
{
/* Zero the station memory */
memset(&s->station[id], 0, sizeof(mx_station_t));
/* Set the callsign */
memcpy(s->station[id].sid, sid, 10);
/* The memset 0 creates a false positive 'valid' packet for station 0 */
s->station[id].packet[0].counter = 1;
/* Set the stream positions */
s->station[id].current = counter;
s->station[id].latest = counter;
}
static int _lookup_station(mx_t *s, char sid[10])
{
int i;
/* Search for the station ID, return index if found or -1 */
for(i = 0; i < _STATIONS; i++)
{
if(strncmp(s->station[i].sid, sid, 10) == 0 &&
s->station[i].timestamp > s->timestamp - _TIMEOUT_MS)
{
/* Found a matching station */
return(i);
}
}
return(-1);
}
static int _new_station(mx_t *s, char sid[10])
{
int i;
/* Search for an empty slot */
for(i = 0; i < _STATIONS; i++)
{
if(s->station[i].timestamp <= s->timestamp - _TIMEOUT_MS)
{
/* Found a timed out station */
return(i);
}
if(s->station[i].sid[0] == '\0')
{
/* Found an empty station */
return(i);
}
}
/* No free slots for this station */
return(-1);
}
void mx_init(mx_t *s, uint16_t pcr_pid)
{
memset(s, 0, sizeof(mx_t));
s->pcr_pid = pcr_pid;
s->next_station = -1;
}
void mx_feed(mx_t *s, int64_t timestamp, uint8_t *data)
{
int i;
int32_t d;
uint32_t counter;
mx_packet_t *p;
/* Update the global timestamp */
s->timestamp = timestamp;
/* Packet ID (2 bytes) */
if(data[0x00] != 0xA1 || data[0x01] != 0x55)
{
/* Invalid header. Ignore this packet */
return;
}
/* Counter (4 bytes little-endian) */
counter = (uint32_t) data[0x05] << 24
| (uint32_t) data[0x04] << 16
| (uint32_t) data[0x03] << 8
| (uint32_t) data[0x02] << 0;
/* Lookup the station number */
i = _lookup_station(s, (char *) &data[0x06]);
if(i < 0)
{
/* This is a new station, try to register it */
i = _new_station(s, (char *) &data[0x06]);
/* No free station slots! */
if(i < 0)
{
printf("No free slots for new station %.10s\n", (char *) &data[0x06]);
return;
}
printf("New station %.10s got slot %d\n", (char *) &data[0x06], i);
/* Reset the station */
_reset_station(s, i, (char *) &data[0x06], counter);
}
else
{
/* Existing station, ensure this new packet
* has a counter within the expected bounds */
d = (int32_t) counter - (int32_t) s->station[i].current;
if(d < -0xFFFF || d > 0xFFFF)
{
/* The counter is too far out, assume station has restarted */
printf("Station %d counter reset\n", i);
_reset_station(s, i, (char *) &data[0x06], counter);
}
else if(d <= 0)
{
/* The current stream position has already moved past
* this packet, it's too late to process it */
printf("Dropping late packet for station %d\n", i);
return;
}
}
/* Get a pointer to where the packet should go */
p = &s->station[i].packet[counter & 0xFFFF];
/* Do we already have this packet? If so, ignore it */
if(p->station == i && p->counter == counter)
{
printf("Duplicate packet received from station %d\n", i);
return;
}
/* Insert the packet into memory */
memset(p, 0, sizeof(mx_packet_t));
p->station = i;
p->counter = counter;
p->timestamp = timestamp;
memcpy(p->raw, &data[0x10], TS_PACKET_SIZE);
p->error = ts_parse_header(&p->header, p->raw);
p->next_station = -1;
p->next_counter = 0;
//printf("%d: ", counter);
//if(p->error != TS_INVALID) ts_dump_header(&p->header);
//else printf("TS_INVALID\n");
/* Update the station data */
d = (int32_t) counter - (int32_t) s->station[i].latest;
if(d > 0)
{
s->station[i].latest = counter;
}
s->station[i].timestamp = timestamp;
}
int mx_update(mx_t *s, int64_t timestamp)
{
int i;
mx_packet_t *o, *l, *r, *p;
uint64_t pcr, best_pcr;
uint32_t counter;
int best_station;
/* Update the global timestamp */
s->timestamp = timestamp;
/* Fetch the timestamp of the last packet sent, or 0 */
o = _get_packet(s, s->next_station, s->next_counter);
pcr = (o != NULL ? o->header.pcr_base : 0);
best_station = -1;
best_pcr = 0;
/* For each station, process the segments until we find one that
* begins at or after the timestamp 'pcr' */
for(i = 0; i < _STATIONS; i++)
{
/* Skip inactive stations */
if(s->station[i].sid[0] == '\0') continue;
if(s->station[i].timestamp <= s->timestamp - _TIMEOUT_MS) continue;
while((p = _next_segment(s, i, &r)) != NULL)
{
/* Skip past segments with weird or invalid PCR timings */
/* TODO: This won't handle clock roll-over well */
if(p->header.pcr_base >= r->header.pcr_base) continue;
if(r->header.pcr_base - p->header.pcr_base > _SEGMENT_PCR_LIMIT) continue;
/* Stop when we are at or ahead of the last sent segment */
if(p->header.pcr_base >= pcr) break;
}
/* Didn't find a newer segment? */
if(p == NULL) continue;
/* Track which station is offering the "best" segment to use next */
if(best_station == -1 || p->header.pcr_base < best_pcr)
{
best_station = i;
best_pcr = p->header.pcr_base;
}
else if(p->header.pcr_base == best_pcr)
{
/* TODO: Use segment and station score to decide
* if we should switch to this station next */
}
}
if(best_station == -1) return(0);
/* Link the packets inside the segment */
l = NULL;
for(counter = s->station[best_station].left; counter != s->station[best_station].right + 1; counter++)
{
p = _get_packet(s, best_station, counter);
if(p == NULL) continue;
/* Link the last packet to this one */
if(l != NULL)
{
l->next_station = p->station;
l->next_counter = p->counter;
}
l = p;
}
/* Link the previous segment to this one */
if(o != NULL)
{
p = _get_packet(s, best_station, s->station[best_station].left);
o->next_station = p->station;
o->next_counter = (pcr == best_pcr ? p->next_counter : p->counter);
}
/* Update pointer for new stations */
s->next_station = best_station;
s->next_counter = s->station[s->next_station].right;
//printf("s->next_station = %d\n", s->next_station);
//printf("s->next_counter = %d\n", s->next_counter);
return(1);
}
mx_packet_t *mx_next(mx_t *s, int last_station, uint32_t last_counter)
{
mx_packet_t *p = _get_packet(s, last_station, last_counter);
/* Fetch a pointer to the next packet in the chain, or if no
* previous valid packet use the latest segment in s */
if(p == NULL) p = _get_packet(s, s->next_station, s->next_counter);
else p = _get_packet(s, p->next_station, p->next_counter);
return(p);
}