Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions otelcollector/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ func main() {
osType := os.Getenv("OS_TYPE")
customEnvironment := shared.GetEnv("customEnvironment", "")

// Setup JSON logging with pod and containerID for CCP metrics mode
// This must be done early so all subsequent log output includes these fields
if strings.EqualFold(ccpMetricsEnabled, "true") {
shared.SetupCCPLogging()
}

if osType == "windows" {
env := strings.ToLower(customEnvironment)
switch env {
Expand Down
67 changes: 67 additions & 0 deletions otelcollector/shared/ccp_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package shared

import (
"encoding/json"
"io"
"log"
"os"
"strings"
"time"
)

// CCPLogWriter wraps an io.Writer to add pod and containerID fields to each log line
type CCPLogWriter struct {
dest io.Writer
pod string
containerID string
}

// NewCCPLogWriter creates a new CCPLogWriter that adds pod and containerID to each log line
func NewCCPLogWriter(dest io.Writer) *CCPLogWriter {
pod := os.Getenv("POD_NAME")
if pod == "" {
pod = os.Getenv("HOSTNAME")
}
containerID := os.Getenv("CONTAINER_ID")

return &CCPLogWriter{
dest: dest,
pod: pod,
containerID: containerID,
}
}

// Write implements io.Writer and wraps each log line with JSON containing pod and containerID
func (w *CCPLogWriter) Write(p []byte) (n int, err error) {
msg := strings.TrimSuffix(string(p), "\n")

logEntry := map[string]interface{}{
"time": time.Now().UTC().Format(time.RFC3339Nano),
"pod": w.pod,
"containerID": w.containerID,
"message": msg,
}

jsonBytes, err := json.Marshal(logEntry)
if err != nil {
// Fall back to writing the original message if JSON marshaling fails
return w.dest.Write(p)
}

jsonBytes = append(jsonBytes, '\n')
_, err = w.dest.Write(jsonBytes)
if err != nil {
return 0, err
}

// Return original length to satisfy the interface contract
return len(p), nil
}

// SetupCCPLogging configures the global logger to output JSON with pod and containerID fields
// This should be called early in main() when CCP_METRICS_ENABLED is true
func SetupCCPLogging() {
ccpWriter := NewCCPLogWriter(os.Stdout)
log.SetOutput(ccpWriter)
log.SetFlags(0) // Disable default timestamp since we include it in JSON
}
131 changes: 131 additions & 0 deletions otelcollector/shared/ccp_logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package shared

import (
"bytes"
"encoding/json"
"os"
"strings"
"testing"
)

func TestCCPLogWriter(t *testing.T) {
tests := []struct {
name string
pod string
containerID string
input string
wantPod string
wantCID string
wantMsg string
}{
{
name: "basic log message",
pod: "test-pod",
containerID: "test-container-id",
input: "test message\n",
wantPod: "test-pod",
wantCID: "test-container-id",
wantMsg: "test message",
},
{
name: "log message without trailing newline",
pod: "pod-123",
containerID: "cid-456",
input: "no newline message",
wantPod: "pod-123",
wantCID: "cid-456",
wantMsg: "no newline message",
},
{
name: "empty pod and containerID",
pod: "",
containerID: "",
input: "message with empty fields\n",
wantPod: "",
wantCID: "",
wantMsg: "message with empty fields",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
var buf bytes.Buffer

// Create writer with specific pod/containerID
writer := &CCPLogWriter{
dest: &buf,
pod: tc.pod,
containerID: tc.containerID,
}

n, err := writer.Write([]byte(tc.input))
if err != nil {
t.Fatalf("Write() error = %v", err)
}
if n != len(tc.input) {
t.Errorf("Write() returned %d, want %d", n, len(tc.input))
}

// Parse the output JSON
output := strings.TrimSuffix(buf.String(), "\n")
var result map[string]interface{}
if err := json.Unmarshal([]byte(output), &result); err != nil {
t.Fatalf("Failed to parse JSON output: %v, output was: %s", err, output)
}

if result["pod"] != tc.wantPod {
t.Errorf("pod = %v, want %v", result["pod"], tc.wantPod)
}
if result["containerID"] != tc.wantCID {
t.Errorf("containerID = %v, want %v", result["containerID"], tc.wantCID)
}
if result["message"] != tc.wantMsg {
t.Errorf("message = %v, want %v", result["message"], tc.wantMsg)
}
if _, ok := result["time"]; !ok {
t.Error("time field is missing from output")
}
})
}
}

func TestNewCCPLogWriter(t *testing.T) {
// Save and restore env vars
origPodName := os.Getenv("POD_NAME")
origHostname := os.Getenv("HOSTNAME")
origContainerID := os.Getenv("CONTAINER_ID")
defer func() {
os.Setenv("POD_NAME", origPodName)
os.Setenv("HOSTNAME", origHostname)
os.Setenv("CONTAINER_ID", origContainerID)
}()

t.Run("uses POD_NAME when set", func(t *testing.T) {
os.Setenv("POD_NAME", "my-pod")
os.Setenv("HOSTNAME", "my-hostname")
os.Setenv("CONTAINER_ID", "my-container")

var buf bytes.Buffer
writer := NewCCPLogWriter(&buf)

if writer.pod != "my-pod" {
t.Errorf("pod = %v, want my-pod", writer.pod)
}
if writer.containerID != "my-container" {
t.Errorf("containerID = %v, want my-container", writer.containerID)
}
})

t.Run("falls back to HOSTNAME when POD_NAME is empty", func(t *testing.T) {
os.Setenv("POD_NAME", "")
os.Setenv("HOSTNAME", "fallback-hostname")
os.Setenv("CONTAINER_ID", "cid")

var buf bytes.Buffer
writer := NewCCPLogWriter(&buf)

if writer.pod != "fallback-hostname" {
t.Errorf("pod = %v, want fallback-hostname", writer.pod)
}
})
}