Skip to content

Commit aea80e6

Browse files
committed
input_log: account routing metrics for unmatched/dropped logs
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 34b1f76 commit aea80e6

File tree

1 file changed

+164
-6
lines changed

1 file changed

+164
-6
lines changed

src/flb_input_log.c

Lines changed: 164 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <fluent-bit/flb_event.h>
3838
#include <fluent-bit/flb_sds.h>
3939
#include <fluent-bit/flb_mem.h>
40+
#include <fluent-bit/flb_time.h>
4041

4142
#include <chunkio/chunkio.h>
4243

@@ -443,7 +444,8 @@ static int build_payload_for_route(struct flb_input_instance *ins,
443444
struct flb_route_payload *payload,
444445
struct flb_mp_chunk_record **records,
445446
size_t record_count,
446-
uint8_t *matched_non_default)
447+
uint8_t *matched_non_default,
448+
uint8_t *matched_any_route)
447449
{
448450
int i;
449451
int j;
@@ -456,7 +458,8 @@ static int build_payload_for_route(struct flb_input_instance *ins,
456458
struct flb_mp_chunk_record *group_start_record = NULL;
457459
uint8_t *matched_by_route = NULL;
458460

459-
if (!payload || !records || record_count == 0 || !matched_non_default) {
461+
if (!payload || !records || record_count == 0 || !matched_non_default ||
462+
!matched_any_route) {
460463
return -1;
461464
}
462465

@@ -499,6 +502,7 @@ static int build_payload_for_route(struct flb_input_instance *ins,
499502

500503
matched_by_route[i] = 1;
501504
matched_non_default[i] = 1;
505+
matched_any_route[i] = 1;
502506
matched++;
503507
}
504508

@@ -603,7 +607,8 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
603607
struct flb_route_payload *payload,
604608
struct flb_mp_chunk_record **records,
605609
size_t record_count,
606-
uint8_t *matched_non_default)
610+
uint8_t *matched_non_default,
611+
uint8_t *matched_any_route)
607612
{
608613
int i;
609614
int j;
@@ -616,7 +621,7 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
616621
struct flb_mp_chunk_record *group_start_record = NULL;
617622
int *matched_by_default = NULL;
618623

619-
if (!payload || !records || !matched_non_default) {
624+
if (!payload || !records || !matched_non_default || !matched_any_route) {
620625
return -1;
621626
}
622627

@@ -673,10 +678,12 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
673678
condition_result = flb_router_condition_evaluate_record(payload->route, records[i]);
674679
if (condition_result == FLB_TRUE) {
675680
matched_by_default[i] = 1;
681+
matched_any_route[i] = 1;
676682
}
677683
}
678684
else {
679685
matched_by_default[i] = 1;
686+
matched_any_route[i] = 1;
680687
}
681688
}
682689
}
@@ -772,6 +779,108 @@ static int build_payload_for_default_route(struct flb_input_instance *ins,
772779
return 0;
773780
}
774781

782+
static int calculate_unmatched_route_metrics(struct flb_mp_chunk_record **records,
783+
size_t record_count,
784+
uint8_t *matched_any_route,
785+
size_t *dropped_records,
786+
size_t *dropped_bytes)
787+
{
788+
size_t i;
789+
size_t j;
790+
int32_t record_type;
791+
int32_t inner_type;
792+
int ret;
793+
struct flb_log_event_encoder *encoder;
794+
struct flb_mp_chunk_record *group_end = NULL;
795+
struct flb_mp_chunk_record *group_start_record = NULL;
796+
797+
if (!records || record_count == 0 || !matched_any_route ||
798+
!dropped_records || !dropped_bytes) {
799+
return -1;
800+
}
801+
802+
encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
803+
if (!encoder) {
804+
return -1;
805+
}
806+
807+
*dropped_records = 0;
808+
*dropped_bytes = 0;
809+
810+
for (i = 0; i < record_count; i++) {
811+
if (flb_log_event_decoder_get_record_type(&records[i]->event, &record_type) != 0) {
812+
continue;
813+
}
814+
815+
if (record_type == FLB_LOG_EVENT_GROUP_START) {
816+
group_start_record = records[i];
817+
group_end = NULL;
818+
continue;
819+
}
820+
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
821+
if (group_end != NULL &&
822+
group_start_record != NULL &&
823+
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
824+
ret = encode_chunk_record(encoder, group_end);
825+
if (ret != 0) {
826+
flb_log_event_encoder_destroy(encoder);
827+
return -1;
828+
}
829+
group_end = NULL;
830+
}
831+
group_start_record = NULL;
832+
continue;
833+
}
834+
else if (record_type != FLB_LOG_EVENT_NORMAL) {
835+
continue;
836+
}
837+
838+
if (!matched_any_route[i]) {
839+
if (group_start_record != NULL &&
840+
records[i]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
841+
if (group_end == NULL) {
842+
ret = encode_chunk_record(encoder, group_start_record);
843+
if (ret != 0) {
844+
flb_log_event_encoder_destroy(encoder);
845+
return -1;
846+
}
847+
848+
for (j = i + 1; j < record_count; j++) {
849+
if (flb_log_event_decoder_get_record_type(&records[j]->event, &inner_type) == 0 &&
850+
inner_type == FLB_LOG_EVENT_GROUP_END &&
851+
records[j]->cobj_group_metadata == group_start_record->cobj_group_metadata) {
852+
group_end = records[j];
853+
break;
854+
}
855+
}
856+
}
857+
}
858+
859+
ret = encode_chunk_record(encoder, records[i]);
860+
if (ret != 0) {
861+
flb_log_event_encoder_destroy(encoder);
862+
return -1;
863+
}
864+
865+
(*dropped_records)++;
866+
}
867+
}
868+
869+
if (group_end != NULL) {
870+
ret = encode_chunk_record(encoder, group_end);
871+
if (ret != 0) {
872+
flb_log_event_encoder_destroy(encoder);
873+
return -1;
874+
}
875+
}
876+
877+
*dropped_bytes = encoder->buffer.size;
878+
879+
flb_log_event_encoder_destroy(encoder);
880+
881+
return 0;
882+
}
883+
775884
static void route_payload_list_destroy(struct cfl_list *payloads)
776885
{
777886
struct cfl_list *head;
@@ -879,6 +988,7 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
879988
int context_initialized = FLB_FALSE;
880989
size_t out_size = 0;
881990
uint8_t *matched_non_default = NULL;
991+
uint8_t *matched_any_route = NULL;
882992
struct cfl_list payloads;
883993
struct cfl_list *head;
884994
struct cfl_list *tmp;
@@ -1025,6 +1135,17 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
10251135
return -1;
10261136
}
10271137

