Skip to content

Commit

Permalink
* SYNC [core/sp] sync nng
Browse files Browse the repository at this point in the history
Signed-off-by: wayne <[email protected]>
  • Loading branch information
StargazerWayne committed Dec 22, 2023
1 parent 8932966 commit 1d26c8e
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 62 deletions.
33 changes: 33 additions & 0 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,39 @@ nni_aio_list_active(nni_aio *aio)
return (nni_list_node_active(&aio->a_prov_node));
}

// completions list.
// Implementation note: in order to avoid wasting space, we
// reuse the reap node -- which will be inactive here.
void
nni_aio_completions_init(nni_aio_completions *clp)
{
*clp = NULL;
}

void
nni_aio_completions_add(nni_aio_completions *clp, nni_aio *aio, int result, size_t count)
{
NNI_ASSERT(!nni_aio_list_active(aio));

Check warning on line 523 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L523

Added line #L523 was not covered by tests
aio->a_reap_node.rn_next = *clp;
aio->a_result = result;
aio->a_count = count;
*clp = aio;
}

void
nni_aio_completions_run(nni_aio_completions *clp)
{
nni_aio *aio;
nni_aio *cl = *clp;
*clp = NULL;

while ((aio = cl) != NULL) {
cl = (void *)aio->a_reap_node.rn_next;
aio->a_reap_node.rn_next = NULL;
nni_aio_finish_sync(aio, aio->a_result, aio->a_count);
}
}

static void
nni_aio_expire_add(nni_aio *aio)
{
Expand Down
26 changes: 25 additions & 1 deletion src/core/aio.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2022 Staysail Systems, Inc. <[email protected]>
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
//
// This software is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -166,6 +166,30 @@ extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);

extern void nni_sleep_aio(nni_duration, nni_aio *);

// nni_aio_completion_list is used after removing the aio from an
// active work queue, and keeping them so that the completions can
// be run in a deferred manner. These lists are simple, and intended
// to be used as local variables. It's important to initialize the
// list before using it. Also, any AIO added to a completion list must
// not be in active use anywhere.
typedef void *nni_aio_completions;

// nni_aio_completions_init just initializes a completions list.
// This just sets the pointed value to NULL.
extern void nni_aio_completions_init(nni_aio_completions *);

// nni_aio_completions_run runs nni_aio_finish_sync for all the aio objects
// that have been added to the completions. The result code and count used
// are those supplied in nni_aio_completions_add. Callers should not hold
// locks when calling this.
extern void nni_aio_completions_run(nni_aio_completions *);

// nni_aio_completions_add adds an aio (with the result code and length as
// appropriate) to the completion list. This should be done while the
// appropriate lock is held. The aio must not be scheduled.
extern void nni_aio_completions_add(nni_aio_completions *, nni_aio *,
int, size_t);

extern int nni_aio_sys_init(void);
extern void nni_aio_sys_fini(void);

Expand Down
2 changes: 1 addition & 1 deletion src/core/dialer.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ nni_dialer_destroy(nni_dialer *d)
NNI_FREE_STRUCT(d);
}

