Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions plugins/in_splunk/splunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@ struct flb_splunk {
struct flb_downstream *downstream; /* Client manager */
struct mk_list connections; /* linked list of connections */
struct mk_server *server;

/* Remote address */
flb_sds_t current_remote_addr;
size_t current_remote_addr_len;
};


Expand Down
3 changes: 0 additions & 3 deletions plugins/in_splunk/splunk_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ struct flb_splunk *splunk_config_create(struct flb_input_instance *ins)

ctx->ingested_auth_header = NULL;

ctx->current_remote_addr = NULL;
ctx->current_remote_addr_len = 0;

ret = setup_hec_tokens(ctx);
if (ret != 0) {
splunk_config_destroy(ctx);
Expand Down
134 changes: 73 additions & 61 deletions plugins/in_splunk/splunk_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ static flb_sds_t tag_key(struct flb_splunk *ctx, msgpack_object *map)
* Process a raw text payload for Splunk HEC requests, uses the delimited character to split records,
* return the number of processed bytes
*/
static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size)
static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag,
char *buf, size_t size,
const char *remote_addr,
size_t remote_addr_len)
{
int ret = FLB_EVENT_ENCODER_SUCCESS;

Expand Down Expand Up @@ -388,8 +391,8 @@ static int process_raw_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = append_remote_addr(ctx,
ctx->current_remote_addr,
ctx->current_remote_addr_len);
remote_addr,
remote_addr_len);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand Down Expand Up @@ -481,9 +484,7 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = append_remote_addr(ctx,
ctx->current_remote_addr,
ctx->current_remote_addr_len);
ret = append_remote_addr(ctx, remote_addr, remote_addr_len);
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand Down Expand Up @@ -519,7 +520,10 @@ static void process_flb_log_append(struct flb_splunk *ctx, msgpack_object *recor
}
}

static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char *buf, size_t size)
static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag,
char *buf, size_t size,
const char *remote_addr,
size_t remote_addr_len)
{
size_t off = 0;
msgpack_unpacked result;
Expand All @@ -540,8 +544,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
}

process_flb_log_append(ctx, &result.data, tag, tag_from_record, tm,
ctx->current_remote_addr,
ctx->current_remote_addr_len);
remote_addr,
remote_addr_len);

flb_log_event_encoder_reset(&ctx->log_encoder);
}
Expand All @@ -557,8 +561,8 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
}

process_flb_log_append(ctx, &record, tag, tag_from_record, tm,
ctx->current_remote_addr,
ctx->current_remote_addr_len);
remote_addr,
remote_addr_len);

/* TODO : Optimize this
*
Expand Down Expand Up @@ -588,7 +592,9 @@ static int process_json_payload_pack(struct flb_splunk *ctx, flb_sds_t tag, char
}

static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
char *payload, size_t size)
char *payload, size_t size,
const char *remote_addr,
size_t remote_addr_len)
{
int ret;
int out_size;
Expand Down Expand Up @@ -617,7 +623,8 @@ static ssize_t parse_hec_payload_json(struct flb_splunk *ctx, flb_sds_t tag,
}

/* Process the packaged JSON and return the last byte used */
process_json_payload_pack(ctx, tag, pack, out_size);
process_json_payload_pack(ctx, tag, pack, out_size,
remote_addr, remote_addr_len);
flb_free(pack);

return 0;
Expand Down Expand Up @@ -675,22 +682,28 @@ static int validate_auth_header(struct flb_splunk *ctx, struct mk_http_request *
}

