diff --git a/plugins/in_splunk/splunk.h b/plugins/in_splunk/splunk.h index 9140747a7fb..93db1f444bf 100644 --- a/plugins/in_splunk/splunk.h +++ b/plugins/in_splunk/splunk.h @@ -73,10 +73,6 @@ struct flb_splunk { struct flb_downstream *downstream; /* Client manager */ struct mk_list connections; /* linked list of connections */ struct mk_server *server; - - /* Remote address */ - flb_sds_t current_remote_addr; - size_t current_remote_addr_len; }; diff --git a/plugins/in_splunk/splunk_config.c b/plugins/in_splunk/splunk_config.c index 014dc8aab69..7e3d058f208 100644 --- a/plugins/in_splunk/splunk_config.c +++ b/plugins/in_splunk/splunk_config.c @@ -146,9 +146,6 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins) ctx->ingested_auth_header = NULL; - ctx->current_remote_addr = NULL; - ctx->current_remote_addr_len = 0; - ret = setup_hec_tokens(ctx); if (ret != 0) { splunk_config_destroy(ctx); diff --git a/plugins/in_splunk/splunk_prot.c b/plugins/in_splunk/splunk_prot.c index 8541575e832..0d81ef26304 100644 --- a/plugins/in_splunk/splunk_prot.c +++ b/plugins/in_splunk/splunk_prot.c @@ -341,7 +341,10 @@ static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map) * Process a raw text payload for Splunk HEC requests, uses the delimited character to split records, * return the number of processed bytes */ -static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) +static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, + char *buf, size_t size, + const char *remote_addr, + size_t remote_addr_len) { int ret = FLB_EVENT_ENCODER_SUCCESS; @@ -388,8 +391,8 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = append_remote_addr(ctx, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, + remote_addr_len); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -481,9 +484,7 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor } if (ret == FLB_EVENT_ENCODER_SUCCESS) { - ret = append_remote_addr(ctx, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + ret = append_remote_addr(ctx, remote_addr, remote_addr_len); } if (ret == FLB_EVENT_ENCODER_SUCCESS) { @@ -519,7 +520,10 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor } } -static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size) +static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, + char *buf, size_t size, + const char *remote_addr, + size_t remote_addr_len) { size_t off = 0; msgpack_unpacked result; @@ -540,8 +544,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, + remote_addr_len); flb_log_event_encoder_reset(&ctx->log_encoder); } @@ -557,8 +561,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } process_flb_log_append(ctx, &record, tag, tag_from_record, tm, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, + remote_addr_len); /* TODO : Optimize this * @@ -588,7 +592,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char } static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, - char *payload, size_t size) + char *payload, size_t size, + const char *remote_addr, + size_t remote_addr_len) { int ret; int out_size; @@ -617,7 +623,8 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - process_json_payload_pack(ctx, tag, pack, out_size); + process_json_payload_pack(ctx, tag, pack, out_size, + remote_addr, remote_addr_len); flb_free(pack); return 0; @@ -675,22 +682,28 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request * } static int handle_hec_payload(struct flb_splunk *ctx, int content_type, - flb_sds_t tag, char *buf, size_t size) + flb_sds_t tag, char *buf, size_t size, + const char *remote_addr, + size_t remote_addr_len) { int ret = -1; if (content_type == HTTP_CONTENT_JSON) { - ret = parse_hec_payload_json(ctx, tag, buf, size); + ret = parse_hec_payload_json(ctx, tag, buf, size, + remote_addr, remote_addr_len); } else if (content_type == HTTP_CONTENT_TEXT) { - ret = process_raw_payload_pack(ctx, tag, buf, size); + ret = process_raw_payload_pack(ctx, tag, buf, size, + remote_addr, remote_addr_len); } else if (content_type == HTTP_CONTENT_UNKNOWN) { if (buf[0] == '{') { - ret = parse_hec_payload_json(ctx, tag, buf, size); + ret = parse_hec_payload_json(ctx, tag, buf, size, + remote_addr, remote_addr_len); } else { - ret = process_raw_payload_pack(ctx, tag, buf, size); + ret = process_raw_payload_pack(ctx, tag, buf, size, + remote_addr, remote_addr_len); } } @@ -700,7 +713,9 @@ static int handle_hec_payload(struct flb_splunk *ctx, int content_type, static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, flb_sds_t tag, struct mk_http_session *session, - struct mk_http_request *request) + struct mk_http_request *request, + const char *remote_addr, + size_t remote_addr_len) { int i = 0; int ret = 0; @@ -768,11 +783,13 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn, return -1; } - ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size); + ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size, + remote_addr, remote_addr_len); flb_free(gz_data); } else { - ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len); + ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len, + remote_addr, remote_addr_len); } return ret; @@ -814,7 +831,8 @@ static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *c } /* Always handle as raw type of payloads here */ - ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len); + ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len, + remote_addr, remote_addr_len); return ret; } @@ -861,6 +879,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, char *hval = NULL; size_t hlen = 0; const char *peer; + const char *remote_addr = NULL; + size_t remote_addr_len = 0; if (request->uri.data[0] != '/') { send_response(conn, 400, "error: invalid request\n"); @@ -1005,22 +1025,17 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.len = out_chunked_size; } - /* Resolve per-request remote address */ - ctx->current_remote_addr = NULL; - ctx->current_remote_addr_len = 0; - if (http_header_lookup(HTTP_PROTOCOL_VERSION_11, request, SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { extract_remote_address(hval, hlen, conn->connection, - &ctx->current_remote_addr, - &ctx->current_remote_addr_len); + (char **) &remote_addr, + &remote_addr_len); } - else { - /* fallback to peer addr */ + if (remote_addr == NULL || remote_addr_len == 0) { peer = flb_connection_get_remote_address(conn->connection); - if (peer) { - ctx->current_remote_addr = peer; - ctx->current_remote_addr_len = strlen(peer); + if (peer != NULL) { + remote_addr = peer; + remote_addr_len = strlen(peer); } } @@ -1031,8 +1046,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 || strcasecmp(uri, "/services/collector/raw") == 0) { ret = process_hec_raw_payload(ctx, conn, tag, session, request, - ctx->current_remote_addr, - ctx->current_remote_addr_len); + remote_addr, remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ @@ -1057,7 +1071,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, strcasecmp(uri, "/services/collector/event") == 0 || strcasecmp(uri, "/services/collector") == 0) { - ret = process_hec_payload(ctx, conn, tag, session, request); + ret = process_hec_payload(ctx, conn, tag, session, request, + remote_addr, remote_addr_len); if (ret == -2) { flb_sds_destroy(tag); mk_mem_free(uri); @@ -1118,10 +1133,6 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn, request->data.data = original_data; request->data.len = original_data_size; - /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ - ctx->current_remote_addr = NULL; - ctx->current_remote_addr_len = 0; - return ret; } @@ -1251,7 +1262,9 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque static int process_hec_payload_ng(struct flb_http_request *request, struct flb_http_response *response, flb_sds_t tag, - struct flb_splunk *ctx) + struct flb_splunk *ctx, + const char *remote_addr, + size_t remote_addr_len) { int type = -1; int ret = 0; @@ -1287,13 +1300,16 @@ static int process_hec_payload_ng(struct flb_http_request *request, return -2; } - return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body)); + return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body), + remote_addr, remote_addr_len); } static int process_hec_raw_payload_ng(struct flb_http_request *request, struct flb_http_response *response, flb_sds_t tag, - struct flb_splunk *ctx) + struct flb_splunk *ctx, + const char *remote_addr, + size_t remote_addr_len) { int ret = 0; size_t size = 0; @@ -1324,7 +1340,8 @@ static int process_hec_raw_payload_ng(struct flb_http_request *request, } /* Always handle as raw type of payloads here */ - return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body)); + return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body), + remote_addr, remote_addr_len); } int splunk_prot_handle_ng(struct flb_http_request *request, @@ -1337,6 +1354,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, char *hval = NULL; size_t hlen = 0; const char *peer; + const char *remote_addr = NULL; + size_t remote_addr_len = 0; context = (struct flb_splunk *) response->stream->user_data; @@ -1384,24 +1403,19 @@ int splunk_prot_handle_ng(struct flb_http_request *request, /* Handle every ingested payload cleanly */ flb_log_event_encoder_reset(&context->log_encoder); - /* Resolve per-request remote address */ - context->current_remote_addr = NULL; - context->current_remote_addr_len = 0; - parent_session = (struct flb_http_server_session *) request->stream->parent; if (parent_session != NULL) { if (http_header_lookup(HTTP_PROTOCOL_VERSION_20, request, SPLUNK_XFF_HEADER, &hval, &hlen) == 0) { extract_remote_address(hval, hlen, parent_session->connection, - &context->current_remote_addr, - &context->current_remote_addr_len); + (char **) &remote_addr, + &remote_addr_len); } - else { - /* fallback to peer addr */ + if (remote_addr == NULL || remote_addr_len == 0) { peer = flb_connection_get_remote_address(parent_session->connection); - if (peer) { - context->current_remote_addr = peer; - context->current_remote_addr_len = strlen(peer); + if (peer != NULL) { + remote_addr = peer; + remote_addr_len = strlen(peer); } } } @@ -1421,7 +1435,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 || strcasecmp(request->path, "/services/collector/raw") == 0) { - ret = process_hec_raw_payload_ng(request, response, tag, context); + ret = process_hec_raw_payload_ng(request, response, tag, context, + remote_addr, remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ flb_sds_destroy(tag); @@ -1439,7 +1454,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request, else if (strcasecmp(request->path, "/services/collector/event/1.0") == 0 || strcasecmp(request->path, "/services/collector/event") == 0 || strcasecmp(request->path, "/services/collector") == 0) { - ret = process_hec_payload_ng(request, response, tag, context); + ret = process_hec_payload_ng(request, response, tag, context, + remote_addr, remote_addr_len); if (ret == -2) { /* Response already sent, skip further response */ flb_sds_destroy(tag); @@ -1461,9 +1477,5 @@ int splunk_prot_handle_ng(struct flb_http_request *request, flb_sds_destroy(tag); - /* Clear per-request remote address to avoid leakage across keep-alive/pipeline */ - context->current_remote_addr = NULL; - context->current_remote_addr_len = 0; - return ret; }