Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_tail: reintroduced the old ignore_older behavior as opt-in #9913

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,12 @@ static struct flb_config_map config_map[] = {
"only available when a Parser is specified and it can parse the time "
"of a record."
},
{
FLB_CONFIG_MAP_BOOL, "ignore_active_older_files", "false",
0, FLB_TRUE, offsetof(struct flb_tail_config, ignore_active_older_files),
"ignore files that are older than the value set in ignore_older even "
"if the file is being ingested."
},
{
FLB_CONFIG_MAP_SIZE, "buffer_chunk_size", FLB_TAIL_CHUNK,
0, FLB_TRUE, offsetof(struct flb_tail_config, buf_chunk_size),
Expand Down
5 changes: 4 additions & 1 deletion plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ struct flb_tail_config {
int read_from_head; /* read new files from head */
int rotate_wait; /* sec to wait on rotated files */
int watcher_interval; /* watcher interval */
int ignore_older; /* ignore fields older than X seconds */
int ignore_older; /* ignore fields older than X seconds */
int ignore_active_older_files; /* ignore files that exceed the ignore
* older limit even if they are already
* being ingested */
time_t last_pending; /* last time a 'pending signal' was emitted' */
struct mk_list *path_list; /* list of paths to scan (glob) */
flb_sds_t path_key; /* key name of file path */
Expand Down
13 changes: 13 additions & 0 deletions plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -1898,6 +1898,7 @@ static int check_purge_deleted_file(struct flb_tail_config *ctx,
struct flb_tail_file *file, time_t ts)
{
int ret;
int64_t mtime;
struct stat st;

ret = fstat(file->fd, &st);
Expand All @@ -1921,6 +1922,18 @@ static int check_purge_deleted_file(struct flb_tail_config *ctx,
return FLB_TRUE;
}

if (ctx->ignore_older > 0 && ctx->ignore_active_older_files) {
mtime = flb_tail_stat_mtime(&st);
if (mtime > 0) {
if ((ts - ctx->ignore_older) > mtime) {
flb_plg_debug(ctx->ins, "purge: monitored file (ignore older): %s",
file->name);
flb_tail_file_remove(file);
return FLB_TRUE;
}
}
}

return FLB_FALSE;
}

Expand Down
81 changes: 77 additions & 4 deletions tests/runtime/in_tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -676,9 +676,9 @@ void flb_test_in_tail_skip_long_lines()
unlink(path);
}

/*
/*
* test case for https://github.com/fluent/fluent-bit/issues/3943
*
*
* test to read the lines "CRLF + empty_line + LF"
*/
void flb_test_in_tail_issue_3943()
Expand Down Expand Up @@ -1312,6 +1312,76 @@ void flb_test_ignore_older()
}
}

void flb_test_in_tail_ignore_active_older_files()
{
struct flb_lib_out_cb cb_data;
struct test_tail_ctx *ctx;
char *file[] = {"source_file.log"};
char *path = "source_file.log";
char *msg = "TEST LINE";
const int expected = 1;
int ret;
int num;
int unused;

clear_output_num();

cb_data.cb = cb_count_msgpack;
cb_data.data = &unused;

ctx = test_tail_ctx_create(&cb_data, &file[0], sizeof(file)/sizeof(char *), FLB_TRUE);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
return;
}

ret = flb_input_set(ctx->flb, ctx->o_ffd,
"path", path,
"ignore_older", "2s",
"read_from_head", "on",
"ignore_active_older_files", "on",
NULL);
TEST_CHECK(ret == 0);

/* Start the engine */
ret = flb_start(ctx->flb);

if (!TEST_CHECK(ret == 0)) {
test_tail_ctx_destroy(ctx);

return;
}

ret = write_msg(ctx, msg, strlen(msg));

if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);

return;
}

/* waiting to flush */
flb_time_msleep(6000);

ret = write_msg(ctx, msg, strlen(msg));

if (!TEST_CHECK(ret > 0)) {
test_tail_ctx_destroy(ctx);

return;
}

/* waiting to flush */
flb_time_msleep(1500);

num = get_output_num();
if (!TEST_CHECK(num == expected)) {
TEST_MSG("output num error. expect=%d got=%d", expected, num);
}

test_tail_ctx_destroy(ctx);
}

void flb_test_inotify_watcher_false()
{
struct flb_lib_out_cb cb_data;
Expand Down Expand Up @@ -1695,7 +1765,7 @@ void flb_test_db_delete_stale_file()

/*
* Changing the file name from 'test_db_stale.log' to
* 'test_db_stale_new.log.' In this scenario, it is assumed that the
* 'test_db_stale_new.log.' In this scenario, it is assumed that the
* file was deleted after the FluentBit was terminated. However, since
* the FluentBit was shutdown, the inode remains in the database.
* The reason for renaming is to preserve the existing file for later use.
Expand Down Expand Up @@ -1792,7 +1862,7 @@ void flb_test_db_delete_stale_file()

num = get_output_num();
if (!TEST_CHECK(num == 3)) {
/* 3 =
/* 3 =
* test_db.log : "hello db end"
* test_db_stale.log : "msg_init" + "hello db end"
*/
Expand Down Expand Up @@ -1986,5 +2056,8 @@ TEST_LIST = {
{"in_tail_dockermode_firstline_detection", flb_test_in_tail_dockermode_firstline_detection},
{"in_tail_multiline_json_and_regex", flb_test_in_tail_multiline_json_and_regex},
#endif

{"ignore_active_older_files", flb_test_in_tail_ignore_active_older_files},

{NULL, NULL}
};