static int handle_hec_payload(struct flb_splunk *ctx, int content_type,
flb_sds_t tag, char *buf, size_t size)
flb_sds_t tag, char *buf, size_t size,
const char *remote_addr,
size_t remote_addr_len)
{
int ret = -1;

if (content_type == HTTP_CONTENT_JSON) {
ret = parse_hec_payload_json(ctx, tag, buf, size);
ret = parse_hec_payload_json(ctx, tag, buf, size,
remote_addr, remote_addr_len);
}
else if (content_type == HTTP_CONTENT_TEXT) {
ret = process_raw_payload_pack(ctx, tag, buf, size);
ret = process_raw_payload_pack(ctx, tag, buf, size,
remote_addr, remote_addr_len);
}
else if (content_type == HTTP_CONTENT_UNKNOWN) {
if (buf[0] == '{') {
ret = parse_hec_payload_json(ctx, tag, buf, size);
ret = parse_hec_payload_json(ctx, tag, buf, size,
remote_addr, remote_addr_len);
}
else {
ret = process_raw_payload_pack(ctx, tag, buf, size);
ret = process_raw_payload_pack(ctx, tag, buf, size,
remote_addr, remote_addr_len);
}
}

Expand All @@ -700,7 +713,9 @@ static int handle_hec_payload(struct flb_splunk *ctx, int content_type,
static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
flb_sds_t tag,
struct mk_http_session *session,
struct mk_http_request *request)
struct mk_http_request *request,
const char *remote_addr,
size_t remote_addr_len)
{
int i = 0;
int ret = 0;
Expand Down Expand Up @@ -768,11 +783,13 @@ static int process_hec_payload(struct flb_splunk *ctx, struct splunk_conn *conn,
return -1;
}

ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size);
ret = handle_hec_payload(ctx, type, tag, gz_data, gz_size,
remote_addr, remote_addr_len);
flb_free(gz_data);
}
else {
ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len);
ret = handle_hec_payload(ctx, type, tag, request->data.data, request->data.len,
remote_addr, remote_addr_len);
}

return ret;
Expand Down Expand Up @@ -814,7 +831,8 @@ static int process_hec_raw_payload(struct flb_splunk *ctx, struct splunk_conn *c
}

/* Always handle as raw type of payloads here */
ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len);
ret = process_raw_payload_pack(ctx, tag, request->data.data, request->data.len,
remote_addr, remote_addr_len);

return ret;
}
Expand Down Expand Up @@ -861,6 +879,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
char *hval = NULL;
size_t hlen = 0;
const char *peer;
const char *remote_addr = NULL;
size_t remote_addr_len = 0;

if (request->uri.data[0] != '/') {
send_response(conn, 400, "error: invalid request\n");
Expand Down Expand Up @@ -1005,22 +1025,17 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
request->data.len = out_chunked_size;
}

/* Resolve per-request remote address */
ctx->current_remote_addr = NULL;
ctx->current_remote_addr_len = 0;

if (http_header_lookup(HTTP_PROTOCOL_VERSION_11, request,
SPLUNK_XFF_HEADER, &hval, &hlen) == 0) {
extract_remote_address(hval, hlen, conn->connection,
&ctx->current_remote_addr,
&ctx->current_remote_addr_len);
(char **) &remote_addr,
&remote_addr_len);
}
else {
/* fallback to peer addr */
if (remote_addr == NULL || remote_addr_len == 0) {
peer = flb_connection_get_remote_address(conn->connection);
if (peer) {
ctx->current_remote_addr = peer;
ctx->current_remote_addr_len = strlen(peer);
if (peer != NULL) {
remote_addr = peer;
remote_addr_len = strlen(peer);
}
}

Expand All @@ -1031,8 +1046,7 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
if (strcasecmp(uri, "/services/collector/raw/1.0") == 0 ||
strcasecmp(uri, "/services/collector/raw") == 0) {
ret = process_hec_raw_payload(ctx, conn, tag, session, request,
ctx->current_remote_addr,
ctx->current_remote_addr_len);
remote_addr, remote_addr_len);

if (ret == -2) {
/* Response already sent, skip further response */
Expand All @@ -1057,7 +1071,8 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
strcasecmp(uri, "/services/collector/event") == 0 ||
strcasecmp(uri, "/services/collector") == 0) {

ret = process_hec_payload(ctx, conn, tag, session, request);
ret = process_hec_payload(ctx, conn, tag, session, request,
remote_addr, remote_addr_len);
if (ret == -2) {
flb_sds_destroy(tag);
mk_mem_free(uri);
Expand Down Expand Up @@ -1118,10 +1133,6 @@ int splunk_prot_handle(struct flb_splunk *ctx, struct splunk_conn *conn,
request->data.data = original_data;
request->data.len = original_data_size;

/* Clear per-request remote address to avoid leakage across keep-alive/pipeline */
ctx->current_remote_addr = NULL;
ctx->current_remote_addr_len = 0;

return ret;
}

Expand Down Expand Up @@ -1251,7 +1262,9 @@ static int validate_auth_header_ng(struct flb_splunk *ctx, struct flb_http_reque
static int process_hec_payload_ng(struct flb_http_request *request,
struct flb_http_response *response,
flb_sds_t tag,
struct flb_splunk *ctx)
struct flb_splunk *ctx,
const char *remote_addr,
size_t remote_addr_len)
{
int type = -1;
int ret = 0;
Expand Down Expand Up @@ -1287,13 +1300,16 @@ static int process_hec_payload_ng(struct flb_http_request *request,
return -2;
}

return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body));
return handle_hec_payload(ctx, type, tag, request->body, cfl_sds_len(request->body),
remote_addr, remote_addr_len);
}

static int process_hec_raw_payload_ng(struct flb_http_request *request,
struct flb_http_response *response,
flb_sds_t tag,
struct flb_splunk *ctx)
struct flb_splunk *ctx,
const char *remote_addr,
size_t remote_addr_len)
{
int ret = 0;
size_t size = 0;
Expand Down Expand Up @@ -1324,7 +1340,8 @@ static int process_hec_raw_payload_ng(struct flb_http_request *request,
}

/* Always handle as raw type of payloads here */
return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body));
return process_raw_payload_pack(ctx, tag, request->body, cfl_sds_len(request->body),
remote_addr, remote_addr_len);
}

