diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index 2b3241d4d3c..454f818fd4c 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -1188,8 +1188,10 @@ struct flb_output_flush *flb_output_flush_create(struct flb_task *task, static inline void flb_output_return(int ret, struct flb_coro *co) { int n; int pipe_fd; + int effective_records; uint32_t set; uint64_t val; + size_t effective_bytes; struct flb_task *task; struct flb_output_flush *out_flush; struct flb_output_instance *o_ins; @@ -1199,8 +1201,21 @@ static inline void flb_output_return(int ret, struct flb_coro *co) { o_ins = out_flush->o_ins; task = out_flush->task; + effective_records = 0; + effective_bytes = 0; + if (task->event_chunk != NULL) { + effective_records = task->event_chunk->total_events; + effective_bytes = task->event_chunk->size; + } + + if (out_flush->processed_event_chunk != NULL) { + effective_records = out_flush->processed_event_chunk->total_events; + effective_bytes = out_flush->processed_event_chunk->size; + } + flb_task_acquire_lock(task); + flb_task_set_route_metrics(task, o_ins, effective_records, effective_bytes); flb_task_deactivate_route(task, o_ins); flb_task_release_lock(task); diff --git a/include/fluent-bit/flb_task.h b/include/fluent-bit/flb_task.h index 6d93ba1535e..683f9f9207d 100644 --- a/include/fluent-bit/flb_task.h +++ b/include/fluent-bit/flb_task.h @@ -57,6 +57,8 @@ struct flb_task_route { int status; + int records; + size_t bytes; struct flb_output_instance *out; struct mk_list _head; }; @@ -257,6 +259,54 @@ static FLB_INLINE void flb_task_set_route_status( } } +static FLB_INLINE void flb_task_set_route_metrics( + struct flb_task *task, + struct flb_output_instance *o_ins, + int records, + size_t bytes) +{ + struct mk_list *iterator; + struct flb_task_route *route; + + mk_list_foreach(iterator, &task->routes) { + route = mk_list_entry(iterator, struct flb_task_route, _head); + + if (route->out == o_ins) { + route->records = records; + route->bytes = bytes; + break; + } + } +} + +static FLB_INLINE int flb_task_get_route_metrics( + struct flb_task *task, + struct flb_output_instance *o_ins, + int *records, + size_t *bytes) +{ + struct mk_list *iterator; + struct flb_task_route *route; + + mk_list_foreach(iterator, &task->routes) { + route = mk_list_entry(iterator, struct flb_task_route, _head); + + if (route->out == o_ins) { + if (records != NULL) { + *records = route->records; + } + + if (bytes != NULL) { + *bytes = route->bytes; + } + + return 0; + } + } + + return -1; +} + static FLB_INLINE void flb_task_activate_route( struct flb_task *task, diff --git a/src/flb_engine.c b/src/flb_engine.c index 9d998b1b702..ed1f86d0140 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -290,7 +290,9 @@ static inline int handle_output_event(uint64_t ts, int retry_seconds; uint32_t type; uint32_t key; + int effective_records; double latency_seconds; + size_t effective_bytes; char *in_name; char *out_name; struct flb_task *task; @@ -340,6 +342,9 @@ static inline int handle_output_event(uint64_t ts, } in_name = (char *) flb_input_name(task->i_ins); out_name = (char *) flb_output_name(ins); + effective_records = task->event_chunk->total_events; + effective_bytes = task->event_chunk->size; + flb_task_get_route_metrics(task, ins, &effective_records, &effective_bytes); /* If we are in synchronous mode, flush the next waiting task */ if (ins->flags & FLB_OUTPUT_SYNCHRONOUS) { @@ -351,19 +356,19 @@ static inline int handle_output_event(uint64_t ts, /* A task has finished, delete it */ if (ret == FLB_OK) { /* cmetrics */ - cmt_counter_add(ins->cmt_proc_records, ts, task->event_chunk->total_events, + cmt_counter_add(ins->cmt_proc_records, ts, effective_records, 1, (char *[]) {out_name}); - cmt_counter_add(ins->cmt_proc_bytes, ts, task->event_chunk->size, + cmt_counter_add(ins->cmt_proc_bytes, ts, effective_bytes, 1, (char *[]) {out_name}); if (config->router && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { cmt_counter_add(config->router->logs_records_total, ts, - task->event_chunk->total_events, + effective_records, 2, (char *[]) {in_name, out_name}); cmt_counter_add(config->router->logs_bytes_total, ts, - task->event_chunk->size, + effective_bytes, 2, (char *[]) {in_name, out_name}); } @@ -378,9 +383,9 @@ static inline int handle_output_event(uint64_t ts, #ifdef FLB_HAVE_METRICS if (ins->metrics) { flb_metrics_sum(FLB_METRIC_OUT_OK_RECORDS, - task->event_chunk->total_events, ins->metrics); + effective_records, ins->metrics); flb_metrics_sum(FLB_METRIC_OUT_OK_BYTES, - task->event_chunk->size, ins->metrics); + effective_bytes, ins->metrics); } #endif /* Inform the user if a 'retry' succedeed */ @@ -416,17 +421,17 @@ static inline int handle_output_event(uint64_t ts, handle_dlq_if_available(config, task, ins, 0); /* cmetrics: output_dropped_records_total */ - cmt_counter_add(ins->cmt_dropped_records, ts, task->records, + cmt_counter_add(ins->cmt_dropped_records, ts, effective_records, 1, (char *[]) {out_name}); if (config->router && task->event_chunk && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { cmt_counter_add(config->router->logs_drop_records_total, ts, - task->records, + effective_records, 2, (char *[]) {in_name, out_name}); cmt_counter_add(config->router->logs_drop_bytes_total, ts, - task->event_chunk->size, + effective_bytes, 2, (char *[]) {in_name, out_name}); } @@ -436,7 +441,7 @@ static inline int handle_output_event(uint64_t ts, /* OLD metrics API */ #ifdef FLB_HAVE_METRICS - flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, task->records, ins->metrics); + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, effective_records, ins->metrics); #endif flb_info("[engine] chunk '%s' is not retried (no retry config): " "task_id=%i, input=%s > output=%s (out_id=%i)", @@ -465,17 +470,17 @@ static inline int handle_output_event(uint64_t ts, /* cmetrics */ cmt_counter_inc(ins->cmt_retries_failed, ts, 1, (char *[]) {out_name}); - cmt_counter_add(ins->cmt_dropped_records, ts, task->records, + cmt_counter_add(ins->cmt_dropped_records, ts, effective_records, 1, (char *[]) {out_name}); if (config->router && task->event_chunk && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { cmt_counter_add(config->router->logs_drop_records_total, ts, - task->records, + effective_records, 2, (char *[]) {in_name, out_name}); cmt_counter_add(config->router->logs_drop_bytes_total, ts, - task->event_chunk->size, + effective_bytes, 2, (char *[]) {in_name, out_name}); } @@ -486,7 +491,7 @@ static inline int handle_output_event(uint64_t ts, /* OLD metrics API */ #ifdef FLB_HAVE_METRICS flb_metrics_sum(FLB_METRIC_OUT_RETRY_FAILED, 1, ins->metrics); - flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, task->records, ins->metrics); + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, effective_records, ins->metrics); #endif /* Notify about this failed retry */ flb_error("[engine] chunk '%s' cannot be retried: " @@ -538,7 +543,7 @@ static inline int handle_output_event(uint64_t ts, /* cmetrics */ cmt_counter_inc(ins->cmt_retries, ts, 1, (char *[]) {out_name}); - cmt_counter_add(ins->cmt_retried_records, ts, task->records, + cmt_counter_add(ins->cmt_retried_records, ts, effective_records, 1, (char *[]) {out_name}); cmt_gauge_set(ins->cmt_chunk_available_capacity_percent, ts, @@ -548,7 +553,7 @@ static inline int handle_output_event(uint64_t ts, /* OLD metrics API: update the metrics since a new retry is coming */ #ifdef FLB_HAVE_METRICS flb_metrics_sum(FLB_METRIC_OUT_RETRY, 1, ins->metrics); - flb_metrics_sum(FLB_METRIC_OUT_RETRIED_RECORDS, task->records, ins->metrics); + flb_metrics_sum(FLB_METRIC_OUT_RETRIED_RECORDS, effective_records, ins->metrics); #endif } } @@ -556,17 +561,17 @@ static inline int handle_output_event(uint64_t ts, handle_dlq_if_available(config, task, ins, 0); /* cmetrics */ cmt_counter_inc(ins->cmt_errors, ts, 1, (char *[]) {out_name}); - cmt_counter_add(ins->cmt_dropped_records, ts, task->records, + cmt_counter_add(ins->cmt_dropped_records, ts, effective_records, 1, (char *[]) {out_name}); if (config->router && task->event_chunk && task->event_chunk->type == FLB_EVENT_TYPE_LOGS) { cmt_counter_add(config->router->logs_drop_records_total, ts, - task->records, + effective_records, 2, (char *[]) {in_name, out_name}); cmt_counter_add(config->router->logs_drop_bytes_total, ts, - task->event_chunk->size, + effective_bytes, 2, (char *[]) {in_name, out_name}); } @@ -577,7 +582,7 @@ static inline int handle_output_event(uint64_t ts, /* OLD API */ #ifdef FLB_HAVE_METRICS flb_metrics_sum(FLB_METRIC_OUT_ERROR, 1, ins->metrics); - flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, task->records, ins->metrics); + flb_metrics_sum(FLB_METRIC_OUT_DROPPED_RECORDS, effective_records, ins->metrics); #endif flb_task_retry_clean(task, ins); diff --git a/src/flb_task.c b/src/flb_task.c index 5e671f5ee40..b7d4f24166a 100644 --- a/src/flb_task.c +++ b/src/flb_task.c @@ -708,6 +708,8 @@ struct flb_task *flb_task_create(uint64_t ref_id, } route->status = FLB_TASK_ROUTE_INACTIVE; + route->records = task->event_chunk->total_events; + route->bytes = task->event_chunk->size; route->out = stored_matches[stored_match_index]; mk_list_add(&route->_head, &task->routes); direct_count++; @@ -810,6 +812,8 @@ struct flb_task *flb_task_create(uint64_t ref_id, } route->status = FLB_TASK_ROUTE_INACTIVE; + route->records = task->event_chunk->total_events; + route->bytes = task->event_chunk->size; route->out = o_ins; mk_list_add(&route->_head, &task->routes); direct_count++; @@ -856,6 +860,8 @@ struct flb_task *flb_task_create(uint64_t ref_id, } route->status = FLB_TASK_ROUTE_INACTIVE; + route->records = task->event_chunk->total_events; + route->bytes = task->event_chunk->size; route->out = o_ins; mk_list_add(&route->_head, &task->routes); count++; diff --git a/tests/runtime/CMakeLists.txt b/tests/runtime/CMakeLists.txt index dd76c16faee..1c57f6ea3a7 100644 --- a/tests/runtime/CMakeLists.txt +++ b/tests/runtime/CMakeLists.txt @@ -254,6 +254,11 @@ if (FLB_PROCESSOR_CONTENT_MODIFIER) FLB_RT_TEST(FLB_PROCESSOR_CONTENT_MODIFIER "processor_content_modifier.c") endif() +if (FLB_IN_DUMMY AND FLB_IN_FLUENTBIT_METRICS AND FLB_OUT_LIB AND + FLB_OUT_STDOUT AND FLB_FILTER_LUA) + FLB_RT_CORE_TEST(ON "processor_output_counters.c") +endif() + # HTTP Client Debug (requires -DFLB_HTTP_CLIENT_DEBUG=On) if(FLB_HTTP_CLIENT_DEBUG) FLB_RT_TEST(FLB_OUT_TD "http_callbacks.c") diff --git a/tests/runtime/processor_output_counters.c b/tests/runtime/processor_output_counters.c new file mode 100644 index 00000000000..0f4d590aac7 --- /dev/null +++ b/tests/runtime/processor_output_counters.c @@ -0,0 +1,213 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +#include +#include +#include +#include + +#include +#include + +#include "flb_tests_runtime.h" + +static pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER; +static int metrics_condition_met = FLB_FALSE; + +static void set_metrics_condition_met(int value) +{ + pthread_mutex_lock(&result_mutex); + metrics_condition_met = value; + pthread_mutex_unlock(&result_mutex); +} + +static int get_metrics_condition_met(void) +{ + int result; + + pthread_mutex_lock(&result_mutex); + result = metrics_condition_met; + pthread_mutex_unlock(&result_mutex); + + return result; +} + +static double find_metric_value(const char *text, + const char *metric_name, + const char *output_name) +{ + char *line; + char *next; + size_t prefix_size; + double value; + char prefix[256]; + + snprintf(prefix, sizeof(prefix), "%s{name=\"%s\"} ", + metric_name, output_name); + prefix_size = strlen(prefix); + + line = (char *) text; + while (line != NULL && *line != '\0') { + next = strchr(line, '\n'); + + if (strncmp(line, prefix, prefix_size) == 0) { + value = strtod(line + prefix_size, NULL); + return value; + } + + if (next == NULL) { + break; + } + + line = next + 1; + } + + return -1.0; +} + +static int cb_check_output_processor_counters(void *record, size_t size, void *data) +{ + int ret; + size_t off = 0; + struct cmt *cmt; + cfl_sds_t text; + double proc_records; + double dropped_records; + + (void) data; + + cmt = NULL; + text = NULL; + + ret = cmt_decode_msgpack_create(&cmt, (char *) record, size, &off); + if (ret != 0) { + if (record != NULL) { + flb_free(record); + } + + return -1; + } + + text = cmt_encode_text_create(cmt); + if (text != NULL) { + proc_records = find_metric_value(text, + "fluentbit_output_proc_records_total", + "stdout.0"); + dropped_records = find_metric_value(text, + "fluentbit_output_dropped_records_total", + "stdout.0"); + + if (proc_records == 0.0 && dropped_records > 0.0) { + set_metrics_condition_met(FLB_TRUE); + } + + cmt_encode_text_destroy(text); + } + + cmt_destroy(cmt); + + if (record != NULL) { + flb_free(record); + } + + return 0; +} + +void flb_test_output_processor_drop_counters(void) +{ + int ret; + int in_ffd; + int metrics_in_ffd; + int out_ffd; + int metrics_out_ffd; + int wait_cycles; + flb_ctx_t *ctx; + struct flb_processor *proc; + struct flb_processor_unit *pu; + struct flb_lib_out_cb cb_data; + struct cfl_variant call_property = { + .type = CFL_VARIANT_STRING, + .data.as_string = "cb_drop", + }; + struct cfl_variant code_property = { + .type = CFL_VARIANT_STRING, + .data.as_string = + "function cb_drop(tag, timestamp, record)\n" + " return -1, timestamp, record\n" + "end", + }; + + cb_data.cb = cb_check_output_processor_counters; + cb_data.data = NULL; + + set_metrics_condition_met(FLB_FALSE); + + ctx = flb_create(); + TEST_CHECK(ctx != NULL); + flb_service_set(ctx, + "Flush", "0.200000000", + "Grace", "2", + "Log_Level", "error", + NULL); + + in_ffd = flb_input(ctx, (char *) "dummy", NULL); + TEST_CHECK(in_ffd >= 0); + ret = flb_input_set(ctx, in_ffd, + "tag", "dummy.data", + "rate", "10", + NULL); + TEST_CHECK(ret == 0); + + out_ffd = flb_output(ctx, (char *) "stdout", NULL); + TEST_CHECK(out_ffd >= 0); + ret = flb_output_set(ctx, out_ffd, "match", "*", NULL); + TEST_CHECK(ret == 0); + + proc = flb_processor_create(ctx->config, "unit_test", NULL, 0); + TEST_CHECK(proc != NULL); + + pu = flb_processor_unit_create(proc, FLB_PROCESSOR_LOGS, "lua"); + TEST_CHECK(pu != NULL); + + ret = flb_processor_unit_set_property(pu, "call", &call_property); + TEST_CHECK(ret == 0); + ret = flb_processor_unit_set_property(pu, "code", &code_property); + TEST_CHECK(ret == 0); + + ret = flb_output_set_processor(ctx, out_ffd, proc); + TEST_CHECK(ret == 0); + + metrics_in_ffd = flb_input(ctx, (char *) "fluentbit_metrics", NULL); + TEST_CHECK(metrics_in_ffd >= 0); + ret = flb_input_set(ctx, metrics_in_ffd, + "tag", "fb.metrics", + "scrape_on_start", "true", + "scrape_interval", "1", + NULL); + TEST_CHECK(ret == 0); + + metrics_out_ffd = flb_output(ctx, (char *) "lib", &cb_data); + TEST_CHECK(metrics_out_ffd >= 0); + ret = flb_output_set(ctx, metrics_out_ffd, "match", "fb.metrics", NULL); + TEST_CHECK(ret == 0); + + ret = flb_start(ctx); + TEST_CHECK(ret == 0); + + for (wait_cycles = 0; + wait_cycles < 30 && get_metrics_condition_met() == FLB_FALSE; + wait_cycles++) { + flb_time_msleep(200); + } + + TEST_CHECK(get_metrics_condition_met() == FLB_TRUE); + + flb_stop(ctx); + flb_destroy(ctx); +} + +TEST_LIST = { +#if defined(FLB_HAVE_METRICS) && defined(FLB_FILTER_LUA) + {"output_processor_drop_counters", flb_test_output_processor_drop_counters}, +#endif + {NULL, NULL} +};