|
7 | 7 | #define DTBUF_CHUNK_PAYLOAD_SIZE 4000
|
8 | 8 |
|
9 | 9 | struct header {
|
10 |
| - time_ms timestamp; |
11 |
| - chunk_length data_length; |
| 10 | + time_ms timestamp; |
| 11 | + chunk_length data_length; |
12 | 12 | };
|
13 | 13 |
|
14 | 14 | #define DTBUF_CHUNK_SIZE (sizeof (struct header) + DTBUF_CHUNK_PAYLOAD_SIZE)
|
15 | 15 |
|
16 |
| -int dtbuf_init(struct dtbuf *dtbuf, size_t capacity) { |
17 |
| - // to avoid splitting a chunk on the circular buffer boundaries, add |
18 |
| - // (DTBUF_CHUNK_SIZE-1) bytes at the end: a chunk starting at (capacity-1) |
19 |
| - // will still fit |
20 |
| - dtbuf->real_capacity = capacity + DTBUF_CHUNK_SIZE - 1; |
21 |
| - if (!(dtbuf->data = malloc(dtbuf->real_capacity))) { |
22 |
| - return 1; |
23 |
| - } |
24 |
| - dtbuf->capacity = capacity; |
25 |
| - dtbuf->head = 0; |
26 |
| - dtbuf->tail = 0; |
27 |
| - return 0; |
| 16 | +int dtbuf_init(struct dtbuf *dtbuf, size_t capacity) |
| 17 | +{ |
| 18 | + // to avoid splitting a chunk on the circular buffer boundaries, add |
| 19 | + // (DTBUF_CHUNK_SIZE-1) bytes at the end: a chunk starting at (capacity-1) |
| 20 | + // will still fit |
| 21 | + dtbuf->real_capacity = capacity + DTBUF_CHUNK_SIZE - 1; |
| 22 | + if (!(dtbuf->data = malloc(dtbuf->real_capacity))) { |
| 23 | + return 1; |
| 24 | + } |
| 25 | + dtbuf->capacity = capacity; |
| 26 | + dtbuf->head = 0; |
| 27 | + dtbuf->tail = 0; |
| 28 | + return 0; |
28 | 29 | }
|
29 | 30 |
|
30 |
| -void dtbuf_free(struct dtbuf *dtbuf) { |
31 |
| - free(dtbuf->data); |
| 31 | +void dtbuf_free(struct dtbuf *dtbuf) |
| 32 | +{ |
| 33 | + free(dtbuf->data); |
32 | 34 | }
|
33 | 35 |
|
34 |
| -int dtbuf_is_empty(struct dtbuf *dtbuf) { |
35 |
| - return dtbuf->head == dtbuf->tail; |
| 36 | +int dtbuf_is_empty(struct dtbuf *dtbuf) |
| 37 | +{ |
| 38 | + return dtbuf->head == dtbuf->tail; |
36 | 39 | }
|
37 | 40 |
|
38 |
| -int dtbuf_is_full(struct dtbuf *dtbuf) { |
39 |
| - // When dtbuf->head >= dtbuf->capacity, it "cycles" (reset to 0) if and |
40 |
| - // only if there is enough space at the start for a full chunk. |
41 |
| - // Thus, if dtbuf->head has not cycled while it is after capacity, then the |
42 |
| - // buffer is full. |
43 |
| - // Else, if head >= tail, there is always enough space (by design). |
44 |
| - // Else (if head < tail), there is enough space only if dtbuf->tail is far |
45 |
| - // enough (ie we can put a full chunk at the start). |
46 |
| - return dtbuf->head >= dtbuf->capacity || (dtbuf->head < dtbuf->tail |
47 |
| - && dtbuf->tail - dtbuf->head <= |
48 |
| - DTBUF_CHUNK_SIZE); |
| 41 | +int dtbuf_is_full(struct dtbuf *dtbuf) |
| 42 | +{ |
| 43 | + // When dtbuf->head >= dtbuf->capacity, it "cycles" (reset to 0) if and |
| 44 | + // only if there is enough space at the start for a full chunk. |
| 45 | + // Thus, if dtbuf->head has not cycled while it is after capacity, then the |
| 46 | + // buffer is full. |
| 47 | + // Else, if head >= tail, there is always enough space (by design). |
| 48 | + // Else (if head < tail), there is enough space only if dtbuf->tail is far |
| 49 | + // enough (ie we can put a full chunk at the start). |
| 50 | + return dtbuf->head >= dtbuf->capacity || (dtbuf->head < dtbuf->tail |
| 51 | + && dtbuf->tail - dtbuf->head <= |
| 52 | + DTBUF_CHUNK_SIZE); |
49 | 53 | }
|
50 | 54 |
|
51 |
| -time_ms dtbuf_next_timestamp(struct dtbuf *dtbuf) { |
52 |
| - struct header *header = (struct header *) &dtbuf->data[dtbuf->tail]; |
53 |
| - return header->timestamp; |
| 55 | +time_ms dtbuf_next_timestamp(struct dtbuf *dtbuf) |
| 56 | +{ |
| 57 | + struct header *header = (struct header *) &dtbuf->data[dtbuf->tail]; |
| 58 | + return header->timestamp; |
54 | 59 | }
|
55 | 60 |
|
56 |
| -ssize_t dtbuf_write_chunk(struct dtbuf *dtbuf, int fd_in, time_ms timestamp) { |
57 |
| - ssize_t r; |
58 |
| - struct header header; |
59 |
| - // directly write to dtbuf, at the right index |
60 |
| - int payload_index = dtbuf->head + sizeof(struct header); |
61 |
| - if ((r = |
62 |
| - read(fd_in, &dtbuf->data[payload_index], |
63 |
| - DTBUF_CHUNK_PAYLOAD_SIZE)) > 0) { |
64 |
| - // write headers |
65 |
| - header.timestamp = timestamp; |
66 |
| - header.data_length = (chunk_length) r; |
67 |
| - memcpy(&dtbuf->data[dtbuf->head], &header, sizeof(header)); |
68 |
| - dtbuf->head = payload_index + r; |
69 |
| - if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
70 |
| - // not enough space at the end of the buffer, cycle if there is enough |
71 |
| - // at the start |
72 |
| - dtbuf->head = 0; |
| 61 | +ssize_t dtbuf_write_chunk(struct dtbuf *dtbuf, int fd_in, time_ms timestamp) |
| 62 | +{ |
| 63 | + ssize_t r; |
| 64 | + struct header header; |
| 65 | + // directly write to dtbuf, at the right index |
| 66 | + int payload_index = dtbuf->head + sizeof(struct header); |
| 67 | + if ((r = read(fd_in, &dtbuf->data[payload_index], |
| 68 | + DTBUF_CHUNK_PAYLOAD_SIZE)) > 0) { |
| 69 | + // write headers |
| 70 | + header.timestamp = timestamp; |
| 71 | + header.data_length = (chunk_length) r; |
| 72 | + memcpy(&dtbuf->data[dtbuf->head], &header, sizeof(header)); |
| 73 | + dtbuf->head = payload_index + r; |
| 74 | + if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
| 75 | + // not enough space at the end of the buffer, cycle if there is |
| 76 | + // enough at the start |
| 77 | + dtbuf->head = 0; |
| 78 | + } |
| 79 | + } else if (r == -1) { |
| 80 | + perror("read()"); |
73 | 81 | }
|
74 |
| - } else if (r == -1) { |
75 |
| - perror("read()"); |
76 |
| - } |
77 |
| - return r; |
| 82 | + return r; |
78 | 83 | }
|
79 | 84 |
|
80 |
| -ssize_t dtbuf_read_chunk(struct dtbuf * dtbuf, int fd_out) { |
81 |
| - ssize_t w; |
82 |
| - struct header *pheader = (struct header *) &dtbuf->data[dtbuf->tail]; |
83 |
| - struct header header; |
84 |
| - chunk_length length = pheader->data_length; |
85 |
| - // directly read from dtbuf, at the right index |
86 |
| - int payload_index = dtbuf->tail + sizeof(struct header); |
87 |
| - if ((w = write(fd_out, &dtbuf->data[payload_index], length)) > 0) { |
88 |
| - if (w == length) { |
89 |
| - // we succeed to write all the data |
90 |
| - dtbuf->tail = payload_index + w; |
91 |
| - if (dtbuf->tail >= dtbuf->capacity) { |
92 |
| - // the next chunk cannot be after capacity |
93 |
| - dtbuf->tail = 0; |
94 |
| - if (dtbuf->head >= dtbuf->capacity) { |
95 |
| - // can happen if capacity < DTBUF_CHUNK_SIZE |
96 |
| - dtbuf->head = 0; |
| 85 | +ssize_t dtbuf_read_chunk(struct dtbuf *dtbuf, int fd_out) |
| 86 | +{ |
| 87 | + ssize_t w; |
| 88 | + struct header *pheader = (struct header *) &dtbuf->data[dtbuf->tail]; |
| 89 | + struct header header; |
| 90 | + chunk_length length = pheader->data_length; |
| 91 | + // directly read from dtbuf, at the right index |
| 92 | + int payload_index = dtbuf->tail + sizeof(struct header); |
| 93 | + if ((w = write(fd_out, &dtbuf->data[payload_index], length)) > 0) { |
| 94 | + if (w == length) { |
| 95 | + // we succeed to write all the data |
| 96 | + dtbuf->tail = payload_index + w; |
| 97 | + if (dtbuf->tail >= dtbuf->capacity) { |
| 98 | + // the next chunk cannot be after capacity |
| 99 | + dtbuf->tail = 0; |
| 100 | + if (dtbuf->head >= dtbuf->capacity) { |
| 101 | + // can happen if capacity < DTBUF_CHUNK_SIZE |
| 102 | + dtbuf->head = 0; |
| 103 | + } |
| 104 | + } |
| 105 | + } else { |
| 106 | + dtbuf->tail += w; |
| 107 | + // set the timestamp for writing at the new tail position |
| 108 | + header.timestamp = pheader->timestamp; |
| 109 | + // set the remaining length |
| 110 | + header.data_length = length - w; |
| 111 | + memcpy(&dtbuf->data[dtbuf->tail], &header, sizeof(header)); |
97 | 112 | }
|
98 |
| - } |
99 |
| - } else { |
100 |
| - dtbuf->tail += w; |
101 |
| - // set the timestamp for writing at the new tail position |
102 |
| - header.timestamp = pheader->timestamp; |
103 |
| - // set the remaining length |
104 |
| - header.data_length = length - w; |
105 |
| - memcpy(&dtbuf->data[dtbuf->tail], &header, sizeof(header)); |
106 |
| - } |
107 |
| - if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
108 |
| - // there is enough space at the start now, head can cycle |
109 |
| - dtbuf->head = 0; |
| 113 | + if (dtbuf->head >= dtbuf->capacity && dtbuf->tail >= DTBUF_CHUNK_SIZE) { |
| 114 | + // there is enough space at the start now, head can cycle |
| 115 | + dtbuf->head = 0; |
| 116 | + } |
| 117 | + } else if (w == -1) { |
| 118 | + perror("write()"); |
110 | 119 | }
|
111 |
| - } else if (w == -1) { |
112 |
| - perror("write()"); |
113 |
| - } |
114 |
| - return w; |
| 120 | + return w; |
115 | 121 | }
|
0 commit comments