Skip to content

Commit c45171b

Browse files
[8.19](backport #45328) feat(otel): add skeleton of Beat processor (#46760)
* feat(otel): add skeleton of Beat processor (#45328) * feat(otel): add skeleton of Beat processor * refactor: separate the OTel Beat processor from Beat processors in code * docs: add note on compatibility with receivers * docs: make the processor description clearer * docs: add README in `x-pack/otel` directory (cherry picked from commit 65c0dc2) # Conflicts: # NOTICE.txt # go.mod # go.sum * fix conflicts --------- Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 0f58fd0 commit c45171b

File tree

9 files changed

+851
-330
lines changed

9 files changed

+851
-330
lines changed

NOTICE.txt

Lines changed: 680 additions & 294 deletions
Large diffs are not rendered by default.

filebeat/input/journald/environment_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
136136
fmt.Fprintf(&msg, "too few events; expected: %d, actual: %d", count, sum)
137137

138138
return false
139-
}, 5*time.Second, 10*time.Millisecond, &msg)
139+
}, 10*time.Second, 10*time.Millisecond, &msg)
140140
}
141141

142142
func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {

go.mod

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ require (
116116
github.com/shopspring/decimal v1.3.1 // indirect
117117
github.com/spf13/cobra v1.9.1
118118
github.com/spf13/pflag v1.0.6
119-
github.com/stretchr/testify v1.10.0
119+
github.com/stretchr/testify v1.11.1
120120
github.com/ugorji/go/codec v1.1.8
121121
github.com/vmware/govmomi v0.51.0
122122
go.elastic.co/ecszap v1.0.2
@@ -135,8 +135,8 @@ require (
135135
golang.org/x/tools v0.35.0
136136
google.golang.org/api v0.238.0
137137
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect
138-
google.golang.org/grpc v1.75.0
139-
google.golang.org/protobuf v1.36.7
138+
google.golang.org/grpc v1.75.1
139+
google.golang.org/protobuf v1.36.9
140140
gopkg.in/inf.v0 v0.9.1
141141
gopkg.in/yaml.v2 v2.4.0
142142
gotest.tools/gotestsum v1.7.0
@@ -241,6 +241,8 @@ require (
241241
go.opentelemetry.io/collector/client v1.38.0
242242
go.opentelemetry.io/collector/component/componenttest v0.132.0
243243
go.opentelemetry.io/collector/consumer/consumertest v0.132.0
244+
go.opentelemetry.io/collector/processor v1.38.0
245+
go.opentelemetry.io/collector/processor/processorhelper v0.132.0
244246
go.uber.org/goleak v1.3.0
245247
)
246248

@@ -451,7 +453,6 @@ require (
451453
go.opentelemetry.io/collector/pdata/xpdata v0.132.0 // indirect
452454
go.opentelemetry.io/collector/pipeline v1.38.0 // indirect
453455
go.opentelemetry.io/collector/pipeline/xpipeline v0.132.0 // indirect
454-
go.opentelemetry.io/collector/processor v1.38.0 // indirect
455456
go.opentelemetry.io/collector/processor/processortest v0.132.0 // indirect
456457
go.opentelemetry.io/collector/processor/xprocessor v0.132.0 // indirect
457458
go.opentelemetry.io/collector/receiver/receivertest v0.132.0 // indirect
@@ -465,7 +466,7 @@ require (
465466
go.opentelemetry.io/contrib/otelconf v0.16.0 // indirect
466467
go.opentelemetry.io/contrib/propagators/b3 v1.36.0 // indirect
467468
go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f // indirect
468-
go.opentelemetry.io/otel v1.37.0 // indirect
469+
go.opentelemetry.io/otel v1.38.0 // indirect
469470
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.13.0 // indirect
470471
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.13.0 // indirect
471472
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0 // indirect
@@ -477,12 +478,12 @@ require (
477478
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.13.0 // indirect
478479
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.37.0 // indirect
479480
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 // indirect
480-
go.opentelemetry.io/otel/log v0.13.0 // indirect
481-
go.opentelemetry.io/otel/metric v1.37.0 // indirect
482-
go.opentelemetry.io/otel/sdk v1.37.0 // indirect
481+
go.opentelemetry.io/otel/log v0.14.0 // indirect
482+
go.opentelemetry.io/otel/metric v1.38.0 // indirect
483+
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
483484
go.opentelemetry.io/otel/sdk/log v0.13.0 // indirect
484-
go.opentelemetry.io/otel/sdk/metric v1.37.0 // indirect
485-
go.opentelemetry.io/otel/trace v1.37.0 // indirect
485+
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
486+
go.opentelemetry.io/otel/trace v1.38.0 // indirect
486487
go.opentelemetry.io/proto/otlp v1.7.0 // indirect
487488
go.uber.org/ratelimit v0.3.1 // indirect
488489
go.yaml.in/yaml/v2 v2.4.2 // indirect

go.sum

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -956,8 +956,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
956956
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
957957
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
958958
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
959-
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
960-
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
959+
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
960+
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
961961
github.com/teambition/rrule-go v1.8.2 h1:lIjpjvWTj9fFUZCmuoVDrKVOtdiyzbzc93qTmRVe/J8=
962962
github.com/teambition/rrule-go v1.8.2/go.mod h1:Ieq5AbrKGciP1V//Wq8ktsTXwSwJHDD5mD/wLBGl3p4=
963963
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
@@ -1137,6 +1137,8 @@ go.opentelemetry.io/collector/pipeline/xpipeline v0.132.0 h1:ISE9c9TvywcnIGIPfLO
11371137
go.opentelemetry.io/collector/pipeline/xpipeline v0.132.0/go.mod h1:aneg0Kepxwa2RoTSGJx1bg6JKl6dlKTijmqloR0hbC8=
11381138
go.opentelemetry.io/collector/processor v1.38.0 h1:OGZ+2ku4cyzlSehCJb4QdSrBOYeWgM0zPHHlq7qBZqM=
11391139
go.opentelemetry.io/collector/processor v1.38.0/go.mod h1:wFky0NRSLlwvuHQOzP/DUIKUL1A/YKj5rezF9lzTAGM=
1140+
go.opentelemetry.io/collector/processor/processorhelper v0.132.0 h1:PsKrdBj6E0qxEDMUvaWlHEeIhsL+f7IhWuYtGe8eQuQ=
1141+
go.opentelemetry.io/collector/processor/processorhelper v0.132.0/go.mod h1:InJZfNrIuu5d/rEvvDJTcrcFejGiQ+PCubDgar+RjhI=
11401142
go.opentelemetry.io/collector/processor/processortest v0.132.0 h1:p8vk2ICOB2LlpVd7Y8JF0uvtNxJA57XOG4/EDi3zlgA=
11411143
go.opentelemetry.io/collector/processor/processortest v0.132.0/go.mod h1:hYYON5yz+EDdvM0RRCXKCAaoJn149hrUHZCd/zMngMo=
11421144
go.opentelemetry.io/collector/processor/xprocessor v0.132.0 h1:cuEJqX5hZf/N27nPgnl0tm0ECOMHQqhmsoVDmAVfeYg=
@@ -1167,8 +1169,8 @@ go.opentelemetry.io/contrib/zpages v0.62.0 h1:9fUYTLmrK0x/lweM2uM+BOx069jLx8PxVq
11671169
go.opentelemetry.io/contrib/zpages v0.62.0/go.mod h1:C8kXoiC1Ytvereztus2R+kqdSa6W/MZ8FfS8Zwj+LiM=
11681170
go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f h1:DqRQ7JaRjf3TwWwfwHIvsBB/aLUs+kgrX+MrAIllALI=
11691171
go.opentelemetry.io/ebpf-profiler v0.0.0-20250212075250-7bf12d3f962f/go.mod h1:hfAVBjRN6FZjSgZUBsNzvRDJWlS46R5Y0SGVr4Jl86s=
1170-
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
1171-
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
1172+
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
1173+
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
11721174
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.13.0 h1:z6lNIajgEBVtQZHjfw2hAccPEBDs+nx58VemmXWa2ec=
11731175
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.13.0/go.mod h1:+kyc3bRx/Qkq05P6OCu3mTEIOxYRYzoIg+JsUp5X+PM=
11741176
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.13.0 h1:zUfYw8cscHHLwaY8Xz3fiJu+R59xBnkgq2Zr1lwmK/0=
@@ -1191,22 +1193,22 @@ go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.37.0 h1:6VjV6Et+1Hd2iL
11911193
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.37.0/go.mod h1:u8hcp8ji5gaM/RfcOo8z9NMnf1pVLfVY7lBY2VOGuUU=
11921194
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 h1:SNhVp/9q4Go/XHBkQ1/d5u9P/U+L1yaGPoi0x+mStaI=
11931195
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0/go.mod h1:tx8OOlGH6R4kLV67YaYO44GFXloEjGPZuMjEkaaqIp4=
1194-
go.opentelemetry.io/otel/log v0.13.0 h1:yoxRoIZcohB6Xf0lNv9QIyCzQvrtGZklVbdCoyb7dls=
1195-
go.opentelemetry.io/otel/log v0.13.0/go.mod h1:INKfG4k1O9CL25BaM1qLe0zIedOpvlS5Z7XgSbmN83E=
1196+
go.opentelemetry.io/otel/log v0.14.0 h1:2rzJ+pOAZ8qmZ3DDHg73NEKzSZkhkGIua9gXtxNGgrM=
1197+
go.opentelemetry.io/otel/log v0.14.0/go.mod h1:5jRG92fEAgx0SU/vFPxmJvhIuDU9E1SUnEQrMlJpOno=
11961198
go.opentelemetry.io/otel/log/logtest v0.13.0 h1:xxaIcgoEEtnwdgj6D6Uo9K/Dynz9jqIxSDu2YObJ69Q=
11971199
go.opentelemetry.io/otel/log/logtest v0.13.0/go.mod h1:+OrkmsAH38b+ygyag1tLjSFMYiES5UHggzrtY1IIEA8=
1198-
go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE=
1199-
go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E=
1200-
go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI=
1201-
go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg=
1200+
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
1201+
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
1202+
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
1203+
go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
12021204
go.opentelemetry.io/otel/sdk/log v0.13.0 h1:I3CGUszjM926OphK8ZdzF+kLqFvfRY/IIoFq/TjwfaQ=
12031205
go.opentelemetry.io/otel/sdk/log v0.13.0/go.mod h1:lOrQyCCXmpZdN7NchXb6DOZZa1N5G1R2tm5GMMTpDBw=
12041206
go.opentelemetry.io/otel/sdk/log/logtest v0.13.0 h1:9yio6AFZ3QD9j9oqshV1Ibm9gPLlHNxurno5BreMtIA=
12051207
go.opentelemetry.io/otel/sdk/log/logtest v0.13.0/go.mod h1:QOGiAJHl+fob8Nu85ifXfuQYmJTFAvcrxL6w5/tu168=
1206-
go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc=
1207-
go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps=
1208-
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
1209-
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
1208+
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
1209+
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
1210+
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
1211+
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
12101212
go.opentelemetry.io/proto/otlp v1.7.0 h1:jX1VolD6nHuFzOYso2E73H85i92Mv8JQYk0K9vz09os=
12111213
go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo=
12121214
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
@@ -1420,8 +1422,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
14201422
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
14211423
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
14221424
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
1423-
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
1424-
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
1425+
google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI=
1426+
google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
14251427
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
14261428
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
14271429
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -1433,8 +1435,8 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
14331435
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
14341436
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
14351437
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
1436-
google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
1437-
google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
1438+
google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw=
1439+
google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
14381440
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
14391441
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
14401442
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

libbeat/tests/integration/elasticsearch_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ logging.level: debug
126126

127127
// 1. wait mockbeat to start
128128
mockbeat.WaitForLogs(
129-
fmt.Sprint("mockbeat start running"),
129+
"mockbeat start running",
130130
10*time.Second,
131131
"did not find 'mockbeat start running' log")
132132

@@ -148,7 +148,7 @@ logging.level: debug
148148

149149
// 5. wait mockbeat to start again
150150
mockbeat.WaitForLogs(
151-
fmt.Sprint("mockbeat start running"),
151+
"mockbeat start running",
152152
10*time.Second,
153153
"did not find 'mockbeat start running' log again")
154154
}
@@ -160,19 +160,19 @@ func startMockES(t *testing.T, addr string) (*http.Server, metrics.Registry) {
160160

161161
s := http.Server{Addr: addr, Handler: es, ReadHeaderTimeout: time.Second}
162162
go func() {
163-
if err := s.ListenAndServe(); !errors.Is(http.ErrServerClosed, err) {
163+
if err := s.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
164164
t.Errorf("could not start mock-es server: %s", err)
165165
}
166166
}()
167167

168168
require.Eventually(t, func() bool {
169-
resp, err := http.Get("http://" + addr) // nolint: noctx // It's just a test
169+
resp, err := http.Get("http://" + addr) //nolint: noctx // It's just a test
170170
if err != nil {
171-
// nolint: errcheck // We're just draining the body, we can ignore the error
172-
io.Copy(io.Discard, resp.Body)
173-
resp.Body.Close()
174171
return false
175172
}
173+
//nolint: errcheck // We're just draining the body, we can ignore the error
174+
io.Copy(io.Discard, resp.Body)
175+
resp.Body.Close()
176176
return true
177177
},
178178
time.Second, time.Millisecond, "mock-es server did not start on '%s'", addr)

x-pack/otel/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# OpenTelemetry Collector Components in Beats
2+
3+
This is the home of OpenTelemetry Collector components like receivers, processors, exporters, extensions, connectors, etc. that are related to Beats.
4+
5+
The intended structure of this directory is to mimic the structure in the [OpenTelemetry Collector Contrib] repository, specifically:
6+
7+
- Put X receiver in `receiver/xreceiver` subdirectory,
8+
- Put Y processor in `processor/yprocessor` subdirectory,
9+
- Put Z exporter in `exporter/zexporter` subdirectory,
10+
- and so on.
11+
12+
There should be no need to put any Go files directly in this directory.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Beat Processor
2+
3+
| Status | |
4+
| --------- | ------------------- |
5+
| Stability | [development]: logs |
6+
7+
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
8+
9+
> [!NOTE]
10+
> This component is currently in development and no functionality is implemented.
11+
> Including it in a pipeline is a no-op.
12+
> The documentation describes the intended state after the functionality is implemented.
13+
14+
The Beat processor (`beat`) is an OpenTelemetry Collector processor that wraps the [Beat processors].
15+
This allows you to use Beat processorss like e.g. [add_host_metadata] anywhere in the OpenTelemetry Collector's pipeline, independently of Beat receivers.
16+
17+
> [!NOTE]
18+
> This component is only expected to work correctly with data from the Beat receivers: [Filebeat receiver], [Metricbeat receiver].
19+
> This is because it relies on the specific structure of telemetry emitted by those components.
20+
> Using it with data coming from other components is not recommended and may result in unexpected behavior.
21+
22+
## Example
23+
24+
The following [Filebeat receiver] configuration
25+
26+
```yaml
27+
receivers:
28+
filebeatreceiver:
29+
filebeat:
30+
inputs:
31+
- type: filestream
32+
id: host-logs
33+
paths:
34+
- /var/log/*.log
35+
processors:
36+
- add_host_metadata: ~
37+
output:
38+
otelconsumer:
39+
```
40+
41+
is functionally equivalent to this one, using the Beat processor:
42+
43+
```yaml
44+
receivers:
45+
filebeatreceiver:
46+
filebeat:
47+
inputs:
48+
- type: filestream
49+
id: host-logs
50+
paths:
51+
- /var/log/*.log
52+
output:
53+
otelconsumer:
54+
55+
processors:
56+
beat:
57+
processors:
58+
- add_host_metadata: ~
59+
```
60+
61+
[Beat processors]: https://www.elastic.co/docs/reference/beats/filebeat/filtering-enhancing-data#using-processors
62+
[Filebeat receiver]: https://github.com/elastic/beats/tree/main/x-pack/filebeat/fbreceiver
63+
[Metricbeat receiver]: https://github.com/elastic/beats/tree/main/x-pack/metricbeat/mbreceiver
64+
[add_host_metadata]: https://www.elastic.co/docs/reference/beats/filebeat/add-host-metadata
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package beatprocessor
6+
7+
type Config struct{}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package beatprocessor
6+
7+
import (
8+
"context"
9+
10+
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/pdata/plog"
13+
"go.opentelemetry.io/collector/processor"
14+
"go.opentelemetry.io/collector/processor/processorhelper"
15+
)
16+
17+
const (
18+
Name = "beat"
19+
)
20+
21+
func NewFactory() processor.Factory {
22+
return processor.NewFactory(
23+
component.MustNewType(Name),
24+
createDefaultConfig,
25+
processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment),
26+
)
27+
}
28+
29+
func createDefaultConfig() component.Config {
30+
return &Config{}
31+
}
32+
33+
func createLogsProcessor(
34+
ctx context.Context,
35+
set processor.Settings,
36+
cfg component.Config,
37+
nextConsumer consumer.Logs,
38+
) (processor.Logs, error) {
39+
return processorhelper.NewLogs(
40+
ctx,
41+
set,
42+
cfg,
43+
nextConsumer,
44+
func(_ context.Context, logs plog.Logs) (plog.Logs, error) {
45+
// This is a placeholder for the actual processing logic.
46+
return logs, nil
47+
},
48+
)
49+
}

0 commit comments

Comments
 (0)