int splunk_prot_handle_ng(struct flb_http_request *request,
Expand All @@ -1337,6 +1354,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
char *hval = NULL;
size_t hlen = 0;
const char *peer;
const char *remote_addr = NULL;
size_t remote_addr_len = 0;

context = (struct flb_splunk *) response->stream->user_data;

Expand Down Expand Up @@ -1384,24 +1403,19 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
/* Handle every ingested payload cleanly */
flb_log_event_encoder_reset(&context->log_encoder);

/* Resolve per-request remote address */
context->current_remote_addr = NULL;
context->current_remote_addr_len = 0;

parent_session = (struct flb_http_server_session *) request->stream->parent;
if (parent_session != NULL) {
if (http_header_lookup(HTTP_PROTOCOL_VERSION_20, request,
SPLUNK_XFF_HEADER, &hval, &hlen) == 0) {
extract_remote_address(hval, hlen, parent_session->connection,
&context->current_remote_addr,
&context->current_remote_addr_len);
(char **) &remote_addr,
&remote_addr_len);
}
else {
/* fallback to peer addr */
if (remote_addr == NULL || remote_addr_len == 0) {
peer = flb_connection_get_remote_address(parent_session->connection);
if (peer) {
context->current_remote_addr = peer;
context->current_remote_addr_len = strlen(peer);
if (peer != NULL) {
remote_addr = peer;
remote_addr_len = strlen(peer);
}
}
}
Expand All @@ -1421,7 +1435,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,

if (strcasecmp(request->path, "/services/collector/raw/1.0") == 0 ||
strcasecmp(request->path, "/services/collector/raw") == 0) {
ret = process_hec_raw_payload_ng(request, response, tag, context);
ret = process_hec_raw_payload_ng(request, response, tag, context,
remote_addr, remote_addr_len);
if (ret == -2) {
/* Response already sent, skip further response */
flb_sds_destroy(tag);
Expand All @@ -1439,7 +1454,8 @@ int splunk_prot_handle_ng(struct flb_http_request *request,
else if (strcasecmp(request->path, "/services/collector/event/1.0") == 0 ||
strcasecmp(request->path, "/services/collector/event") == 0 ||
strcasecmp(request->path, "/services/collector") == 0) {
ret = process_hec_payload_ng(request, response, tag, context);
ret = process_hec_payload_ng(request, response, tag, context,
remote_addr, remote_addr_len);
if (ret == -2) {
/* Response already sent, skip further response */
flb_sds_destroy(tag);
Expand All @@ -1461,9 +1477,5 @@ int splunk_prot_handle_ng(struct flb_http_request *request,

flb_sds_destroy(tag);

/* Clear per-request remote address to avoid leakage across keep-alive/pipeline */
context->current_remote_addr = NULL;
context->current_remote_addr_len = 0;

return ret;
}
Loading