Skip to content

Synchronous Outlet for zero-copying socket writing #170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: dev
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required (VERSION 3.12)
project (liblsl
VERSION 1.16.1
VERSION 1.16.2
LANGUAGES C CXX
DESCRIPTION "Labstreaminglayer C/C++ library"
HOMEPAGE_URL "https://github.com/sccn/liblsl"
@@ -345,6 +345,12 @@ if(NOT WIN32 AND LSL_TOOLS)
target_link_libraries(blackhole PRIVATE Threads::Threads)
target_include_directories(blackhole PRIVATE "thirdparty/asio/")
installLSLApp(blackhole)
add_executable(spike testing/spike.cpp)
target_link_libraries(spike PRIVATE lsl)
installLSLApp(spike)
add_executable(flood testing/flood.c)
target_link_libraries(flood PRIVATE lsl)
installLSLApp(flood)
endif()

set(LSL_INSTALL_ROOT ${CMAKE_CURRENT_BINARY_DIR})
4 changes: 4 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -47,3 +47,7 @@ addlslexample(TestSyncWithoutData cpp)

target_link_libraries(TestSyncWithoutData PRIVATE Threads::Threads)

if(NOT WIN32)
addlslexample(SendDataCBlocking c)
target_link_libraries(SendDataCBlocking PRIVATE Threads::Threads)
endif()
3 changes: 2 additions & 1 deletion examples/ReceiveDataInChunks.cpp
Original file line number Diff line number Diff line change
@@ -21,8 +21,9 @@ int main(int argc, char **argv) {
double max_buffered = argc > 2 ? std::stod(argv[2]) : 360.;
bool flush = argc > 3;
// resolve the stream of interest & make an inlet
int32_t buf_samples = (int32_t)(max_buffered * 1000);
lsl::stream_info inlet_info = lsl::resolve_stream("name", name).at(0);
lsl::stream_inlet inlet(inlet_info, (int32_t)max_buffered);
lsl::stream_inlet inlet(inlet_info, buf_samples, transp_bufsize_thousandths);

// Use set_postprocessing to get the timestamps in a common base clock.
// Do not use if this application will record timestamps to disk -- it is better to
204 changes: 115 additions & 89 deletions examples/SendData.cpp
Original file line number Diff line number Diff line change
@@ -1,89 +1,115 @@
#include "lsl_cpp.h"
#include <array>
#include <chrono>
#include <iostream>
#include <stdlib.h>
#include <thread>

/**
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
* of the stream information object.
*
* Note that the timer used in the send loop of this program is not particularly accurate.
*/


const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
std::string name, type;
if (argc < 3) {
std::cout
<< "This opens a stream under some user-defined name and with a user-defined content "
"type."
<< std::endl;
std::cout << "SendData Name Type [n_channels=8] [srate=100] [max_buffered=360]"
<< std::endl;
std::cout
<< "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
"the quotes)):"
<< std::endl;
std::cin >> name >> type;
} else {
name = argv[1];
type = argv[2];
}
int n_channels = argc > 3 ? std::stol(argv[3]) : 8;
n_channels = n_channels < 8 ? 8 : n_channels;
int samplingrate = argc > 4 ? std::stol(argv[4]) : 100;
int max_buffered = argc > 5 ? std::stol(argv[5]) : 360;

try {

// make a new stream_info (100 Hz)
lsl::stream_info info(
name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type);

// add some description fields
info.desc().append_child_value("manufacturer", "LSL");
lsl::xml_element chns = info.desc().append_child("channels");
for (int k = 0; k < n_channels; k++)
chns.append_child("channel")
.append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k + 1))
.append_child_value("unit", "microvolts")
.append_child_value("type", type);

// make a new outlet
lsl::stream_outlet outlet(info, 0, max_buffered);
std::vector<float> sample(n_channels, 0.0);

// Your device might have its own timer. Or you can decide how often to poll
// your device, as we do here.
int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100);
auto t_start = std::chrono::high_resolution_clock::now();
auto next_sample_time = t_start;

// send data forever
std::cout << "Now sending data... " << std::endl;
double starttime = ((double)clock()) / CLOCKS_PER_SEC;
for (unsigned t = 0;; t++) {
// Create random data for the first 8 channels.
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);
// For the remaining channels, fill them with a sample counter (wraps at 1M).
std::fill(sample.begin() + 8, sample.end(), (float)(t % 1000000));

// Wait until the next expected sample time.
next_sample_time += std::chrono::microseconds(sample_dur_us);
std::this_thread::sleep_until(next_sample_time);

// send the sample
std::cout << sample[0] << "\t" << sample[n_channels-1] << std::endl;
outlet.push_sample(sample);
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
std::cin.get();
return 0;
}
#include "lsl_cpp.h"
#include <array>
#include <iostream>
#include <stdlib.h>
#include <thread>
#include <time.h>

/**
* This example program offers an 8-channel stream, float-formatted, that resembles EEG data.
* The example demonstrates also how per-channel meta-data can be specified using the .desc() field
* of the stream information object.
*
* Note that the timer used in the send loop of this program is not particularly accurate.
*/


const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
std::string name, type;
if (argc < 3) {
std::cout
<< "This opens a stream under some user-defined name and with a user-defined content "
"type."
<< std::endl;
std::cout << "SendData Name Type n_channels[8] srate[100] max_buffered[360] sync[false] "
"contig[true]"
<< std::endl;
std::cout
<< "Please enter the stream name and the stream type (e.g. \"BioSemi EEG\" (without "
"the quotes)):"
<< std::endl;
std::cin >> name >> type;
} else {
name = argv[1];
type = argv[2];
}
int n_channels = argc > 3 ? std::stol(argv[3]) : 8;
n_channels = n_channels < 8 ? 8 : n_channels;
int samplingrate = argc > 4 ? std::stol(argv[4]) : 100;
int max_buffered = argc > 5 ? std::stol(argv[5]) : 360;
bool sync = argc > 6 ? std::stol(argv[6]) > 0 : false;
bool contig = argc > 7 ? std::stol(argv[7]) > 0 : true;

