diff --git a/otelcollector/main/main.go b/otelcollector/main/main.go index bb78dfb0c..c32e76bec 100644 --- a/otelcollector/main/main.go +++ b/otelcollector/main/main.go @@ -31,6 +31,12 @@ func main() { osType := os.Getenv("OS_TYPE") customEnvironment := shared.GetEnv("customEnvironment", "") + // Setup JSON logging with pod and containerID for CCP metrics mode + // This must be done early so all subsequent log output includes these fields + if strings.EqualFold(ccpMetricsEnabled, "true") { + shared.SetupCCPLogging() + } + if osType == "windows" { env := strings.ToLower(customEnvironment) switch env { diff --git a/otelcollector/shared/ccp_logger.go b/otelcollector/shared/ccp_logger.go new file mode 100644 index 000000000..051d6a3d2 --- /dev/null +++ b/otelcollector/shared/ccp_logger.go @@ -0,0 +1,67 @@ +package shared + +import ( + "encoding/json" + "io" + "log" + "os" + "strings" + "time" +) + +// CCPLogWriter wraps an io.Writer to add pod and containerID fields to each log line +type CCPLogWriter struct { + dest io.Writer + pod string + containerID string +} + +// NewCCPLogWriter creates a new CCPLogWriter that adds pod and containerID to each log line +func NewCCPLogWriter(dest io.Writer) *CCPLogWriter { + pod := os.Getenv("POD_NAME") + if pod == "" { + pod = os.Getenv("HOSTNAME") + } + containerID := os.Getenv("CONTAINER_ID") + + return &CCPLogWriter{ + dest: dest, + pod: pod, + containerID: containerID, + } +} + +// Write implements io.Writer and wraps each log line with JSON containing pod and containerID +func (w *CCPLogWriter) Write(p []byte) (n int, err error) { + msg := strings.TrimSuffix(string(p), "\n") + + logEntry := map[string]interface{}{ + "time": time.Now().UTC().Format(time.RFC3339Nano), + "pod": w.pod, + "containerID": w.containerID, + "message": msg, + } + + jsonBytes, err := json.Marshal(logEntry) + if err != nil { + // Fall back to writing the original message if JSON marshaling fails + return w.dest.Write(p) + } + + jsonBytes = append(jsonBytes, '\n') + _, err = w.dest.Write(jsonBytes) + if err != nil { + return 0, err + } + + // Return original length to satisfy the interface contract + return len(p), nil +} + +// SetupCCPLogging configures the global logger to output JSON with pod and containerID fields +// This should be called early in main() when CCP_METRICS_ENABLED is true +func SetupCCPLogging() { + ccpWriter := NewCCPLogWriter(os.Stdout) + log.SetOutput(ccpWriter) + log.SetFlags(0) // Disable default timestamp since we include it in JSON +} diff --git a/otelcollector/shared/ccp_logger_test.go b/otelcollector/shared/ccp_logger_test.go new file mode 100644 index 000000000..57f8aea59 --- /dev/null +++ b/otelcollector/shared/ccp_logger_test.go @@ -0,0 +1,131 @@ +package shared + +import ( + "bytes" + "encoding/json" + "os" + "strings" + "testing" +) + +func TestCCPLogWriter(t *testing.T) { + tests := []struct { + name string + pod string + containerID string + input string + wantPod string + wantCID string + wantMsg string + }{ + { + name: "basic log message", + pod: "test-pod", + containerID: "test-container-id", + input: "test message\n", + wantPod: "test-pod", + wantCID: "test-container-id", + wantMsg: "test message", + }, + { + name: "log message without trailing newline", + pod: "pod-123", + containerID: "cid-456", + input: "no newline message", + wantPod: "pod-123", + wantCID: "cid-456", + wantMsg: "no newline message", + }, + { + name: "empty pod and containerID", + pod: "", + containerID: "", + input: "message with empty fields\n", + wantPod: "", + wantCID: "", + wantMsg: "message with empty fields", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var buf bytes.Buffer + + // Create writer with specific pod/containerID + writer := &CCPLogWriter{ + dest: &buf, + pod: tc.pod, + containerID: tc.containerID, + } + + n, err := writer.Write([]byte(tc.input)) + if err != nil { + t.Fatalf("Write() error = %v", err) + } + if n != len(tc.input) { + t.Errorf("Write() returned %d, want %d", n, len(tc.input)) + } + + // Parse the output JSON + output := strings.TrimSuffix(buf.String(), "\n") + var result map[string]interface{} + if err := json.Unmarshal([]byte(output), &result); err != nil { + t.Fatalf("Failed to parse JSON output: %v, output was: %s", err, output) + } + + if result["pod"] != tc.wantPod { + t.Errorf("pod = %v, want %v", result["pod"], tc.wantPod) + } + if result["containerID"] != tc.wantCID { + t.Errorf("containerID = %v, want %v", result["containerID"], tc.wantCID) + } + if result["message"] != tc.wantMsg { + t.Errorf("message = %v, want %v", result["message"], tc.wantMsg) + } + if _, ok := result["time"]; !ok { + t.Error("time field is missing from output") + } + }) + } +} + +func TestNewCCPLogWriter(t *testing.T) { + // Save and restore env vars + origPodName := os.Getenv("POD_NAME") + origHostname := os.Getenv("HOSTNAME") + origContainerID := os.Getenv("CONTAINER_ID") + defer func() { + os.Setenv("POD_NAME", origPodName) + os.Setenv("HOSTNAME", origHostname) + os.Setenv("CONTAINER_ID", origContainerID) + }() + + t.Run("uses POD_NAME when set", func(t *testing.T) { + os.Setenv("POD_NAME", "my-pod") + os.Setenv("HOSTNAME", "my-hostname") + os.Setenv("CONTAINER_ID", "my-container") + + var buf bytes.Buffer + writer := NewCCPLogWriter(&buf) + + if writer.pod != "my-pod" { + t.Errorf("pod = %v, want my-pod", writer.pod) + } + if writer.containerID != "my-container" { + t.Errorf("containerID = %v, want my-container", writer.containerID) + } + }) + + t.Run("falls back to HOSTNAME when POD_NAME is empty", func(t *testing.T) { + os.Setenv("POD_NAME", "") + os.Setenv("HOSTNAME", "fallback-hostname") + os.Setenv("CONTAINER_ID", "cid") + + var buf bytes.Buffer + writer := NewCCPLogWriter(&buf) + + if writer.pod != "fallback-hostname" { + t.Errorf("pod = %v, want fallback-hostname", writer.pod) + } + }) +}