Skip to content

Commit 00baa18

Browse files
committed
router: for direct routes improve the handling of input instance source
- Extended struct flb_input_routes to retain the parsed plugin name, track whether an alias was provided, and cache the resolved input instance for cleanup and reuse. - Updated router configuration parsing and application to capture plugin names by default, resolve inputs by alias, internal instance name, or plugin type while avoiding reuse, and consume the enhanced resolver inside flb_router_apply_config. Signed-off-by: Eduardo Silva <[email protected]>
1 parent c26a2d1 commit 00baa18

File tree

2 files changed

+118
-17
lines changed

2 files changed

+118
-17
lines changed

include/fluent-bit/flb_router.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ struct flb_route {
150150

151151
struct flb_input_routes {
152152
flb_sds_t input_name;
153+
flb_sds_t plugin_name;
154+
int has_alias;
155+
struct flb_input_instance *instance;
153156
struct cfl_list processors;
154157
struct cfl_list routes;
155158
struct cfl_list _head;

src/flb_router_config.c

Lines changed: 115 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,10 @@ static void input_routes_destroy(struct flb_input_routes *input)
388388
flb_sds_destroy(input->input_name);
389389
}
390390

391+
if (input->plugin_name) {
392+
flb_sds_destroy(input->plugin_name);
393+
}
394+
391395
flb_free(input);
392396
}
393397

