Skip to content

Commit

Permalink
Merge pull request zeromq#2903 from sigiesec/fix-2895-2
Browse files Browse the repository at this point in the history
Problem: in case of exhausted resources on creation of a context, assertions are triggered
  • Loading branch information
bluca authored Jan 31, 2018
2 parents 4e2b9e6 + 206c832 commit c6bd123
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 49 deletions.
116 changes: 78 additions & 38 deletions src/ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ zmq::ctx_t::~ctx_t ()
tag = ZMQ_CTX_TAG_VALUE_BAD;
}

bool zmq::ctx_t::valid () const
{
return term_mailbox.valid ();
}

int zmq::ctx_t::terminate ()
{
slot_sync.lock();
Expand All @@ -146,7 +151,6 @@ int zmq::ctx_t::terminate ()
terminating = saveTerminating;

if (!starting) {

#ifdef HAVE_FORK
if (pid != getpid ()) {
// we are a forked child process. Close all file descriptors
Expand Down Expand Up @@ -320,47 +324,83 @@ int zmq::ctx_t::get (int option_)
return rc;
}

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
bool zmq::ctx_t::start ()
{
scoped_lock_t locker(slot_sync);
// Initialise the array of mailboxes. Additional three slots are for
// zmq_ctx_term thread and reaper thread.
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (i_mailbox **) malloc (sizeof (i_mailbox *) * slot_count);
if (!slots) {
errno = ENOMEM;
goto fail;
}

// Initialise the infrastructure for zmq_ctx_term thread.
slots[term_tid] = &term_mailbox;

// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
if (!reaper) {
errno = ENOMEM;
goto fail_cleanup_slots;
}
if (!reaper->get_mailbox ()->valid ())
goto fail_cleanup_reaper;
slots[reaper_tid] = reaper->get_mailbox ();
reaper->start ();

// Create I/O thread objects and launch them.
for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) 2; i--) {
slots[i] = NULL;
}

for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
if (!io_thread) {
errno = ENOMEM;
goto fail_cleanup_reaper;
}
if (!io_thread->get_mailbox ()->valid ()) {
delete io_thread;
goto fail_cleanup_reaper;
}
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}

if (unlikely (starting)) {
// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1; i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
}

starting = false;
// Initialise the array of mailboxes. Additional three slots are for
// zmq_ctx_term thread and reaper thread.
opt_sync.lock ();
int mazmq = max_sockets;
int ios = io_thread_count;
opt_sync.unlock ();
slot_count = mazmq + ios + 2;
slots = (i_mailbox **) malloc (sizeof (i_mailbox*) * slot_count);
alloc_assert (slots);

// Initialise the infrastructure for zmq_ctx_term thread.
slots [term_tid] = &term_mailbox;

// Create the reaper thread.
reaper = new (std::nothrow) reaper_t (this, reaper_tid);
alloc_assert (reaper);
slots [reaper_tid] = reaper->get_mailbox ();
reaper->start ();

// Create I/O thread objects and launch them.
for (int i = 2; i != ios + 2; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
alloc_assert (io_thread);
io_threads.push_back (io_thread);
slots [i] = io_thread->get_mailbox ();
io_thread->start ();
}
starting = false;
return true;

// In the unused part of the slot array, create a list of empty slots.
for (int32_t i = (int32_t) slot_count - 1;
i >= (int32_t) ios + 2; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
fail_cleanup_reaper:
reaper->stop ();
delete reaper;
reaper = NULL;

fail_cleanup_slots:
free (slots);
slots = NULL;

fail:
return false;
}

zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
scoped_lock_t locker (slot_sync);

if (unlikely (starting)) {
if (!start ())
return NULL;
}

// Once zmq_ctx_term() was called, we can't create new sockets.
Expand Down
3 changes: 3 additions & 0 deletions src/ctx.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ namespace zmq

~ctx_t ();

bool valid() const;

private:
bool start();

struct pending_connection_t
{
Expand Down
13 changes: 9 additions & 4 deletions src/io_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,16 @@
#include "ctx.hpp"

zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_)
object_t (ctx_, tid_),
mailbox_handle (NULL)
{
poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);

mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
if (mailbox.get_fd () != retired_fd) {
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
poller->set_pollin (mailbox_handle);
}
}

zmq::io_thread_t::~io_thread_t ()
Expand Down Expand Up @@ -109,6 +112,8 @@ zmq::poller_t *zmq::io_thread_t::get_poller ()