#if NNG_ENABLE_STATS
#ifdef NNG_ENABLE_STATS
static void
dialer_stat_init(nni_dialer *d, nni_stat_item *item, const nni_stat_info *info)
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/listener.c
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ nni_listener_getopt(
void
nni_listener_add_stat(nni_listener *l, nni_stat_item *item)
{
#if NNG_ENABLE_STATS
#ifdef NNG_ENABLE_STATS
nni_stat_add(&l->st_root, item);
#else
NNI_ARG_UNUSED(l);
Expand Down
2 changes: 1 addition & 1 deletion src/core/panic.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
void
nni_show_backtrace(void)
{
#if NNG_HAVE_BACKTRACE
#ifdef NNG_HAVE_BACKTRACE
void *frames[50];
int nframes;

Expand Down
2 changes: 1 addition & 1 deletion src/core/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ nni_pipe_create_listener(nni_pipe **pp, nni_listener *l, void *tran_data)
return (rv);
}
p->p_listener = l;
#if NNG_ENABLE_STATS
#ifdef NNG_ENABLE_STATS
static const nni_stat_info listener_info = {
.si_name = "listener",
.si_desc = "listener for pipe",
Expand Down
8 changes: 4 additions & 4 deletions src/sp/protocol/pair1/pair.c
Original file line number Diff line number Diff line change
Expand Up @@ -498,15 +498,15 @@ pair1_pipe_send(pair1_pipe *p, nni_msg *m)
// assumption: we have unique access to the message at this point.
NNI_ASSERT(!nni_msg_shared(m));

#if NNG_TEST_LIB
#ifdef NNG_TEST_LIB
if (s->inject_header) {
goto inject;
}
#endif
NNI_ASSERT(nni_msg_header_len(m) == sizeof(uint32_t));
nni_msg_header_poke_u32(m, nni_msg_header_peek_u32(m) + 1);

#if NNG_TEST_LIB
#ifdef NNG_TEST_LIB
inject:
#endif

Expand All @@ -531,7 +531,7 @@ pair1_sock_send(void *arg, nni_aio *aio)
return;
}

#if NNG_TEST_LIB
#ifdef NNG_TEST_LIB
if (s->inject_header) {
goto inject;
}
Expand All @@ -554,7 +554,7 @@ pair1_sock_send(void *arg, nni_aio *aio)
nni_msg_header_append_u32(m, 0);
}

#if NNG_TEST_LIB
#ifdef NNG_TEST_LIB
inject:
#endif

Expand Down
67 changes: 32 additions & 35 deletions src/sp/protocol/pubsub0/sub.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright 2021 Staysail Systems, Inc. <[email protected]>
// Copyright 2023 Staysail Systems, Inc. <[email protected]>
// Copyright 2018 Capitar IT Group BV <[email protected]>
// Copyright 2019 Nathan Kent <[email protected]>
//
Expand Down Expand Up @@ -44,14 +44,14 @@ static void sub0_pipe_fini(void *);
struct sub0_topic {
nni_list_node node;
size_t len;
void * buf;
void *buf;
};

// sub0_ctx is a context for a SUB socket. The advantage of contexts is
// that different contexts can maintain different subscriptions.
struct sub0_ctx {
nni_list_node node;
sub0_sock * sock;
sub0_sock *sock;
nni_list topics; // TODO: Consider patricia trie
nni_list recv_queue; // can have multiple pending receives
nni_lmq lmq;
Expand All @@ -71,15 +71,15 @@ struct sub0_sock {

// sub0_pipe is our per-pipe protocol private structure.
struct sub0_pipe {
nni_pipe * pipe;
nni_pipe *pipe;
sub0_sock *sub;
nni_aio aio_recv;
};

static void
sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_mtx_lock(&sock->lk);
if (nni_list_active(&ctx->recv_queue, aio)) {
Expand All @@ -92,9 +92,9 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv)
static void
sub0_ctx_recv(void *arg, nni_aio *aio)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_msg * msg;
nni_msg *msg;

if (nni_aio_begin(aio) != 0) {
return;
Expand Down Expand Up @@ -140,9 +140,9 @@ sub0_ctx_send(void *arg, nni_aio *aio)
static void
sub0_ctx_close(void *arg)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
nni_aio * aio;
nni_aio *aio;

nni_mtx_lock(&sock->lk);
while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) {
Expand All @@ -155,8 +155,8 @@ sub0_ctx_close(void *arg)
static void
sub0_ctx_fini(void *arg)
{
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;

sub0_ctx_close(ctx);
Expand All @@ -179,7 +179,7 @@ static void
sub0_ctx_init(void *ctx_arg, void *sock_arg)
{
sub0_sock *sock = sock_arg;
sub0_ctx * ctx = ctx_arg;
sub0_ctx *ctx = ctx_arg;
size_t len;
bool prefer_new;

Expand Down Expand Up @@ -311,22 +311,22 @@ sub0_matches(sub0_ctx *ctx, uint8_t *body, size_t len)
static void
sub0_recv_cb(void *arg)
{
sub0_pipe *p = arg;
sub0_sock *sock = p->sub;
sub0_ctx * ctx;
nni_msg * msg;
size_t len;
uint8_t * body;
nni_list finish;
nng_aio * aio;
nni_msg * dup_msg;
sub0_pipe *p = arg;
sub0_sock *sock = p->sub;
sub0_ctx *ctx;
nni_msg *msg;
size_t len;
uint8_t *body;
nng_aio *aio;
nni_msg *dup_msg;
nni_aio_completions finish;

if (nni_aio_result(&p->aio_recv) != 0) {
nni_pipe_close(p->pipe);
return;
}

nni_aio_list_init(&finish);
nni_aio_completions_init(&finish);

msg = nni_aio_get_msg(&p->aio_recv);
nni_aio_set_msg(&p->aio_recv, NULL);
Expand Down Expand Up @@ -370,7 +370,7 @@ sub0_recv_cb(void *arg)
nni_aio_set_msg(aio, dup_msg);

// Save for synchronous completion
nni_list_append(&finish, aio);
nni_aio_completions_add(&finish, aio, 0, len);
} else if (nni_lmq_full(&ctx->lmq)) {
// Make space for the new message.
nni_msg *old;
Expand Down Expand Up @@ -401,18 +401,15 @@ sub0_recv_cb(void *arg)
nni_msg_free(msg);
}

while ((aio = nni_list_first(&finish)) != NULL) {
nni_list_remove(&finish, aio);
nni_aio_finish_sync(aio, 0, len);
}
nni_aio_completions_run(&finish);

nni_pipe_recv(p->pipe, &p->aio_recv);
}

static int
sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
nni_mtx_lock(&sock->lk);
Expand All @@ -425,7 +422,7 @@ sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
int val;
int rv;
Expand Down Expand Up @@ -456,8 +453,8 @@ sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
sub0_topic *new_topic;
NNI_ARG_UNUSED(t);
Expand Down Expand Up @@ -494,8 +491,8 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_sock * sock = ctx->sock;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
sub0_topic *topic;
size_t len;
NNI_ARG_UNUSED(t);
Expand Down Expand Up @@ -540,7 +537,7 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t)
static int
sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;

Expand All @@ -554,7 +551,7 @@ sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t)
static int
sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t)
{
sub0_ctx * ctx = arg;
sub0_ctx *ctx = arg;
sub0_sock *sock = ctx->sock;
bool val;
int rv;
Expand Down
Loading

0 comments on commit 1d26c8e

Please sign in to comment.