Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions include/fluent-bit/flb_log_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
* Any other negative timestamp is considered invalid and will be skipped during decoding.
* Encoders must respect this contract and only use -1/-2 for group markers.
*/
#define FLB_LOG_EVENT_NORMAL (int32_t) 0
#define FLB_LOG_EVENT_GROUP_START (int32_t) -1
#define FLB_LOG_EVENT_GROUP_END (int32_t) -2
#define FLB_LOG_EVENT_NORMAL (time_t) 0
#define FLB_LOG_EVENT_GROUP_START (time_t) -1
#define FLB_LOG_EVENT_GROUP_END (time_t) -2

struct flb_log_event {
msgpack_object *group_attributes;
Expand Down
38 changes: 25 additions & 13 deletions src/flb_log_event_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ void flb_log_event_decoder_destroy(struct flb_log_event_decoder *context)
int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
struct flb_time *output)
{
uint32_t packed_sec;

flb_time_zero(output);

if (input->type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
Expand All @@ -194,13 +196,23 @@ int flb_log_event_decoder_decode_timestamp(msgpack_object *input,
return FLB_EVENT_DECODER_ERROR_WRONG_TIMESTAMP_TYPE;
}

output->tm.tv_sec =
(int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
/* convert uint_32t style seconds to time_t style seconds */
packed_sec = FLB_UINT32_TO_HOST_BYTE_ORDER(
FLB_ALIGNED_DWORD_READ(
(unsigned char *) &input->via.ext.ptr[0]));

if (packed_sec == (uint32_t) FLB_LOG_EVENT_GROUP_START) {
output->tm.tv_sec = FLB_LOG_EVENT_GROUP_START;
}
else if (packed_sec == (uint32_t) FLB_LOG_EVENT_GROUP_END) {
output->tm.tv_sec = FLB_LOG_EVENT_GROUP_END;
}
else {
output->tm.tv_sec = (time_t) packed_sec;
}

output->tm.tv_nsec =
(int32_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
(time_t) FLB_UINT32_TO_HOST_BYTE_ORDER(
FLB_ALIGNED_DWORD_READ(
(unsigned char *) &input->via.ext.ptr[4]));
}
Expand Down Expand Up @@ -313,7 +325,7 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
int result;
int record_type;
size_t previous_offset;
int32_t invalid_timestamp;
int64_t invalid_timestamp;

if (context == NULL) {
return FLB_EVENT_DECODER_ERROR_INVALID_CONTEXT;
Expand Down Expand Up @@ -368,8 +380,8 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,
* to avoid losing valid group metadata if corruption occurs mid-group.
* Skip the record and continue processing.
*/
invalid_timestamp = (int32_t) event->timestamp.tm.tv_sec;
flb_debug("[decoder] Invalid group marker timestamp (%d), skipping record. "
invalid_timestamp = (int64_t) event->timestamp.tm.tv_sec;
flb_debug("[decoder] Invalid group marker timestamp (%ld), skipping record. "
"Group state preserved.", invalid_timestamp);

/* Increment recursion depth before recursive call */
Expand Down Expand Up @@ -457,22 +469,22 @@ int flb_log_event_decoder_next(struct flb_log_event_decoder *context,

int flb_log_event_decoder_get_record_type(struct flb_log_event *event, int32_t *type)
{
int32_t s;
time_t s;

s = (int32_t) event->timestamp.tm.tv_sec;
s = event->timestamp.tm.tv_sec;

if (s >= 0) {
*type = FLB_LOG_EVENT_NORMAL;
return 0;
}
else if (s == FLB_LOG_EVENT_GROUP_START) {
if (s == FLB_LOG_EVENT_GROUP_START) {
*type = FLB_LOG_EVENT_GROUP_START;
return 0;
}
else if (s == FLB_LOG_EVENT_GROUP_END) {
*type = FLB_LOG_EVENT_GROUP_END;
return 0;
}
else if (s >= 0) {
*type = FLB_LOG_EVENT_NORMAL;
return 0;
}

return -1;
}
Expand Down