void zmq::io_thread_t::process_stop ()
{
poller->rm_fd (mailbox_handle);
if (mailbox_handle) {
poller->rm_fd (mailbox_handle);
}
poller->stop ();
}
5 changes: 5 additions & 0 deletions src/mailbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,8 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
zmq_assert (ok);
return 0;
}

bool zmq::mailbox_t::valid () const
{
return signaler.valid ();
}
2 changes: 2 additions & 0 deletions src/mailbox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ namespace zmq
void send (const command_t &cmd_);
int recv (command_t *cmd_, int timeout_);

bool valid () const;

#ifdef HAVE_FORK
// close the file descriptors in the signaller. This is used in a forked
// child process to close the file descriptors so that they do not interfere
Expand Down
10 changes: 9 additions & 1 deletion src/reaper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@
zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
object_t (ctx_, tid_),
mailbox_handle((poller_t::handle_t)NULL),
poller (NULL),
sockets (0),
terminating (false)
{
if (!mailbox.valid ())
return;

poller = new (std::nothrow) poller_t (*ctx_);
alloc_assert (poller);

Expand All @@ -64,13 +68,17 @@ zmq::mailbox_t *zmq::reaper_t::get_mailbox ()

void zmq::reaper_t::start ()
{
zmq_assert (mailbox.valid ());

// Start the thread.
poller->start ();
}

void zmq::reaper_t::stop ()
{
send_stop ();
if (get_mailbox ()->valid ()) {
send_stop ();
}
}

void zmq::reaper_t::in_event ()
Expand Down
7 changes: 6 additions & 1 deletion src/select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :
#else
maxfd (retired_fd),
#endif
started (false),
stopping (false)
{
#if defined ZMQ_HAVE_WINDOWS
Expand All @@ -65,7 +66,10 @@ zmq::select_t::select_t (const zmq::ctx_t &ctx_) :

zmq::select_t::~select_t ()
{
worker.stop ();
if (started) {
stop ();
worker.stop ();
}
}

zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
Expand Down Expand Up @@ -257,6 +261,7 @@ void zmq::select_t::reset_pollout (handle_t handle_)
void zmq::select_t::start ()
{
ctx.start_thread (worker, worker_routine, this);
started = true;
}

void zmq::select_t::stop ()
Expand Down
3 changes: 3 additions & 0 deletions src/select.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ class select_t : public poller_base_t
static fd_entries_t::iterator
find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_);

// If true, start has been called.
bool started;

// If true, thread is shutting down.
bool stopping;

Expand Down
7 changes: 6 additions & 1 deletion src/signaler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ int zmq::signaler_t::recv_failable ()
return 0;
}

bool zmq::signaler_t::valid () const
{
return w != retired_fd;
}

#ifdef HAVE_FORK
void zmq::signaler_t::forked ()
{
Expand All @@ -398,7 +403,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
}
}
else {
*w_ = *r_ = fd;
return 0;
Expand Down
7 changes: 6 additions & 1 deletion src/signaler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,16 @@ namespace zmq
signaler_t ();
~signaler_t ();

// Returns the socket/file descriptor
// May return retired_fd if the signaler could not be initialized.
fd_t get_fd () const;
void send ();
int wait (int timeout_);
void recv ();
int recv_failable ();

bool valid () const;

#ifdef HAVE_FORK
// close the file descriptors in a forked child process so that they
// do not interfere with the context in the parent process.
Expand All @@ -70,7 +74,8 @@ namespace zmq
static int make_fdpair (fd_t *r_, fd_t *w_);

// Underlying write & read file descriptor
// Will be -1 if we exceeded number of available handles
// Will be -1 if an error occurred during initialization, e.g. we
// exceeded the number of available handles
fd_t w;
fd_t r;

Expand Down
15 changes: 13 additions & 2 deletions src/socket_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,19 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
zmq_assert (rc == 0);

if (thread_safe) {
if (signaler == NULL)
signaler = new signaler_t ();
if (signaler == NULL) {
signaler = new (std::nothrow) signaler_t ();
if (!signaler) {
errno = ENOMEM;
return -1;
}
if (!signaler->valid ()) {
delete signaler;
signaler = NULL;
errno = EMFILE;
return -1;
}
}

rc = socket_->add_signaler (signaler);
zmq_assert (rc == 0);
Expand Down
7 changes: 6 additions & 1 deletion src/zmq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,12 @@ void *zmq_ctx_new (void)

// Create 0MQ context.
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t;
alloc_assert (ctx);
if (ctx) {
if (!ctx->valid ()) {
delete ctx;
return NULL;
}
}
return ctx;
}

Expand Down

0 comments on commit c6bd123

Please sign in to comment.