Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

input_chunk: express specific errors using negative encoded numerical error values. #9919

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion plugins/in_emitter/emitter.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ int static do_in_emitter_add_record(struct em_chunk *ec,
ec->tag, flb_sds_len(ec->tag),
ec->mp_sbuf.data,
ec->mp_sbuf.size);
if (ret == -1) {
if (ret < 0) {
flb_plg_error(ctx->ins, "error registering chunk with tag: %s", ec->tag);
/* Release the echunk */
em_chunk_destroy(ec);
Expand Down
89 changes: 77 additions & 12 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -837,8 +837,41 @@ static int input_chunk_write_header(struct cio_chunk *chunk, int event_type,
return 0;
}

struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type,
const char *tag, int tag_len)
static inline void errnum_set(int *errnum, int error)
{
if (errnum) {
*errnum = error;
}
}

static inline void errnum_set_from_errno(int *errnum)
{
if (errno) {
errnum_set(errnum, errno);
}
}

static inline void errnum_set_cio(int *errnum, int cio_err)
{
switch (cio_err) {
case CIO_OK:
errnum_set(errnum, 0);
break;
case CIO_CORRUPTED:
errnum_set(errnum, EIO);
break;
case CIO_RETRY:
errnum_set(errnum, EAGAIN);
break;
default:
case CIO_ERROR:
errnum_set(errnum, EINVAL);
break;
}
}

static struct flb_input_chunk *input_chunk_create(struct flb_input_instance *in, int event_type,
const char *tag, int tag_len, int *errnum)
{
int ret;
int err;
Expand All @@ -860,6 +893,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
if (!chunk) {
flb_error("[input chunk] could not create chunk file: %s:%s",
storage->stream->name, name);
errnum_set_cio(errnum, err);
return NULL;
}
/*
Expand All @@ -870,6 +904,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
if (ret == CIO_FALSE) {
ret = cio_chunk_up_force(chunk);
if (ret == -1) {
errnum_set(errnum, EIO);
cio_chunk_close(chunk, CIO_TRUE);
return NULL;
}
Expand All @@ -879,6 +914,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
/* Write chunk header */
ret = input_chunk_write_header(chunk, event_type, (char *) tag, tag_len);
if (ret == -1) {
errnum_set(errnum, EIO);
cio_chunk_close(chunk, CIO_TRUE);
return NULL;
}
Expand All @@ -887,6 +923,7 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
ic = flb_calloc(1, sizeof(struct flb_input_chunk));
if (!ic) {
flb_errno();
errnum_set_from_errno(errnum);
cio_chunk_close(chunk, CIO_TRUE);
return NULL;
}
Expand Down Expand Up @@ -938,6 +975,12 @@ struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, in
return ic;
}

struct flb_input_chunk *flb_input_chunk_create(struct flb_input_instance *in, int event_type,
const char *tag, int tag_len)
{
return input_chunk_create(in, event_type, tag, tag_len, NULL);
}

int flb_input_chunk_destroy_corrupted(struct flb_input_chunk *ic,
const char *tag_buf, int tag_len,
int del)
Expand Down Expand Up @@ -1109,7 +1152,8 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
int event_type,
const char *tag, int tag_len,
size_t chunk_size, int *set_down)
size_t chunk_size, int *set_down,
int *errnum)
{
int id = -1;
int ret;
Expand Down Expand Up @@ -1174,7 +1218,7 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,

/* No chunk was found, we need to create a new one */
if (!ic) {
ic = flb_input_chunk_create(in, event_type, (char *) tag, tag_len);
ic = input_chunk_create(in, event_type, (char *) tag, tag_len, errnum);
new_chunk = FLB_TRUE;
if (!ic) {
return NULL;
Expand All @@ -1198,6 +1242,8 @@ static struct flb_input_chunk *input_chunk_get(struct flb_input_instance *in,
if (new_chunk || flb_routes_mask_is_empty(ic->routes_mask) == FLB_TRUE) {
flb_input_chunk_destroy(ic, FLB_TRUE);
}
/* Set the error no ENOSPC so the caller knows we have hit a storage limit. */
errnum_set(errnum, ENOSPC);
return NULL;
}

Expand Down Expand Up @@ -1466,6 +1512,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
const void *buf, size_t buf_size)
{
int ret, total_records_start;
int err = 0;
int set_down = FLB_FALSE;
int min;
int new_chunk = FLB_FALSE;
Expand Down Expand Up @@ -1513,7 +1560,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,

if (ret != 0) {
/* we could not allocate the required space, just return */
return -1;
return -ENOMEM;
}
}
}
Expand All @@ -1522,7 +1569,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
if (flb_input_buf_paused(in) == FLB_TRUE) {
flb_debug("[input chunk] %s is paused, cannot append records",
flb_input_name(in));
return -1;
return -EAGAIN;
}

if (buf_size == 0) {
Expand All @@ -1549,10 +1596,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
* Get a target input chunk, can be one with remaining space available
* or a new one.
*/
ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down);
ic = input_chunk_get(in, event_type, tag, tag_len, buf_size, &set_down, &err);
if (!ic) {
flb_error("[input chunk] no available chunk");
return -1;
if (err != 0) {
return -err;
}
/* fallback on returning errno if it is set. */
else if (errno != 0) {
return -errno;
}
return -EIO;
}

/* newly created chunk */
Expand All @@ -1564,9 +1618,17 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
ret = flb_input_chunk_is_up(ic);
if (ret == FLB_FALSE) {
ret = cio_chunk_up_force(ic->chunk);
if (ret == -1) {
if (ret <= CIO_ERROR) {
flb_error("[input chunk] cannot retrieve temporary chunk");
return -1;
switch (ret) {
case CIO_CORRUPTED:
return -EIO;
case CIO_RETRY:
return -EAGAIN;
case CIO_ERROR:
return -ENOMEM;
}
return -EINVAL;
}
set_down = FLB_TRUE;
}
Expand Down Expand Up @@ -1638,6 +1700,7 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
ret = flb_input_chunk_write(ic,
final_data_buffer,
final_data_size);
err = errno;
}
else {
ret = 0;
Expand All @@ -1661,8 +1724,10 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_error("[input chunk] error writing data from %s instance",
flb_input_name(in));
cio_chunk_tx_rollback(ic->chunk);

return -1;
if (err) {
return -err;
}
return -EIO;
}

/* get the chunks content size */
Expand Down
Loading