1138+
matched_any_route = flb_calloc(record_count, sizeof(uint8_t));
1139+
if (!matched_any_route) {
1140+
flb_errno();
1141+
flb_free(matched_non_default);
1142+
flb_free(records_array);
1143+
flb_router_chunk_context_destroy(&context);
1144+
route_payload_list_destroy(&payloads);
1145+
flb_event_chunk_destroy(chunk);
1146+
return -1;
1147+
}
1148+
10281149
index = 0;
10291150
cfl_list_foreach(head, &context.chunk_cobj->records) {
10301151
records_array[index++] = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
@@ -1040,10 +1161,12 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
10401161
payload,
10411162
records_array,
10421163
record_count,
1043-
matched_non_default);
1164+
matched_non_default,
1165+
matched_any_route);
10441166
if (ret != 0) {
10451167
flb_free(records_array);
10461168
flb_free(matched_non_default);
1169+
flb_free(matched_any_route);
10471170
flb_router_chunk_context_destroy(&context);
10481171
route_payload_list_destroy(&payloads);
10491172
flb_event_chunk_destroy(chunk);
@@ -1061,19 +1184,54 @@ static int split_and_append_route_payloads(struct flb_input_instance *ins,
10611184
payload,
10621185
records_array,
10631186
record_count,
1064-
matched_non_default);
1187+
matched_non_default,
1188+
matched_any_route);
10651189
if (ret != 0) {
10661190
flb_free(records_array);
10671191
flb_free(matched_non_default);
1192+
flb_free(matched_any_route);
10681193
flb_router_chunk_context_destroy(&context);
10691194
route_payload_list_destroy(&payloads);
10701195
flb_event_chunk_destroy(chunk);
10711196
return -1;
10721197
}
10731198
}
10741199

1200+
{
1201+
size_t dropped_records = 0;
1202+
size_t dropped_bytes = 0;
1203+
struct flb_router *router = NULL;
1204+
char *input_label = NULL;
1205+
char *output_label = "unmatched";
1206+
uint64_t now;
1207+
1208+
ret = calculate_unmatched_route_metrics(records_array,
1209+
record_count,
1210+
matched_any_route,
1211+
&dropped_records,
1212+
&dropped_bytes);
1213+
if (ret == 0 && dropped_records > 0 && ins->config && ins->config->router) {
1214+
router = ins->config->router;
1215+
input_label = (char *) flb_input_name(ins);
1216+
now = cfl_time_now();
1217+
1218+
cmt_counter_add(router->logs_drop_records_total,
1219+
now,
1220+
(double) dropped_records,
1221+
2,
1222+
(char *[]){input_label, output_label});
1223+
1224+
cmt_counter_add(router->logs_drop_bytes_total,
1225+
now,
1226+
(double) dropped_bytes,
1227+
2,
1228+
(char *[]){input_label, output_label});
1229+
}
1230+
}
1231+
10751232
flb_free(records_array);
10761233
flb_free(matched_non_default);
1234+
flb_free(matched_any_route);
10771235

10781236
cfl_list_foreach_safe(head, tmp, &payloads) {
10791237
payload = cfl_list_entry(head, struct flb_route_payload, _head);

0 commit comments

Comments
 (0)