diff --git a/examples/ints/internal/ints/recordwriter.go b/examples/ints/internal/ints/recordwriter.go index 61922008..cc6188be 100644 --- a/examples/ints/internal/ints/recordwriter.go +++ b/examples/ints/internal/ints/recordwriter.go @@ -21,6 +21,7 @@ type RecordWriter struct { writeBufs pkg.WriteBufs frameRecordCount uint64 recordCount uint64 + uvarintBuf [binary.MaxVarintLen64]byte } func NewRecordWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*RecordWriter, error) { @@ -66,10 +67,12 @@ func NewRecordWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*RecordWriter } if err := writer.writeFixedHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } if err := writer.writeVarHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } @@ -175,8 +178,9 @@ func (w *RecordWriter) restartFrame(nextFrameFlags pkg.FrameFlags) error { w.encoder.Reset() } - // Write record count. - if _, err := w.frameEncoder.Write(binary.AppendUvarint(nil, w.frameRecordCount)); err != nil { + // Write record count using pre-allocated buffer to avoid allocation. + n := binary.PutUvarint(w.uvarintBuf[:], w.frameRecordCount) + if _, err := w.frameEncoder.Write(w.uvarintBuf[:n]); err != nil { return err } w.frameRecordCount = 0 @@ -208,3 +212,11 @@ func (w *RecordWriter) Flush() error { } return w.restartFrame(w.opts.FrameRestartFlags) } + +// Close releases resources held by the writer. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called +// when the writer is no longer needed. It is the caller's responsibility +// to call Flush before Close if there is unflushed data. +func (w *RecordWriter) Close() { + w.frameEncoder.Close() +} diff --git a/examples/ints/internal/ints/recordwriter_test.go b/examples/ints/internal/ints/recordwriter_test.go index b7d54e99..7511d329 100644 --- a/examples/ints/internal/ints/recordwriter_test.go +++ b/examples/ints/internal/ints/recordwriter_test.go @@ -77,6 +77,7 @@ func testRecordWriteReadSeed(t *testing.T, seed uint64) (retVal bool) { buf := &pkg.MemChunkWriter{} writer, err := NewRecordWriter(buf, optCpy) require.NoError(t, err, "seed %v", seed) + defer writer.Close() // Generate records pseudo-randomly records := genRecordRecords(random, schem) @@ -157,6 +158,7 @@ func TestRecordWriteReadLong(t *testing.T) { writer, err := NewRecordWriter(mem, pkg.WriterOptions{Compression: pkg.CompressionZstd}) require.NoError(t, err, "seed %v", seed) + defer writer.Close() reader, err := NewRecordReader(mem) require.NoError(t, err, "seed %v", seed) @@ -215,6 +217,7 @@ func FuzzRecordReader(f *testing.F) { buf := &pkg.MemChunkWriter{} writer, err := NewRecordWriter(buf, opt) require.NoError(f, err) + defer writer.Close() recCount := (1 << (2 * i)) - 1 var record Record diff --git a/examples/jsonl/internal/jsonstef/recordwriter.go b/examples/jsonl/internal/jsonstef/recordwriter.go index 74240e9a..2294252b 100644 --- a/examples/jsonl/internal/jsonstef/recordwriter.go +++ b/examples/jsonl/internal/jsonstef/recordwriter.go @@ -21,6 +21,7 @@ type RecordWriter struct { writeBufs pkg.WriteBufs frameRecordCount uint64 recordCount uint64 + uvarintBuf [binary.MaxVarintLen64]byte } func NewRecordWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*RecordWriter, error) { @@ -66,10 +67,12 @@ func NewRecordWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*RecordWriter } if err := writer.writeFixedHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } if err := writer.writeVarHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } @@ -175,8 +178,9 @@ func (w *RecordWriter) restartFrame(nextFrameFlags pkg.FrameFlags) error { w.encoder.Reset() } - // Write record count. - if _, err := w.frameEncoder.Write(binary.AppendUvarint(nil, w.frameRecordCount)); err != nil { + // Write record count using pre-allocated buffer to avoid allocation. + n := binary.PutUvarint(w.uvarintBuf[:], w.frameRecordCount) + if _, err := w.frameEncoder.Write(w.uvarintBuf[:n]); err != nil { return err } w.frameRecordCount = 0 @@ -208,3 +212,11 @@ func (w *RecordWriter) Flush() error { } return w.restartFrame(w.opts.FrameRestartFlags) } + +// Close releases resources held by the writer. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called +// when the writer is no longer needed. It is the caller's responsibility +// to call Flush before Close if there is unflushed data. +func (w *RecordWriter) Close() { + w.frameEncoder.Close() +} diff --git a/examples/jsonl/internal/jsonstef/recordwriter_test.go b/examples/jsonl/internal/jsonstef/recordwriter_test.go index def6eff6..eeeb3167 100644 --- a/examples/jsonl/internal/jsonstef/recordwriter_test.go +++ b/examples/jsonl/internal/jsonstef/recordwriter_test.go @@ -77,6 +77,7 @@ func testRecordWriteReadSeed(t *testing.T, seed uint64) (retVal bool) { buf := &pkg.MemChunkWriter{} writer, err := NewRecordWriter(buf, optCpy) require.NoError(t, err, "seed %v", seed) + defer writer.Close() // Generate records pseudo-randomly records := genRecordRecords(random, schem) @@ -157,6 +158,7 @@ func TestRecordWriteReadLong(t *testing.T) { writer, err := NewRecordWriter(mem, pkg.WriterOptions{Compression: pkg.CompressionZstd}) require.NoError(t, err, "seed %v", seed) + defer writer.Close() reader, err := NewRecordReader(mem) require.NoError(t, err, "seed %v", seed) @@ -215,6 +217,7 @@ func FuzzRecordReader(f *testing.F) { buf := &pkg.MemChunkWriter{} writer, err := NewRecordWriter(buf, opt) require.NoError(f, err) + defer writer.Close() recCount := (1 << (2 * i)) - 1 var record Record diff --git a/examples/profile/internal/profile/samplewriter.go b/examples/profile/internal/profile/samplewriter.go index 6502b1fc..d8792c74 100644 --- a/examples/profile/internal/profile/samplewriter.go +++ b/examples/profile/internal/profile/samplewriter.go @@ -21,6 +21,7 @@ type SampleWriter struct { writeBufs pkg.WriteBufs frameRecordCount uint64 recordCount uint64 + uvarintBuf [binary.MaxVarintLen64]byte } func NewSampleWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*SampleWriter, error) { @@ -66,10 +67,12 @@ func NewSampleWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*SampleWriter } if err := writer.writeFixedHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } if err := writer.writeVarHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } @@ -175,8 +178,9 @@ func (w *SampleWriter) restartFrame(nextFrameFlags pkg.FrameFlags) error { w.encoder.Reset() } - // Write record count. - if _, err := w.frameEncoder.Write(binary.AppendUvarint(nil, w.frameRecordCount)); err != nil { + // Write record count using pre-allocated buffer to avoid allocation. + n := binary.PutUvarint(w.uvarintBuf[:], w.frameRecordCount) + if _, err := w.frameEncoder.Write(w.uvarintBuf[:n]); err != nil { return err } w.frameRecordCount = 0 @@ -208,3 +212,11 @@ func (w *SampleWriter) Flush() error { } return w.restartFrame(w.opts.FrameRestartFlags) } + +// Close releases resources held by the writer. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called +// when the writer is no longer needed. It is the caller's responsibility +// to call Flush before Close if there is unflushed data. +func (w *SampleWriter) Close() { + w.frameEncoder.Close() +} diff --git a/examples/profile/internal/profile/samplewriter_test.go b/examples/profile/internal/profile/samplewriter_test.go index d6f6dfe9..5a3e133c 100644 --- a/examples/profile/internal/profile/samplewriter_test.go +++ b/examples/profile/internal/profile/samplewriter_test.go @@ -77,6 +77,7 @@ func testSampleWriteReadSeed(t *testing.T, seed uint64) (retVal bool) { buf := &pkg.MemChunkWriter{} writer, err := NewSampleWriter(buf, optCpy) require.NoError(t, err, "seed %v", seed) + defer writer.Close() // Generate records pseudo-randomly records := genSampleRecords(random, schem) @@ -157,6 +158,7 @@ func TestSampleWriteReadLong(t *testing.T) { writer, err := NewSampleWriter(mem, pkg.WriterOptions{Compression: pkg.CompressionZstd}) require.NoError(t, err, "seed %v", seed) + defer writer.Close() reader, err := NewSampleReader(mem) require.NoError(t, err, "seed %v", seed) @@ -215,6 +217,7 @@ func FuzzSampleReader(f *testing.F) { buf := &pkg.MemChunkWriter{} writer, err := NewSampleWriter(buf, opt) require.NoError(f, err) + defer writer.Close() recCount := (1 << (2 * i)) - 1 var record Sample diff --git a/go/otel/otelstef/metricswriter.go b/go/otel/otelstef/metricswriter.go index 1f839167..9019dfd7 100644 --- a/go/otel/otelstef/metricswriter.go +++ b/go/otel/otelstef/metricswriter.go @@ -21,6 +21,7 @@ type MetricsWriter struct { writeBufs pkg.WriteBufs frameRecordCount uint64 recordCount uint64 + uvarintBuf [binary.MaxVarintLen64]byte } func NewMetricsWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*MetricsWriter, error) { @@ -66,10 +67,12 @@ func NewMetricsWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*MetricsWrit } if err := writer.writeFixedHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } if err := writer.writeVarHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } @@ -175,8 +178,9 @@ func (w *MetricsWriter) restartFrame(nextFrameFlags pkg.FrameFlags) error { w.encoder.Reset() } - // Write record count. - if _, err := w.frameEncoder.Write(binary.AppendUvarint(nil, w.frameRecordCount)); err != nil { + // Write record count using pre-allocated buffer to avoid allocation. + n := binary.PutUvarint(w.uvarintBuf[:], w.frameRecordCount) + if _, err := w.frameEncoder.Write(w.uvarintBuf[:n]); err != nil { return err } w.frameRecordCount = 0 @@ -208,3 +212,11 @@ func (w *MetricsWriter) Flush() error { } return w.restartFrame(w.opts.FrameRestartFlags) } + +// Close releases resources held by the writer. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called +// when the writer is no longer needed. It is the caller's responsibility +// to call Flush before Close if there is unflushed data. +func (w *MetricsWriter) Close() { + w.frameEncoder.Close() +} diff --git a/go/otel/otelstef/metricswriter_test.go b/go/otel/otelstef/metricswriter_test.go index 8a54a6af..22329031 100644 --- a/go/otel/otelstef/metricswriter_test.go +++ b/go/otel/otelstef/metricswriter_test.go @@ -77,6 +77,7 @@ func testMetricsWriteReadSeed(t *testing.T, seed uint64) (retVal bool) { buf := &pkg.MemChunkWriter{} writer, err := NewMetricsWriter(buf, optCpy) require.NoError(t, err, "seed %v", seed) + defer writer.Close() // Generate records pseudo-randomly records := genMetricsRecords(random, schem) @@ -157,6 +158,7 @@ func TestMetricsWriteReadLong(t *testing.T) { writer, err := NewMetricsWriter(mem, pkg.WriterOptions{Compression: pkg.CompressionZstd}) require.NoError(t, err, "seed %v", seed) + defer writer.Close() reader, err := NewMetricsReader(mem) require.NoError(t, err, "seed %v", seed) @@ -215,6 +217,7 @@ func FuzzMetricsReader(f *testing.F) { buf := &pkg.MemChunkWriter{} writer, err := NewMetricsWriter(buf, opt) require.NoError(f, err) + defer writer.Close() recCount := (1 << (2 * i)) - 1 var record Metrics diff --git a/go/otel/otelstef/spanswriter.go b/go/otel/otelstef/spanswriter.go index a8f3b405..2d3367e5 100644 --- a/go/otel/otelstef/spanswriter.go +++ b/go/otel/otelstef/spanswriter.go @@ -21,6 +21,7 @@ type SpansWriter struct { writeBufs pkg.WriteBufs frameRecordCount uint64 recordCount uint64 + uvarintBuf [binary.MaxVarintLen64]byte } func NewSpansWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*SpansWriter, error) { @@ -66,10 +67,12 @@ func NewSpansWriter(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*SpansWriter, } if err := writer.writeFixedHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } if err := writer.writeVarHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } @@ -175,8 +178,9 @@ func (w *SpansWriter) restartFrame(nextFrameFlags pkg.FrameFlags) error { w.encoder.Reset() } - // Write record count. - if _, err := w.frameEncoder.Write(binary.AppendUvarint(nil, w.frameRecordCount)); err != nil { + // Write record count using pre-allocated buffer to avoid allocation. + n := binary.PutUvarint(w.uvarintBuf[:], w.frameRecordCount) + if _, err := w.frameEncoder.Write(w.uvarintBuf[:n]); err != nil { return err } w.frameRecordCount = 0 @@ -208,3 +212,11 @@ func (w *SpansWriter) Flush() error { } return w.restartFrame(w.opts.FrameRestartFlags) } + +// Close releases resources held by the writer. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called +// when the writer is no longer needed. It is the caller's responsibility +// to call Flush before Close if there is unflushed data. +func (w *SpansWriter) Close() { + w.frameEncoder.Close() +} diff --git a/go/otel/otelstef/spanswriter_test.go b/go/otel/otelstef/spanswriter_test.go index 4a433400..f9610283 100644 --- a/go/otel/otelstef/spanswriter_test.go +++ b/go/otel/otelstef/spanswriter_test.go @@ -77,6 +77,7 @@ func testSpansWriteReadSeed(t *testing.T, seed uint64) (retVal bool) { buf := &pkg.MemChunkWriter{} writer, err := NewSpansWriter(buf, optCpy) require.NoError(t, err, "seed %v", seed) + defer writer.Close() // Generate records pseudo-randomly records := genSpansRecords(random, schem) @@ -157,6 +158,7 @@ func TestSpansWriteReadLong(t *testing.T) { writer, err := NewSpansWriter(mem, pkg.WriterOptions{Compression: pkg.CompressionZstd}) require.NoError(t, err, "seed %v", seed) + defer writer.Close() reader, err := NewSpansReader(mem) require.NoError(t, err, "seed %v", seed) @@ -215,6 +217,7 @@ func FuzzSpansReader(f *testing.F) { buf := &pkg.MemChunkWriter{} writer, err := NewSpansWriter(buf, opt) require.NoError(f, err) + defer writer.Close() recCount := (1 << (2 * i)) - 1 var record Spans diff --git a/go/pkg/frame.go b/go/pkg/frame.go index 2dc7885c..e7d4a00d 100644 --- a/go/pkg/frame.go +++ b/go/pkg/frame.go @@ -7,10 +7,28 @@ import ( "errors" "fmt" "io" + "sync" "github.com/klauspost/compress/zstd" ) +// zstdEncoderPool pools zstd encoders to amortize the cost of encoder creation +// across multiple writers. Encoders are created with WithEncoderConcurrency(1) +// to avoid goroutine overhead per encoder. +var zstdEncoderPool = sync.Pool{ + New: func() any { + enc, err := zstd.NewWriter( + nil, + zstd.WithEncoderConcurrency(1), + ) + if err != nil { + // This should never happen with valid options. + panic("failed to create zstd encoder: " + err.Error()) + } + return enc + }, +} + type FrameEncoder struct { dest ChunkWriter frameContent io.Writer @@ -29,11 +47,8 @@ func (e *FrameEncoder) Init(dest ChunkWriter, compr Compression) error { e.frameContent = &e.compressedBuf case CompressionZstd: - var err error - e.compressor, err = zstd.NewWriter(&e.compressedBuf) - if err != nil { - return err - } + e.compressor = zstdEncoderPool.Get().(*zstd.Encoder) + e.compressor.Reset(&e.compressedBuf) e.frameContent = e.compressor default: @@ -42,6 +57,17 @@ func (e *FrameEncoder) Init(dest ChunkWriter, compr Compression) error { return nil } +// Close releases resources held by the FrameEncoder. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called when +// the FrameEncoder is no longer needed. +func (e *FrameEncoder) Close() { + if e.compression == CompressionZstd && e.compressor != nil { + e.compressor.Reset(nil) + zstdEncoderPool.Put(e.compressor) + e.compressor = nil + } +} + func (e *FrameEncoder) OpenFrame(resetFlags FrameFlags) { e.hdrByte = resetFlags if resetFlags&RestartCompression != 0 { diff --git a/go/pkg/frame_test.go b/go/pkg/frame_test.go index 980fb475..493565d9 100644 --- a/go/pkg/frame_test.go +++ b/go/pkg/frame_test.go @@ -22,6 +22,7 @@ func testLastFrameAndContinue(t *testing.T, compression Compression) { buf := &MemReaderWriter{} err := encoder.Init(buf, compression) require.NoError(t, err) + defer encoder.Close() writeStr := []byte(strings.Repeat("hello", 10)) _, err = encoder.Write(writeStr) require.NoError(t, err) @@ -104,6 +105,31 @@ func TestLastFrameAndContinue(t *testing.T) { } } +func BenchmarkFrameEncoderZstd(b *testing.B) { + data := []byte(strings.Repeat("hello world this is benchmark data ", 100)) + + b.Run("pooled", func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + buf := &MemReaderWriter{} + encoder := FrameEncoder{} + err := encoder.Init(buf, CompressionZstd) + if err != nil { + b.Fatal(err) + } + _, err = encoder.Write(data) + if err != nil { + b.Fatal(err) + } + err = encoder.CloseFrame() + if err != nil { + b.Fatal(err) + } + encoder.Close() + } + }) +} + func TestLimitedReader(t *testing.T) { data := []byte("abcdef") mem := &MemReaderWriter{buf: *bytes.NewBuffer(data)} diff --git a/otelcol/internal/stefexporter/exporter.go b/otelcol/internal/stefexporter/exporter.go index 90c3b7fc..cd7eab87 100644 --- a/otelcol/internal/stefexporter/exporter.go +++ b/otelcol/internal/stefexporter/exporter.go @@ -82,6 +82,13 @@ func (s *stefExporter) Start(ctx context.Context, host component.Host) error { func (s *stefExporter) Shutdown(ctx context.Context) error { close(s.stopped) + s.writeMutex.Lock() + if s.remoteWriter != nil { + _ = s.remoteWriter.Flush() + s.remoteWriter.Close() + s.remoteWriter = nil + } + s.writeMutex.Unlock() if s.grpcConn != nil { return s.grpcConn.Close() } @@ -131,7 +138,10 @@ func (s *stefExporter) flusher() { select { case <-timer.C: s.writeMutex.Lock() - err := s.remoteWriter.Flush() + var err error + if s.remoteWriter != nil { + err = s.remoteWriter.Flush() + } s.writeMutex.Unlock() if err != nil { log.Printf("Cannot send STEF data: %v\n", err) diff --git a/stefc/templates/go/writer.go.tmpl b/stefc/templates/go/writer.go.tmpl index 48c541e6..3c6a1905 100644 --- a/stefc/templates/go/writer.go.tmpl +++ b/stefc/templates/go/writer.go.tmpl @@ -12,14 +12,15 @@ import ( type {{.StructName}}Writer struct { Record {{.StructName}} - opts pkg.WriterOptions - dst pkg.ChunkWriter - frameEncoder pkg.FrameEncoder - encoder {{.StructName}}Encoder - state WriterState - writeBufs pkg.WriteBufs + opts pkg.WriterOptions + dst pkg.ChunkWriter + frameEncoder pkg.FrameEncoder + encoder {{.StructName}}Encoder + state WriterState + writeBufs pkg.WriteBufs frameRecordCount uint64 - recordCount uint64 + recordCount uint64 + uvarintBuf [binary.MaxVarintLen64]byte } func New{{.StructName}}Writer(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*{{.StructName}}Writer, error) { @@ -65,10 +66,12 @@ func New{{.StructName}}Writer(dst pkg.ChunkWriter, opts pkg.WriterOptions) (*{{. } if err := writer.writeFixedHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } if err := writer.writeVarHeader(); err != nil { + writer.frameEncoder.Close() return nil, err } @@ -174,8 +177,9 @@ func (w *{{.StructName}}Writer) restartFrame(nextFrameFlags pkg.FrameFlags) erro w.encoder.Reset() } - // Write record count. - if _, err := w.frameEncoder.Write(binary.AppendUvarint(nil, w.frameRecordCount)); err != nil { + // Write record count using pre-allocated buffer to avoid allocation. + n := binary.PutUvarint(w.uvarintBuf[:], w.frameRecordCount) + if _, err := w.frameEncoder.Write(w.uvarintBuf[:n]); err != nil { return err } w.frameRecordCount = 0 @@ -207,3 +211,11 @@ func (w *{{.StructName}}Writer) Flush() error { } return w.restartFrame(w.opts.FrameRestartFlags) } + +// Close releases resources held by the writer. For zstd compression, +// the encoder is returned to the pool for reuse. Close must be called +// when the writer is no longer needed. It is the caller's responsibility +// to call Flush before Close if there is unflushed data. +func (w *{{.StructName}}Writer) Close() { + w.frameEncoder.Close() +} diff --git a/stefc/templates/go/writer_test.go.tmpl b/stefc/templates/go/writer_test.go.tmpl index eb2a4c95..d150aec9 100644 --- a/stefc/templates/go/writer_test.go.tmpl +++ b/stefc/templates/go/writer_test.go.tmpl @@ -76,6 +76,7 @@ func test{{.StructName}}WriteReadSeed(t *testing.T, seed uint64) (retVal bool) { buf := &pkg.MemChunkWriter{} writer, err := New{{.StructName}}Writer(buf, optCpy) require.NoError(t, err, "seed %v", seed) + defer writer.Close() // Generate records pseudo-randomly records := gen{{.StructName}}Records(random, schem) @@ -156,6 +157,7 @@ func Test{{.StructName}}WriteReadLong(t *testing.T) { writer, err := New{{.StructName}}Writer(mem, pkg.WriterOptions{Compression: pkg.CompressionZstd}) require.NoError(t, err, "seed %v", seed) + defer writer.Close() reader, err := New{{.StructName}}Reader(mem) require.NoError(t, err, "seed %v", seed) @@ -214,6 +216,7 @@ func Fuzz{{.StructName}}Reader(f *testing.F) { buf := &pkg.MemChunkWriter{} writer, err := New{{.StructName}}Writer(buf, opt) require.NoError(f, err) + defer writer.Close() recCount := (1 << (2*i)) - 1 var record {{.StructName}}