try {
// if (!sync && !contig) {
// throw std::invalid_argument( "async is incompatible with discontig
//push_numeric_bufs (except for strings, not used here)." );
// }

// make a new stream_info (100 Hz)
lsl::stream_info info(
name, type, n_channels, samplingrate, lsl::cf_float32, std::string(name) += type);

// add some description fields
info.desc().append_child_value("manufacturer", "LSL");
lsl::xml_element chns = info.desc().append_child("channels");
for (int k = 0; k < n_channels; k++)
chns.append_child("channel")
.append_child_value("label", k < 8 ? channels[k] : "Chan-" + std::to_string(k + 1))
.append_child_value("unit", "microvolts")
.append_child_value("type", type);

// make a new outlet
lsl::stream_outlet outlet(
info, 0, max_buffered, sync ? transp_sync_blocking : transp_default);

// Initialize 2 discontiguous data arrays.
std::vector<float> sample(8, 0.0);
std::vector<float> extra(n_channels - 8, 0.0);
// If this is contiguous mode (default) then we combine the arrays.
if (contig) sample.insert(sample.end(), extra.begin(), extra.end());

// bytes is used in !contig mode because we need to know how big each buffer is.
std::array<uint32_t, 2> bytes = {
8 * sizeof(float), static_cast<uint32_t>((n_channels - 8) * sizeof(float))};

// Your device might have its own timer. Or you can decide how often to poll
// your device, as we do here.
int32_t sample_dur_us = 1000000 / (samplingrate > 0 ? samplingrate : 100);
auto t_start = std::chrono::high_resolution_clock::now();
auto next_sample_time = t_start;

// send data forever
std::cout << "Now sending data... " << std::endl;
for (unsigned t = 0;; t++) {
// Create random data for the first 8 channels.
for (int c = 0; c < 8; c++) sample[c] = (float)((rand() % 1500) / 500.0 - 1.5);
// For the remaining channels, fill them with a sample counter (wraps at 1M).
if (contig)
std::fill(sample.begin() + 8, sample.end(), (float)(t % 1000000));
else
std::fill(extra.begin(), extra.end(), (float)(t % 1000000));

// Wait until the next expected sample time.
next_sample_time += std::chrono::microseconds(sample_dur_us);
std::this_thread::sleep_until(next_sample_time);

// send the sample
if (contig) {
std::cout << sample[0] << "\t" << sample[n_channels-1] << std::endl;
outlet.push_sample(sample);
} else {
// Advanced: Push set of discontiguous buffers.
std::array<float *, 2> bufs = {sample.data(), extra.data()};
outlet.push_numeric_bufs(
(void **)bufs.data(), bytes.data(), 2, lsl::local_clock(), true);
}
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
std::cin.get();
return 0;
}
239 changes: 239 additions & 0 deletions examples/SendDataCBlocking.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include <inttypes.h>
#include <lsl_c.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/**
* This example program pushes a 16-bit stream with a constant value across all channels. The
* channel number and rate are configurable via command line arguments. The data are not copied!
* This requires an inconvenient application design using threads and synchronization, but the
* this kind of design is necessary when frequent copying of large data chunks is too expensive.
* The main thread takes data from the device and puts it into a transmit buffer, and the secondary
* thread pushes the data from the transmit buffer into the lsl outlet then releases the data from
* the buffer. This is a relatively new and unusual design pattern for lsl. It uses a new BLOCKING
* function `push_chunk_wait`. Presently this only supports int16 types. This function passes
* the data pointer directly to asio to write data to each consumer and waits for asio to return.
* Note: One would usually prefer to use C++ niceities and queueing libraries (e.g.,
* moodycamel::concurrentqueue) when working with big data across threads. However, the high-
* throughput data demands might be come from embedded devices with compiler limitations,
* so we use C here. Any input on how to speed up this C code is greatly appreciated.
*/

#define DEVICE_BUFFER_SIZE 4194304

typedef struct {
char name[100];
char serial[100];
double last_timestamp;
int srate;
int nchans;
int write_idx;
int read_idx;
int min_frames_per_chunk;
int16_t channel_data[DEVICE_BUFFER_SIZE / 2]; // (channels * samples) < 2 million
} fake_device;

typedef struct {
int thread_status;
int chunk_size;
double buffer_dur;
int do_async;
} thread_params;

// Linked-list queue
typedef struct chunk_info {
int16_t *buf;
int n_frames;
double timestamp;
} chunk_info;
typedef struct node {
chunk_info info;
struct node *next;
} node;
typedef struct queue {
int count;
node *front;
node *rear;
} queue;
void initialize(queue *q) {
q->count = 0;
q->front = NULL;
q->rear = NULL;
}
int isempty(queue *q) { return (q->front == NULL); }
void enqueue(queue *q, int16_t *data, int frames, double ts) {
node *tmp;
tmp = malloc(sizeof(node));
tmp->info.buf = data;
tmp->info.n_frames = frames;
tmp->info.timestamp = ts;
tmp->next = NULL;
if (isempty(q))
q->front = q->rear = tmp;
else {
q->rear->next = tmp;
q->rear = tmp;
}
q->count++;
}
chunk_info dequeue(queue *q) {
chunk_info info = q->front->info;
node *tmp = q->front;
free(tmp);
q->front = q->front->next;
q->count--;
return (info);
}

// Globals
fake_device *device = 0;
sem_t sem;
queue *q;

// fetch_data -- Normally something provided by Device SDK
uint64_t fetch_data(int16_t **buffer) {
static int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]);

if (device->last_timestamp < 0) device->last_timestamp = lsl_local_clock();
double now = lsl_local_clock();
// Calculate how many frames/timestamps have elapsed since the last call.
uint64_t elapsed_frames = (uint64_t)((now - device->last_timestamp) * device->srate);
if (elapsed_frames < device->min_frames_per_chunk) elapsed_frames = 0;
// Cut this fetch short if it would go past the buffer. Next fetch will start at first idx.
if ((device->write_idx + elapsed_frames * device->nchans) > buf_samples)
elapsed_frames = (buf_samples - device->write_idx) / device->nchans;
// Further limit elapsed_samples to not overtake the read point (tail)
// if ((device->write_idx < device->read_idx) &&
// (device->write_idx + (elapsed_frames * device->nchans) >= device->read_idx))
// elapsed_frames = (device->read_idx - device->write_idx) / device->nchans;
if (elapsed_frames > 0) {
// New elapsed_time after accounting for rounding to integer frames.
device->last_timestamp += (double)(elapsed_frames) / device->srate;
// I assume that the device has its own acquisition buffer and that it copies data
// to a separate data buffer for API purposes.
// We are using a model where the device SDK shares its buffer with the client application.
// This is a bit unusual but allows for fastest throughput.
*buffer = &(device->channel_data[device->write_idx]);

// And we advance the head for the next data transfer.
device->write_idx = (device->write_idx + elapsed_frames * device->nchans) % buf_samples;
if ((buf_samples - device->write_idx) < device->nchans) device->write_idx = 0;
}
return elapsed_frames;
}

