Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions include/fluent-bit/flb_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#ifndef FLB_IO_H
#define FLB_IO_H

#include <stddef.h>
#include <monkey/mk_core.h>

#include <fluent-bit/flb_info.h>
Expand All @@ -40,6 +41,10 @@
#define FLB_IO_IPV6 32 /* network I/O uses IPv6 */

struct flb_connection;
struct flb_iovec {
void *iov_base;
size_t iov_len;
};

int flb_io_net_accept(struct flb_connection *connection,
struct flb_coro *th);
Expand All @@ -50,6 +55,11 @@ int flb_io_net_connect(struct flb_connection *u_conn,
int flb_io_net_write(struct flb_connection *connection, const void *data,
size_t len, size_t *out_len);

int flb_io_net_writev(struct flb_connection *connection,
const struct flb_iovec *iov,
int iovcnt,
size_t *out_len);

ssize_t flb_io_net_read(struct flb_connection *connection, void *buf, size_t len);

int flb_io_fd_write(int fd, const void *data, size_t len, size_t *out_len);
Expand Down
72 changes: 58 additions & 14 deletions src/flb_http_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1491,24 +1491,68 @@ int flb_http_do_request(struct flb_http_client *c, size_t *bytes)
}
#endif

/* Write the header */
ret = flb_io_net_write(c->u_conn,
c->header_buf, c->header_len,
&bytes_header);
if (ret == -1) {
/* errno might be changed from the original call */
if (errno != 0) {
flb_errno();
if (c->body_len > 0) {
struct flb_iovec io_vector[2];

io_vector[0].iov_base = c->header_buf;
io_vector[0].iov_len = c->header_len;
io_vector[1].iov_base = c->body_buf;
io_vector[1].iov_len = c->body_len;

ret = flb_io_net_writev(c->u_conn,
io_vector,
2,
&bytes_header);
if (ret == -1) {
if (errno == ENOMEM && bytes_header == 0) {
ret = flb_io_net_write(c->u_conn,
c->header_buf, c->header_len,
&bytes_header);
if (ret == -1) {
if (errno != 0) {
flb_errno();
}
return FLB_HTTP_ERROR;
}

ret = flb_io_net_write(c->u_conn,
c->body_buf, c->body_len,
&bytes_body);
if (ret == -1) {
if (errno != 0) {
flb_errno();
}
return FLB_HTTP_ERROR;
}
}
else {
if (errno != 0) {
flb_errno();
}
return FLB_HTTP_ERROR;
}
}
return FLB_HTTP_ERROR;
}
else {
if (bytes_header < c->header_len) {
flb_error("[http_client] invalid write accounting: total=%zu header=%zu",
bytes_header, c->header_len);
return FLB_HTTP_ERROR;
}

if (c->body_len > 0) {
bytes_body = bytes_header - c->header_len;
bytes_header = c->header_len;
}
}
else {
/* Write the header */
ret = flb_io_net_write(c->u_conn,
c->body_buf, c->body_len,
&bytes_body);
c->header_buf, c->header_len,
&bytes_header);
if (ret == -1) {
flb_errno();
/* errno might be changed from the original call */
if (errno != 0) {
flb_errno();
}
return FLB_HTTP_ERROR;
}
}
Expand Down
106 changes: 106 additions & 0 deletions src/flb_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
#include <stdlib.h>
#include <limits.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#ifndef FLB_SYSTEM_WINDOWS
#include <unistd.h>
#endif

#include <monkey/mk_core.h>
#include <fluent-bit/flb_info.h>
Expand All @@ -63,6 +68,36 @@
#include <fluent-bit/flb_coro.h>
#include <fluent-bit/flb_http_client.h>


static int flb_io_get_iov_max()
{
long limit;

#ifdef IOV_MAX
limit = IOV_MAX;
#else
limit = -1;
#endif

#ifdef _SC_IOV_MAX
{
long sys_limit;

sys_limit = sysconf(_SC_IOV_MAX);

if (sys_limit > 0) {
limit = sys_limit;
}
}
#endif

if (limit <= 0 || limit > INT_MAX) {
limit = 1024;
}

return (int) limit;
}

int flb_io_net_accept(struct flb_connection *connection,
struct flb_coro *coro)
{
Expand Down Expand Up @@ -664,6 +699,77 @@ static FLB_INLINE ssize_t net_io_read_async(struct flb_coro *co,
return ret;
}


int flb_io_net_writev(struct flb_connection *connection,
const struct flb_iovec *iov,
int iovcnt,
size_t *out_len)
{
int result;
int index;
size_t total;
size_t total_length;
char *temporary_buffer;

if (iov == NULL || iovcnt <= 0 || out_len == NULL) {
errno = EINVAL;

return -1;
}

if (iovcnt > flb_io_get_iov_max()) {
errno = EINVAL;

return -1;
}

total_length = 0;

for (index = 0 ; index < iovcnt ; index++) {
/* Overflow guard */
if (iov[index].iov_len > SIZE_MAX - total_length) {
errno = EOVERFLOW;
return -1;
}

if (iov[index].iov_len > 0 && iov[index].iov_base == NULL) {
errno = EINVAL;
return -1;
}

total_length += iov[index].iov_len;
}

if (total_length == 0) {
*out_len = 0;

return 0;
}

temporary_buffer = flb_malloc(total_length);

if (temporary_buffer == NULL) {
flb_errno();

return -1;
}

total = 0;

for (index = 0 ; index < iovcnt ; index++) {
if (iov[index].iov_len > 0) {
memcpy(&temporary_buffer[total], iov[index].iov_base, iov[index].iov_len);
}
total += iov[index].iov_len;
}

result = flb_io_net_write(connection, temporary_buffer, total_length, out_len);

flb_free(temporary_buffer);

return result;
}

/* Write data to fd. For unix socket. */
int flb_io_fd_write(int fd, const void *data, size_t len, size_t *out_len)
{
Expand Down
Loading