@@ -1080,6 +1084,8 @@ static int parse_input_section(struct flb_cf_section *section,
10801084
struct cfl_list *input_routes,
10811085
struct flb_config *config)
10821086
{
1087+
uint32_t mask;
1088+
size_t before_count;
10831089
struct flb_input_routes *input;
10841090
struct cfl_kvlist *kvlist;
10851091
struct cfl_variant *name_var;
@@ -1088,8 +1094,7 @@ static int parse_input_section(struct flb_cf_section *section,
10881094
struct cfl_kvlist *routes_kvlist;
10891095
struct cfl_list *head;
10901096
struct cfl_kvpair *pair;
1091-
uint32_t mask;
1092-
size_t before_count;
1097+
struct cfl_variant *alias_var;
10931098

10941099
if (!section || !input_routes) {
10951100
return -1;
@@ -1130,13 +1135,29 @@ static int parse_input_section(struct flb_cf_section *section,
11301135
cfl_list_init(&input->_head);
11311136
cfl_list_init(&input->processors);
11321137
cfl_list_init(&input->routes);
1138+
input->has_alias = FLB_FALSE;
1139+
input->instance = NULL;
11331140

1134-
input->input_name = copy_from_cfl_sds(name_var->data.as_string);
1135-
if (!input->input_name) {
1141+
input->plugin_name = copy_from_cfl_sds(name_var->data.as_string);
1142+
if (!input->plugin_name) {
11361143
flb_free(input);
11371144
return -1;
11381145
}
11391146

1147+
alias_var = cfl_kvlist_fetch(kvlist, "alias");
1148+
if (alias_var && alias_var->type == CFL_VARIANT_STRING &&
1149+
cfl_sds_len(alias_var->data.as_string) > 0) {
1150+
input->input_name = copy_from_cfl_sds(alias_var->data.as_string);
1151+
input->has_alias = FLB_TRUE;
1152+
}
1153+
else {
1154+
input->input_name = copy_from_cfl_sds(name_var->data.as_string);
1155+
}
1156+
if (!input->input_name) {
1157+
input_routes_destroy(input);
1158+
return -1;
1159+
}
1160+
11401161
processors_var = cfl_kvlist_fetch(kvlist, "processors");
11411162
if (processors_var) {
11421163
if (parse_processors(processors_var, &input->processors, config) != 0) {
@@ -1223,33 +1244,110 @@ int flb_router_config_parse(struct flb_cf *cf,
12231244
}
12241245

12251246
/* Apply parsed router configuration to actual input/output instances */
1247+
static int input_instance_already_selected(struct flb_config *config,
1248+
struct flb_input_routes *current,
1249+
struct flb_input_instance *candidate)
1250+
{
1251+
struct cfl_list *head;
1252+
struct flb_input_routes *routes;
1253+
1254+
if (!config || !candidate) {
1255+
return FLB_FALSE;
1256+
}
1257+
1258+
cfl_list_foreach(head, &config->input_routes) {
1259+
routes = cfl_list_entry(head, struct flb_input_routes, _head);
1260+
1261+
if (routes == current) {
1262+
continue;
1263+
}
1264+
1265+
if (routes->instance == candidate) {
1266+
return FLB_TRUE;
1267+
}
1268+
}
1269+
1270+
return FLB_FALSE;
1271+
}
1272+
12261273
static struct flb_input_instance *find_input_instance(struct flb_config *config,
1227-
flb_sds_t name)
1274+
struct flb_input_routes *routes)
12281275
{
12291276
struct mk_list *head;
12301277
struct flb_input_instance *ins;
1278+
size_t key_len;
12311279

1232-
if (!config || !name) {
1280+
if (!config || !routes) {
12331281
return NULL;
12341282
}
12351283

1236-
mk_list_foreach(head, &config->inputs) {
1237-
ins = mk_list_entry(head, struct flb_input_instance, _head);
1284+
if (routes->instance) {
1285+
return routes->instance;
1286+
}
12381287

1239-
if (!ins->p) {
1240-
continue;
1288+
if (routes->has_alias && routes->input_name) {
1289+
mk_list_foreach(head, &config->inputs) {
1290+
ins = mk_list_entry(head, struct flb_input_instance, _head);
1291+
1292+
if (!ins->p || !ins->alias) {
1293+
continue;
1294+
}
1295+
1296+
if (strcmp(ins->alias, routes->input_name) == 0 &&
1297+
input_instance_already_selected(config, routes, ins) == FLB_FALSE) {
1298+
routes->instance = ins;
1299+
return ins;
1300+
}
12411301
}
1302+
}
12421303

1243-
if (ins->alias && strcmp(ins->alias, name) == 0) {
1244-
return ins;
1304+
if (routes->input_name) {
1305+
mk_list_foreach(head, &config->inputs) {
1306+
ins = mk_list_entry(head, struct flb_input_instance, _head);
1307+
1308+
if (!ins->p) {
1309+
continue;
1310+
}
1311+
1312+
if (strcmp(ins->name, routes->input_name) == 0 &&
1313+
input_instance_already_selected(config, routes, ins) == FLB_FALSE) {
1314+
routes->instance = ins;
1315+
return ins;
1316+
}
12451317
}
1318+
}
12461319

1247-
if (strcmp(ins->name, name) == 0) {
1248-
return ins;
1320+
if (routes->plugin_name) {
1321+
mk_list_foreach(head, &config->inputs) {
1322+
ins = mk_list_entry(head, struct flb_input_instance, _head);
1323+
1324+
if (!ins->p || !ins->p->name) {
1325+
continue;
1326+
}
1327+
1328+
if (strcmp(ins->p->name, routes->plugin_name) == 0 &&
1329+
input_instance_already_selected(config, routes, ins) == FLB_FALSE) {
1330+
routes->instance = ins;
1331+
return ins;
1332+
}
12491333
}
1334+
}
12501335

1251-
if (ins->p->name && strcmp(ins->p->name, name) == 0) {
1252-
return ins;
1336+
if (routes->input_name) {
1337+
key_len = flb_sds_len(routes->input_name);
1338+
1339+
mk_list_foreach(head, &config->inputs) {
1340+
ins = mk_list_entry(head, struct flb_input_instance, _head);
1341+
1342+
if (!ins->p || key_len == 0) {
1343+
continue;
1344+
}
1345+
1346+
if (strncmp(ins->name, routes->input_name, key_len) == 0 &&
1347+
input_instance_already_selected(config, routes, ins) == FLB_FALSE) {
1348+
routes->instance = ins;
1349+
return ins;
1350+
}
12531351
}
12541352
}
12551353

@@ -1355,7 +1453,7 @@ int flb_router_apply_config(struct flb_config *config)
13551453
cfl_list_foreach(input_head, &config->input_routes) {
13561454
input_routes = cfl_list_entry(input_head, struct flb_input_routes, _head);
13571455

1358-
input_ins = find_input_instance(config, input_routes->input_name);
1456+
input_ins = find_input_instance(config, input_routes);
13591457
if (!input_ins) {
13601458
flb_warn("[router] could not find input instance '%s' for routes",
13611459
input_routes->input_name ? input_routes->input_name : "(null)");

0 commit comments

Comments
 (0)