// transmit_thread -- responsible for popping data off the queue and pushing it to LSL
void transmit_thread(void *vargp) {
// Initialize thread-local variables
thread_params *params = (thread_params *)vargp;

/* declare a new streaminfo */
lsl_streaminfo info = lsl_create_streaminfo(
device->name, "TestLSL", device->nchans, device->srate, cft_int16, device->serial);

/* add some meta-data fields to it */
/* (for more standard fields, see https://github.com/sccn/xdf/wiki/Meta-Data) */
lsl_xml_ptr desc = lsl_get_desc(info);
lsl_append_child_value(desc, "manufacturer", "LSL");
lsl_xml_ptr chns = lsl_append_child(desc, "channels");
char chanlabel[20];
for (int c = 0; c < device->nchans; c++) {
lsl_xml_ptr chn = lsl_append_child(chns, "channel");
snprintf(chanlabel, 20, "Chan-%d", c);
lsl_append_child_value(chn, "label", chanlabel);
lsl_append_child_value(chn, "unit", "microvolts");
lsl_append_child_value(chn, "type", "EEG");
}

/* make a new outlet */
lsl_outlet outlet =
lsl_create_outlet_ex(info, params->chunk_size, params->buffer_dur, params->do_async ? transp_sync_blocking : 0);

printf("Now sending data...\n");
params->thread_status = 1;
int buf_samples = sizeof(device->channel_data) / sizeof(device->channel_data[0]);
while (params->thread_status) {
sem_wait(&sem);
if (!isempty(q)) {
chunk_info chunk = dequeue(q);
int64_t chunk_samples = chunk.n_frames * device->nchans;
lsl_push_chunk_stp(outlet, chunk.buf, chunk_samples, chunk.timestamp, 1);
device->read_idx = (device->read_idx + chunk_samples) % buf_samples;
}
}
lsl_destroy_outlet(outlet);
}

int main(int argc, char *argv[]) {
printf("SendDataCBlocking example program. Sends int16 data with minimal copies.\n");
printf("Usage: %s [streamname] [streamuid] [srate] [nchans] [buff_dur] [do_async]\n", argv[0]);
printf("Using lsl %d, lsl_library_info: %s\n", lsl_library_version(), lsl_library_info());
const char *name = argc > 1 ? argv[1] : "SendDataCBlocking";
const char *uid = argc > 2 ? argv[2] : "6s45ahas321";
int srate = argc > 3 ? strtol(argv[3], NULL, 10) : 512;
int n_chans = argc > 4 ? strtol(argv[4], NULL, 10) : 32;
double buff_dur = argc > 5 ? strtod(argv[5], NULL) : 60.;
int do_async = argc > 6 ? strtol(argv[6], NULL, 10) : 1;
int32_t samps_per_chunk = argc > 6 ? strtol(argv[6], NULL, 10) : 30;

// Initialize our fake device and set its parameters. This would normally be taken care of
// by the device SDK.
device = (fake_device *)malloc(sizeof(fake_device));
memset(device, 0, sizeof(fake_device));
strcpy(device->name, name);
device->srate = srate;
device->nchans = n_chans;
device->last_timestamp = -1.;
device->min_frames_per_chunk = device->srate / 1000;
strcpy(device->serial, uid);
// Give the device buffer data some non-zero value.
memset(device->channel_data, 23, sizeof(device->channel_data));
// write_idx and read_idx are OK at 0.

thread_params params;
params.buffer_dur = buff_dur;
params.chunk_size = samps_per_chunk;
params.thread_status = 0;
params.do_async = do_async;

// Initialize q
q = malloc(sizeof(queue));
initialize(q);

sem_init(&sem, 0, 0);
pthread_t thread_id;
if (pthread_create(&thread_id, NULL, (void *)&transmit_thread, &params)) {
fprintf(stderr, "Error creating LSL transmit thread.\n");
return 1;
}

int exit_condition = 0;
int16_t *shared_buff;
double last_timestamp_received = -1.;
uint64_t n_frames_received = 0;
while (1) {
if (exit_condition) break;

// Get data from device
n_frames_received = fetch_data(&shared_buff);
if (n_frames_received > 0) {
enqueue(q, shared_buff, n_frames_received, device->last_timestamp);
sem_post(&sem);
}
}

if (params.thread_status) // Kill thread
{
params.thread_status = 0;
if (pthread_join(thread_id, NULL)) {
fprintf(stderr, "Error terminating LSL transmit thread.\n");
}
}
sem_destroy(&sem);
free(device);

return 0;
}
41 changes: 33 additions & 8 deletions examples/SendDataInChunks.cpp
Original file line number Diff line number Diff line change
@@ -92,15 +92,25 @@ struct fake_device {

int main(int argc, char **argv) {
std::cout << "SendDataInChunks" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered chunk_rate" << std::endl;
std::cout << "SendDataInChunks StreamName StreamType samplerate n_channels max_buffered "
"chunk_rate nodata use_sync"
<< std::endl;
std::cout << "- max_buffered -- duration in sec (or x100 samples if samplerate is 0) to buffer for each outlet" << std::endl;
std::cout << "- chunk_rate -- number of chunks pushed per second. For this example, make it a common factor of samplingrate and 1000." << std::endl;

std::cout << "- nodata -- Set non-zero to cause the fake device to not copy pattern data into "
"the buffer."
<< std::endl;
std::cout << "- use_sync -- Set to non-zero to use blocking send." << std::endl;

std::string name{argc > 1 ? argv[1] : "MyAudioStream"}, type{argc > 2 ? argv[2] : "Audio"};
int samplingrate = argc > 3 ? std::stol(argv[3]) : 44100; // Here we specify srate, but typically this would come from the device.
int n_channels = argc > 4 ? std::stol(argv[4]) : 2; // Here we specify n_chans, but typically this would come from theh device.
double max_buffered = argc > 5 ? std::stod(argv[5]) : 360.;
int32_t chunk_rate = argc > 6 ? std::stol(argv[6]) : 10; // Chunks per second.
bool nodata = argc > 7;
bool do_sync = argc > 8 ? (bool)std::stol(argv[8]) : true;
bool b_contig = true && do_sync; // Set true to test gather-write operations.

int32_t chunk_samples = samplingrate > 0 ? std::max((samplingrate / chunk_rate), 1) : 100; // Samples per chunk.
int32_t chunk_duration = 1000 / chunk_rate; // Milliseconds per chunk

@@ -118,16 +128,20 @@ int main(int argc, char **argv) {
chn.append_child_value("type", type);
}
int32_t buf_samples = (int32_t)(max_buffered * samplingrate);
lsl::stream_outlet outlet(info, chunk_samples, buf_samples);
auto flags = static_cast<lsl_transport_options_t>(
(do_sync ? transp_sync_blocking : transp_default) | transp_bufsize_samples);
lsl::stream_outlet outlet(info, chunk_samples, buf_samples, flags);
info = outlet.info(); // Refresh info with whatever the outlet captured.
std::cout << "Stream UID: " << info.uid() << std::endl;

// Create a connection to our device.
fake_device my_device(n_channels, (float)samplingrate);
int dev_chans = b_contig ? n_channels : n_channels + 1;
fake_device my_device(dev_chans, (float)samplingrate);

// Prepare buffer to get data from 'device'.
// The buffer should be larger than you think you need. Here we make it 4x as large.
std::vector<int16_t> chunk_buffer(4 * chunk_samples * n_channels);
std::vector<int16_t> dev_buffer(4 * chunk_samples * dev_chans);
std::fill(dev_buffer.begin(), dev_buffer.end(), 0);

std::cout << "Now sending data..." << std::endl;

@@ -141,13 +155,24 @@ int main(int argc, char **argv) {
std::this_thread::sleep_until(next_chunk_time);

// Get data from device
std::size_t returned_samples = my_device.get_data(chunk_buffer);
std::size_t returned_samples = my_device.get_data(dev_buffer, nodata);

// send it to the outlet. push_chunk_multiplexed is one of the more complicated approaches.
// other push_chunk methods are easier but slightly slower.
double ts = lsl::local_clock();
outlet.push_chunk_multiplexed(
chunk_buffer.data(), returned_samples * n_channels, ts, true);
if (b_contig) {
// Push a chunk of a contiguous buffer.
outlet.push_chunk_multiplexed(
dev_buffer.data(), returned_samples * n_channels, ts, true);
} else {
std::cout << "Discontiguous push_chunk not yet supported." << std::endl;
std::cout << "See SendData.cpp for discontiguous push_sample, then set "
<< std::endl;
std::cout << "timestamps as LSL_DEDUCED_TIMESTAMP and pushtrough as false "
<< std::endl;
std::cout << "for all samples except the the first or last in a chunk."
<< std::endl;
}
}
} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
3 changes: 3 additions & 0 deletions include/lsl/common.h
Original file line number Diff line number Diff line change
@@ -161,6 +161,9 @@ typedef enum {
/// The supplied max_buf should be scaled by 0.001.
transp_bufsize_thousandths = 2,

/// The outlet will use synchronous (blocking) calls to asio to push data
transp_sync_blocking = 4,

// prevent compilers from assuming an instance fits in a single byte
_lsl_transport_options_maxval = 0x7f000000
} lsl_transport_options_t;
19 changes: 16 additions & 3 deletions include/lsl/outlet.h
Original file line number Diff line number Diff line change
@@ -99,9 +99,11 @@ extern LIBLSL_C_API int32_t lsl_push_sample_vtp(lsl_outlet out, const void *data
/** @copybrief lsl_push_sample_ftp
* @see lsl_push_sample_ftp
* @param out The lsl_outlet object through which to push the data.
* @param data A pointer to values to push. The number of values pointed to must be no less than the
* number of channels in the sample.
* @param lengths A pointer the number of elements to push for each channel (string lengths).
* @param data An array of data buffers to push. The number of buffers in the array must be no less
* than the number of channels in the sample. Each entry in data must be longer than the
* corresponding entry in `lengths`.
* @param lengths An array containing the lengths of each buffer in data. Units are string lengths
* or number of bytes.
*/
extern LIBLSL_C_API int32_t lsl_push_sample_buf(lsl_outlet out, const char **data, const uint32_t *lengths);
/** @copydoc lsl_push_sample_buf
@@ -111,6 +113,17 @@ extern LIBLSL_C_API int32_t lsl_push_sample_buft(lsl_outlet out, const char **da
* @param pushthrough @see lsl_push_sample_ftp */
extern LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data, const uint32_t *lengths, double timestamp, int32_t pushthrough);

/** @copydoc lsl_push_sample_buftp
* @param data An array of data buffers to push. The number of buffers in the array must be no less
* than `nbufs`.
* @param bytes An array comprising the number of bytes in each buffer. The number of entries in
* bytes must be no less than `nbufs`.
* @param nbufs The number of values pointed to in `data` and equivalently the number of items in
* `bytes`.
*/
extern LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data,
const uint32_t *bytes, double timestamp, int32_t pushthrough, uint32_t nbufs);

/** Push a chunk of multiplexed samples into the outlet. One timestamp per sample is provided.
*
* @attention Note that the provided buffer size is measured in channel values (e.g. floats) rather
15 changes: 15 additions & 0 deletions include/lsl_cpp.h
Original file line number Diff line number Diff line change
@@ -513,6 +513,21 @@ class stream_outlet {
lsl_push_sample_vtp(obj.get(), (sample), timestamp, pushthrough);
}

/**
* Push a pointer to an array of buffers of variable size as one sample into the outlet.
*
* @param bufs A pointer to an array of data buffers.
* @param bytes An array of sizes (number of bytes) of buffers in bufs.
* @param nbufs Total number of buffers.
* @param timestamp Optionally the capture time of the sample, in agreement with local_clock();
* @param pushthrough Whether to push the sample through to the receivers immediately instead of
* concatenating with subsequent samples.
*/
void push_numeric_bufs(void **bufs, uint32_t *bytes, uint32_t nbufs, double timestamp = 0.0,
bool pushthrough = true) {
lsl_push_sample_rawtpn(obj.get(), bufs, bytes, timestamp, pushthrough, nbufs);
}


// ===================================================
// === Pushing an chunk of samples into the outlet ===
12 changes: 6 additions & 6 deletions src/api_config.cpp
Original file line number Diff line number Diff line change
@@ -105,6 +105,12 @@ void api_config::load_from_file(const std::string &filename) {
} else
loguru::g_stderr_verbosity = log_level;

// log config filename only after setting the verbosity level
if (!filename.empty())
LOG_F(INFO, "Configuration loaded from %s", filename.c_str());
else
LOG_F(INFO, "Loaded default config");

// read out the [ports] parameters
multicast_port_ = pt.get("ports.MulticastPort", 16571);
base_port_ = pt.get("ports.BasePort", 16572);
@@ -263,12 +269,6 @@ void api_config::load_from_file(const std::string &filename) {
smoothing_halftime_ = pt.get("tuning.SmoothingHalftime", 90.0F);
force_default_timestamps_ = pt.get("tuning.ForceDefaultTimestamps", false);

// log config filename only after setting the verbosity level and all config has been read
if (!filename.empty())
LOG_F(INFO, "Configuration loaded from %s", filename.c_str());
else
LOG_F(INFO, "Loaded default config");

} catch (std::exception &e) {
LOG_F(ERROR, "Error parsing config file '%s': '%s', rolling back to defaults",
filename.c_str(), e.what());
70 changes: 65 additions & 5 deletions src/lsl_outlet_c.cpp
Original file line number Diff line number Diff line change
@@ -160,14 +160,74 @@ LIBLSL_C_API int32_t lsl_push_sample_buft(

LIBLSL_C_API int32_t lsl_push_sample_buftp(lsl_outlet out, const char **data,
const uint32_t *lengths, double timestamp, int32_t pushthrough) {
stream_outlet_impl *outimpl = out;
// As the number of bytes-per-buffer is the same as the number of chars-per-buffer,
// we can pass `lengths` through as `bytes`.
return lsl_push_sample_rawtpn(out, (void **)data, lengths, timestamp, pushthrough,
(uint32_t)outimpl->info().channel_count());
}

LIBLSL_C_API int32_t lsl_push_sample_rawtpn(lsl_outlet out, void **data, const uint32_t *bytes,
double timestamp, int32_t pushthrough, uint32_t nbufs) {
stream_outlet_impl *outimpl = out;
try {
stream_outlet_impl *outimpl = out;
std::vector<std::string> tmp;
for (uint32_t k = 0; k < (uint32_t)outimpl->info().channel_count(); k++)
tmp.emplace_back(data[k], lengths[k]);
return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough);
if (outimpl->is_sync_blocking()) {
// Convert input to a vector of asio buffers for a gather-write operation.
std::vector<asio::const_buffer> bufs;
bufs.reserve(nbufs);
for (auto buf_ix = 0; buf_ix < nbufs; buf_ix++) {
bufs.push_back(asio::buffer(data[buf_ix], bytes[buf_ix]));
}
return outimpl->push_sample_gather(bufs, timestamp, pushthrough);
} else {
// Make contiguous.
if (outimpl->info().channel_format() == cft_string) {
// For strings we place in std::string vector to make sure they are properly
// terminated.
std::vector<std::string> tmp;
for (uint32_t k = 0; k < nbufs; k++)
tmp.emplace_back((const char *)data[k], bytes[k]);
return outimpl->push_sample_noexcept(&tmp[0], timestamp, pushthrough);
} else {
// Otherwise we put into new memory block.
uint32_t total_bytes = 0, byte_offset = 0;
for (size_t k = 0; k < nbufs; k++) { total_bytes += bytes[k]; }
char *tmp = (char *)malloc(total_bytes);
for (size_t k = 0; k < nbufs; k++) {
memcpy(&tmp[byte_offset], data[k], bytes[k]);
byte_offset += bytes[k];
}
// TODO: I tried passing void buffer but eventually fail because the convert
// functions
// become ambiguous.
lsl_error_code_t ec;
switch (outimpl->info().channel_format()) {
case cft_int8:
ec = outimpl->push_sample_noexcept((const char *)tmp, timestamp, pushthrough);
case cft_int16:
ec =
outimpl->push_sample_noexcept((const int16_t *)tmp, timestamp, pushthrough);
case cft_int32:
ec =
outimpl->push_sample_noexcept((const int32_t *)tmp, timestamp, pushthrough);
case cft_int64:
ec =
outimpl->push_sample_noexcept((const int64_t *)tmp, timestamp, pushthrough);
case cft_float32:
ec = outimpl->push_sample_noexcept((const float *)tmp, timestamp, pushthrough);
case cft_double64:
ec = outimpl->push_sample_noexcept((const double *)tmp, timestamp, pushthrough);
case cft_undefined: ec = lsl_internal_error;
}
free(tmp);
return ec;
}
}
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error during push_sample: %s", e.what());
if (!outimpl->is_sync_blocking() && outimpl->info().channel_format() != cft_string)
LOG_F(ERROR, "lsl_push_sample_buftpn only compatible with string type or when outlet "
"is using sync writes.");
return lsl_internal_error;
}
}
2 changes: 2 additions & 0 deletions src/sample.h
Original file line number Diff line number Diff line change
@@ -54,6 +54,8 @@ class factory {
/// Reclaim a sample that's no longer used.
void reclaim_sample(sample *s);

std::size_t datasize() const { return format_sizes[fmt_] * static_cast<std::size_t>(num_chans_); }

private:
/// Pop a sample from the freelist (multi-producer/single-consumer queue by Dmitry Vjukov)
sample *pop_freelist();
77 changes: 65 additions & 12 deletions src/stream_outlet_impl.cpp
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
#include "tcp_server.h"
#include "udp_server.h"
#include <algorithm>
#include <asio/post.hpp>
#include <chrono>
#include <memory>

@@ -22,8 +23,16 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu
chunk_size_(info.calc_transport_buf_samples(requested_bufsize, flags)),
info_(std::make_shared<stream_info_impl>(info)),
send_buffer_(std::make_shared<send_buffer>(chunk_size_)),
io_ctx_data_(std::make_shared<asio::io_context>(1)),
do_sync_(flags & transp_sync_blocking), io_ctx_data_(std::make_shared<asio::io_context>(1)),
io_ctx_service_(std::make_shared<asio::io_context>(1)) {

if ((info.channel_format() == cft_string) && do_sync_)
throw std::invalid_argument("Synchronous push not supported for string-formatted streams.");

// reserver space for sync timestamps so `push_back` doesn't caused reallocations
// to invalidate pointers to elements
if(do_sync_) sync_timestamps_.reserve(chunk_size_);

ensure_lsl_initialized();
const api_config *cfg = api_config::get_instance();

@@ -41,7 +50,7 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu

// create TCP data server
tcp_server_ = std::make_shared<tcp_server>(info_, io_ctx_data_, send_buffer_, sample_factory_,
chunk_size_, cfg->allow_ipv4(), cfg->allow_ipv6());
chunk_size_, cfg->allow_ipv4(), cfg->allow_ipv6(), do_sync_);

// fail if both stacks failed to instantiate
if (udp_servers_.empty())
@@ -53,10 +62,8 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu
for (auto &responder : responders_) responder->begin_serving();

// and start the IO threads to handle them
const std::string name{"IO_" + this->info().name().substr(0, 11)};
for (const auto &io : {io_ctx_data_, io_ctx_service_})
io_threads_.emplace_back(std::make_shared<std::thread>([io, name]() {
loguru::set_thread_name(name.c_str());
io_threads_.emplace_back(std::make_shared<std::thread>([io]() {
while (true) {
try {
io->run();
@@ -66,6 +73,10 @@ stream_outlet_impl::stream_outlet_impl(const stream_info_impl &info, int32_t chu
}
}
}));

const std::string name{this->info().name().substr(0, 11)};
asio::post(*io_ctx_data_, [name]() { loguru::set_thread_name(("IO_" + name).c_str()); });
asio::post(*io_ctx_service_, [name]() { loguru::set_thread_name(("SVC_" + name).c_str()); });
}

void stream_outlet_impl::instantiate_stack(udp udp_protocol) {
@@ -147,8 +158,12 @@ void stream_outlet_impl::push_numeric_raw(const void *data, double timestamp, bo
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
sample_p smp(
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
smp->assign_untyped(data);
send_buffer_->push_sample(smp);
if (!do_sync_) {
smp->assign_untyped(data); // Note: Makes a copy!
send_buffer_->push_sample(smp);
} else {
enqueue_sync(asio::buffer(data, smp->datasize()), timestamp, pushthrough);
}
}

bool stream_outlet_impl::have_consumers() { return send_buffer_->have_consumers(); }
@@ -157,13 +172,44 @@ bool stream_outlet_impl::wait_for_consumers(double timeout) {
return send_buffer_->wait_for_consumers(timeout);
}

void stream_outlet_impl::push_timestamp_sync(const double& timestamp) {
static_assert(TAG_TRANSMITTED_TIMESTAMP == 2, "Unexpected TAG_TRANSMITTED_TIMESTAMP");
const uint64_t ENDIAN_SAFE_TAG_TRANSMITTED = (2LL << 28) | 2LL;
if (timestamp == DEDUCED_TIMESTAMP) {
sync_buffs_.emplace_back(&TAG_DEDUCED_TIMESTAMP, 1);
} else {
sync_timestamps_.emplace_back(ENDIAN_SAFE_TAG_TRANSMITTED, timestamp);
// add a pointer to the memory region containing |TAG_TRANSMITTED_TIMESTAMP|timestamp
// one byte for the tag, 8 for the timestamp
sync_buffs_.emplace_back(reinterpret_cast<const char*>(&sync_timestamps_.back()) + 7, 9);
}
}

void stream_outlet_impl::pushthrough_sync() {
// LOG_F(INFO, "Pushing %u buffers.", sync_buffs_.size());
tcp_server_->write_all_blocking(sync_buffs_);
sync_buffs_.clear();
sync_timestamps_.clear();
}

void stream_outlet_impl::enqueue_sync(
asio::const_buffer buff, const double& timestamp, bool pushthrough) {
push_timestamp_sync(timestamp);
sync_buffs_.push_back(buff);
if (pushthrough) pushthrough_sync();
}

template <class T>
void stream_outlet_impl::enqueue(const T *data, double timestamp, bool pushthrough) {
if (lsl::api_config::get_instance()->force_default_timestamps()) timestamp = 0.0;
sample_p smp(
sample_factory_->new_sample(timestamp == 0.0 ? lsl_clock() : timestamp, pushthrough));
smp->assign_typed(data);
send_buffer_->push_sample(smp);
if (timestamp == 0.0 || lsl::api_config::get_instance()->force_default_timestamps()) timestamp = lsl_local_clock();
if (!do_sync_) {
sample_p smp(
sample_factory_->new_sample(timestamp, pushthrough));
smp->assign_typed(data);
send_buffer_->push_sample(smp);
} else {
enqueue_sync(asio::buffer(data, sample_factory_->datasize()), timestamp, pushthrough);
}
}

template void stream_outlet_impl::enqueue<char>(const char *data, double, bool);
@@ -174,4 +220,11 @@ template void stream_outlet_impl::enqueue<float>(const float *data, double, bool
template void stream_outlet_impl::enqueue<double>(const double *data, double, bool);
template void stream_outlet_impl::enqueue<std::string>(const std::string *data, double, bool);

void stream_outlet_impl::enqueue_sync_multi(
std::vector<asio::const_buffer> buffs, const double& timestamp, bool pushthrough) {
push_timestamp_sync(timestamp);
sync_buffs_.insert(sync_buffs_.end(), buffs.begin(), buffs.end());
if (pushthrough) pushthrough_sync();
}

} // namespace lsl
36 changes: 34 additions & 2 deletions src/stream_outlet_impl.h
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
#include "common.h"
#include "forward.h"
#include "stream_info_impl.h"
#include <asio/buffer.hpp>
#include <cstdint>
#include <loguru.hpp>
#include <memory>
@@ -136,7 +137,11 @@ class stream_outlet_impl {
void push_sample(const std::string *data, double timestamp = 0.0, bool pushthrough = true) {
enqueue(data, timestamp, pushthrough);
}

lsl_error_code_t push_sample_gather(
std::vector<asio::const_buffer> buffs, double timestamp = 0.0, bool pushthrough = true) {
enqueue_sync_multi(buffs, timestamp, pushthrough);
return lsl_no_error;
}

template <typename T>
inline lsl_error_code_t push_sample_noexcept(
@@ -251,7 +256,6 @@ class stream_outlet_impl {
throw std::runtime_error("The number of buffer elements to send is not a multiple of "
"the stream's channel count.");
if (num_samples > 0) {
if (timestamp == 0.0) timestamp = lsl_clock();
if (info().nominal_srate() != IRREGULAR_RATE)
timestamp = timestamp - (num_samples - 1) / info().nominal_srate();
push_sample(buffer, timestamp, pushthrough && (num_samples == 1));
@@ -294,13 +298,35 @@ class stream_outlet_impl {
/// Wait until some consumer shows up.
bool wait_for_consumers(double timeout = FOREVER);

/// If the outlet is intended to use synchronous blocking transfers
bool is_sync_blocking() { return do_sync_; };

private:
/// Instantiate a new server stack.
void instantiate_stack(udp udp_protocol);

/// Allocate and enqueue a new sample into the send buffer.
template <class T> void enqueue(const T *data, double timestamp, bool pushthrough);

/// Append the appropriate timestamp tag and optionally timestamp onto sync_buffs_ for a single
/// timestamp.
void push_timestamp_sync(const double& timestamp);

/// push sync_buffs_ through each tcp server.
void pushthrough_sync();

/// Append a single timestamp and single buffer to sync_buffs and optionally pushthrough the
/// server.
void enqueue_sync(asio::const_buffer buff, const double& timestamp, bool pushthrough);

/**
* Append a single timestamp and multiple within-sample buffers to sync_buffs_.
* This is useful when a sample is discontiguous in memory. It makes no assumptions about how
* many channels are included in each buffer.
*/
void enqueue_sync_multi(
std::vector<asio::const_buffer> buffs, const double& timestamp, bool pushthrough);

/**
* Check whether some given number of channels matches the stream's channel_count.
* Throws an error if not.
@@ -319,6 +345,8 @@ class stream_outlet_impl {
stream_info_impl_p info_;
/// the single-producer, multiple-receiver send buffer
send_buffer_p send_buffer_;
/// Flag to indicate that push_* operations should be blocking synchronous. false by default.
bool do_sync_;
/// the IO service objects
io_context_p io_ctx_data_, io_ctx_service_;

@@ -331,6 +359,10 @@ class stream_outlet_impl {
std::vector<udp_server_p> responders_;
/// threads that handle the I/O operations (two per stack: one for UDP and one for TCP)
std::vector<thread_p> io_threads_;
/// buffers used in synchronous call to gather-write data directly to the socket.
std::vector<asio::const_buffer> sync_buffs_;
/// timestamp buffer for sync transfers
std::vector<std::pair<uint64_t, double>> sync_timestamps_;
};

} // namespace lsl
102 changes: 93 additions & 9 deletions src/tcp_server.cpp
Original file line number Diff line number Diff line change
@@ -65,7 +65,9 @@ class client_session : public std::enable_shared_from_this<client_session> {
public:
/// Instantiate a new session & its socket.
client_session(const tcp_server_p &serv, tcp_socket &&sock)
: io_(serv->io_), serv_(serv), sock_(std::move(sock)), requeststream_(&requestbuf_) {}
: io_(serv->io_), serv_(serv), sock_(std::move(sock)), requeststream_(&requestbuf_) {
LOG_F(1, "Initialized client session %p", this);
}

/// Destructor.
~client_session();
@@ -142,10 +144,31 @@ class client_session : public std::enable_shared_from_this<client_session> {
std::condition_variable completion_cond_;
};

class sync_transfer_handler {
bool transfer_is_sync_;
// sockets that should receive data in sync mode
std::vector<tcp_socket_p> sync_sockets_;
// io context for sync mode, app is responsible for running it
asio::io_context io_ctx_;
public:
sync_transfer_handler(): io_ctx_(1) {

}

/// schedules a native socket handle to be added the next time a push operation is done
void add_socket(const tcp_socket::native_handle_type handle, tcp_socket::protocol_type protocol) {
asio::post(io_ctx_, [=](){
sync_sockets_.push_back(std::make_unique<tcp_socket>(io_ctx_, protocol, handle));
});
}
void write_all_blocking(const std::vector<asio::const_buffer> &bufs);
};

tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf,
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6)
factory_p factory, int chunk_size, bool allow_v4, bool allow_v6, bool do_sync)
: chunk_size_(chunk_size), info_(std::move(info)), io_(std::move(io)),
factory_(std::move(factory)), send_buffer_(std::move(sendbuf)) {
if (do_sync) sync_handler = std::make_unique<sync_transfer_handler>();
// assign connection-dependent fields
info_->session_id(api_config::get_instance()->session_id());
info_->reset_uid();
@@ -178,6 +201,11 @@ tcp_server::tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p s
throw std::runtime_error("Failed to instantiate socket acceptors for the TCP server");
}

tcp_server::~tcp_server() noexcept
{
// defined here so the compiler can generate the destructor for the sync_handler
}


// === externally issued asynchronous commands ===

@@ -209,12 +237,9 @@ void tcp_server::end_serving() {

void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
try {
// Select the IO context for handling the socket
auto &sock_io_ctx = *io_;

// accept a connection on the session's socket
acceptor->async_accept(sock_io_ctx, [shared_this = shared_from_this(), &acceptor](
err_t err, tcp_socket sock) {
// accept a new connection
acceptor->async_accept(*io_, [shared_this = shared_from_this(), &acceptor](
err_t err, tcp_socket sock) {
if (err == asio::error::operation_aborted || err == asio::error::shut_down) return;

// no error: create a new session and start processing
@@ -232,7 +257,54 @@ void tcp_server::accept_next_connection(tcp_acceptor_p &acceptor) {
}


// === graceful cancellation of in-flight sockets ===
// === synchronous transfer

void tcp_server::write_all_blocking(const std::vector<asio::const_buffer> &bufs)
{
sync_handler->write_all_blocking(bufs);
}

void sync_transfer_handler::write_all_blocking(const std::vector<asio::const_buffer> &bufs) {
bool any_session_broken = false;

for (auto &sock : sync_sockets_) {
asio::async_write(*sock, bufs,
[this, &sock, &any_session_broken](
const asio::error_code &ec, size_t bytes_transferred) {
switch (ec.value()) {
case 0: break; // success
case asio::error::broken_pipe:
case asio::error::connection_reset:
LOG_F(WARNING, "Broken Pipe / Connection Reset detected. Closing socket.");
any_session_broken = true;
asio::post(io_ctx_, [sock]() {
asio::error_code close_ec;
sock->close(close_ec);
});
break;
case asio::error::operation_aborted:
LOG_F(INFO, "Socket wasn't fast enough");
break;
default:
LOG_F(ERROR, "Unhandled write_all_blocking error: %s.", ec.message().c_str());
}
});
}
try {
// prepare the io context for new work
io_ctx_.restart();
io_ctx_.run();

if (any_session_broken) {
// remove sessions whose socket was closed
auto new_end_it = std::remove_if(sync_sockets_.begin(), sync_sockets_.end(),
[](const tcp_socket_p &sock) {
return !sock->is_open();
});
sync_sockets_.erase(new_end_it, sync_sockets_.end());
}
} catch (std::exception &e) { LOG_F(ERROR, "Error during write_all_blocking: %s", e.what()); }
}

void tcp_server::register_inflight_session(const std::shared_ptr<client_session> &session) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
@@ -535,6 +607,18 @@ void client_session::handle_send_feedheader_outcome(err_t err, std::size_t n) {
// convenient for unit tests
if (max_buffered_ <= 0) return;

if (serv->sync_handler) {
LOG_F(INFO, "Using synchronous blocking transfers for new client session.");
auto protocol = sock_.local_endpoint().protocol();
// move the socket into the sync_transfer_io_ctx by releasing it from this
// io ctx and re-creating it with sync_transfer_io_ctx.
// See https://stackoverflow.com/q/52671836/73299
// Then schedule the sync_transfer_io_ctx to add it to the list of sync sockets
serv->sync_handler->add_socket(sock_.release(), protocol);
serv->unregister_inflight_session(this);
return;
}

// determine transfer parameters
auto queue = serv->send_buffer_->new_consumer(max_buffered_);

16 changes: 15 additions & 1 deletion src/tcp_server.h
Original file line number Diff line number Diff line change
@@ -47,9 +47,13 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
* @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server.
* @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines
* the effective chunking.
* @param do_sync Set true to indicate data transfer should happen synchronously in a blocking
* call. Default false -- asynchronous transfer in a thread (copies data).
*/
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
int chunk_size, bool allow_v4, bool allow_v6);
int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false);

~tcp_server() noexcept;

/**
* Begin serving TCP connections.
@@ -67,6 +71,12 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
*/
void end_serving();

/**
* Write directly to each socket. This should only be used when server initialized with
* do_sync = true.
*/
void write_all_blocking(const std::vector<asio::const_buffer>& bufs);

private:
friend class client_session;

@@ -95,6 +105,10 @@ class tcp_server : public std::enable_shared_from_this<tcp_server> {
// acceptor socket
tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket


// optional pointer to a handler class for synchronous transfers
std::unique_ptr<class sync_transfer_handler> sync_handler;

// registry of in-flight asessions (for cancellation)
std::map<void *, std::weak_ptr<client_session>> inflight_;
std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access
61 changes: 61 additions & 0 deletions testing/flood.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#include <lsl_c.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

static double start_t;
static int64_t samples_pushed = 0;

void handle_signal(int signal) {
double time = lsl_local_clock() - start_t;
printf("%ld samples pushed in %fs, %d samples/s\n", samples_pushed, time,
(int)(samples_pushed / time));
if (signal == SIGTERM || signal == SIGINT) exit(0);
start_t = lsl_local_clock();
samples_pushed = 0;
}

int main(int argc, char *argv[]) {
signal(SIGTERM, handle_signal);
signal(SIGINT, handle_signal);
#ifdef SIGUSR1
signal(SIGUSR1, handle_signal);
#endif
#ifdef SIGCONT
signal(SIGCONT, handle_signal);
#endif
printf("LSL inlet stress tester. Sends [nchan] uint16 channels as fast as possible.\n");
printf("Usage: %s [streamname] [nchan=56] [chunksize=500]\n", argv[0]);
printf("Using lsl %d, lsl_library_info: %s\n\n", lsl_library_version(), lsl_library_info());

const char *name = argc > 1 ? argv[1] : "Flood";
const int nchan = argc > 2 ? strtol(argv[2], NULL, 10) : 56;
const int samples_per_chunk = argc > 3 ? strtol(argv[3], NULL, 10) : 500;
const char uid[] = "325wqer4354";

/* declare a new streaminfo (name: SendDataC / argument 1, content type: EEG, 8 channels, 500
* Hz, float values, some made-up device id (can also be empty) */
lsl_streaminfo info = lsl_create_streaminfo(name, "Bench", nchan, 50000, cft_int16, uid);

/* make a new outlet (chunking: default, buffering: 360 seconds) */
lsl_outlet outlet = lsl_create_outlet_ex(info, 0, 360, transp_sync_blocking);
char *infoxml = lsl_get_xml(lsl_get_info(outlet));
printf("Streaminfo: %s\n", infoxml);
lsl_destroy_string(infoxml);

const int buf_elements = nchan * samples_per_chunk;
int16_t *buf = malloc(buf_elements * 2);
memset(buf, 0xab, buf_elements * sizeof(buf[0]));

printf("Now sending data...\n");
start_t = lsl_local_clock();

for (int t = 0;; t++) {
lsl_push_chunk_s(outlet, buf, buf_elements);
samples_pushed += samples_per_chunk;
}
lsl_destroy_outlet(outlet);
return 0;
}
50 changes: 50 additions & 0 deletions testing/spike.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "lsl_cpp.h"
#include <chrono>
#include <iostream>
#include <random>
#include <cstdlib>
#include <thread>

using namespace std::chrono_literals;

const char *channels[] = {"C3", "C4", "Cz", "FPz", "POz", "CPz", "O1", "O2"};

int main(int argc, char *argv[]) {
int n_channels = argc > 1 ? std::stoi(argv[1]) : 200;
int samplingrate = argc > 2 ? std::stoi(argv[2]) : 50000;
int max_buffered = argc > 3 ? std::stoi(argv[3]) : 360;
bool sync = argc > 4 ? std::stoi(argv[4]) > 0 : true;

try {
lsl::stream_info info("Spike", "bench", n_channels, samplingrate, lsl::cf_int16, "Spike");
lsl::stream_outlet outlet(
info, 0, max_buffered, sync ? transp_sync_blocking : transp_default);

std::vector<int16_t> chunk(n_channels * samplingrate / 1000, 5);

const auto t_start = std::chrono::high_resolution_clock::now();
auto next_sample_time = t_start;
const auto time_per_chunk = std::chrono::microseconds(8 / samplingrate);

// send data forever
std::cout << "Now sending data... " << std::endl;
for (unsigned t = 0;; t++) {
if (t % 3 == 0) {
for (int s = 0; s < 5; ++s)
outlet.push_chunk_multiplexed(chunk);
// Wait until the next expected sample time.
} else {
for (int s = 0; s < 10; ++s) {
outlet.push_sample(chunk.data());
std::this_thread::sleep_for(100us);
}
}
next_sample_time += time_per_chunk;
std::this_thread::sleep_until(next_sample_time);
}

} catch (std::exception &e) { std::cerr << "Got an exception: " << e.what() << std::endl; }
std::cout << "Press any key to exit. " << std::endl;
std::cin.get();
return 0;
}