Skip to content

Commit 7c94eee

Browse files
danschdatsciHeiko Zuerker
andauthored
Initial public release
Co-authored-by: Heiko Zuerker <[email protected]>
1 parent b8d7e68 commit 7c94eee

File tree

8 files changed

+1249
-22
lines changed

8 files changed

+1249
-22
lines changed

ACKNOWLEDGEMENT.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
## Acknolwedgement
2+
3+
This adapter is based off code found in the official prometheus repo: https://github.com/prometheus/prometheus/tree/master/documentation/examples/remote_storage/remote_storage_adapter
4+
5+
Thanks to all the great contributors support of this project!

Makefile

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
BINARY = prometheus_bigquery_remote_storage_adapter
2+
COMMIT := $(shell git rev-parse HEAD)
3+
BRANCH := $(shell git symbolic-ref --short -q HEAD || echo HEAD)
4+
DATE := $(shell date -u +%Y%m%d-%H:%M:%S)
5+
VERSION_PKG = github.com/KohlsTechnology/prometheus_bigquery_remote_storage_adapter/pkg/version
6+
LDFLAGS := "-X ${VERSION_PKG}.Branch=${BRANCH} -X ${VERSION_PKG}.BuildDate=${DATE} \
7+
-X ${VERSION_PKG}.GitSHA1=${COMMIT}"
8+
TAG?=""
9+
10+
.PHONY: all
11+
all: build
12+
13+
.PHONY: clean
14+
clean:
15+
rm -rf $(BINARY) dist/
16+
17+
.PHONY: build
18+
build:
19+
CGO_ENABLED=0 go build -o $(BINARY) -ldflags $(LDFLAGS)
20+
21+
.PHONY: test
22+
test: fmt vet test-unit
23+
24+
.PHONY: test-unit
25+
test-unit:
26+
go test -race -coverprofile=coverage.txt -covermode=atomic ./...
27+
28+
.PHONY: test-e2e
29+
test-e2e: build
30+
go test -v ./pkg/e2e/e2e_test.go
31+
32+
# Make sure go.mod and go.sum are not modified
33+
.PHONY: test-dirty
34+
test-dirty: build
35+
go mod tidy
36+
git diff --exit-code
37+
38+
# Make sure goreleaser is working
39+
.PHONY: test-release
40+
test-release:
41+
BRANCH=$(BRANCH) COMMIT=$(COMMIT) DATE=$(DATE) VERSION_PKG=$(VERSION_PKG) goreleaser --snapshot --skip-publish --rm-dist
42+
43+
.PHONY: fmt
44+
fmt:
45+
test -z "$(shell gofmt -l .)"
46+
47+
.PHONY: lint
48+
lint:
49+
LINT_INPUT="$(shell go list ./...)"; golint -set_exit_status $$LINT_INPUT
50+
51+
.PHONY: vet
52+
vet:
53+
VET_INPUT="$(shell go list ./...)"; go vet $$VET_INPUT
54+
55+
.PHONY: tag
56+
tag:
57+
git tag -a $(TAG) -m "Release $(TAG)"
58+
git push origin $(TAG)
59+
60+
# Requires GITHUB_TOKEN environment variable to be set
61+
.PHONY: release
62+
release:
63+
BRANCH=$(BRANCH) COMMIT=$(COMMIT) DATE=$(DATE) VERSION_PKG=$(VERSION_PKG) goreleaser

README.md

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,63 @@
1-
# YOURPROJECTNAME
1+
# BigQuery Remote Storage Adapter for Prometheus
22

3-
Describe what this project is all about.
3+
This is a write adapter that receives samples via Prometheus's remote write protocol and stores them in Google BigQuery. This adapter is based off code found in the official prometheus repo:
44

5-
## Installation
5+
https://github.com/prometheus/prometheus/tree/master/documentation/examples/remote_storage/remote_storage_adapter
66

7-
Add steps for installing here.
7+
Remote read is not currently supported by this adapter.
88

9-
## Documentation
9+
Billing MUST be enabled on the GCP project with the destination BigQuery tables. This adapter uses the "streaming inserts" API. More information is available here: https://cloud.google.com/bigquery/streaming-data-into-bigquery#before_you_begin
1010

11-
Add documentaiton here, or add a link to
12-
the full documentation.
1311

14-
## Developing
12+
The table schema in BigQuery should be the following format:
1513

16-
See [CONTRIBUTING.md](.github/CONTRIBUTING.md)
17-
for details.
14+
| Field name | Type | Mode |
15+
| --- | --- | --- |
16+
| metricname | STRING | NULLABLE |
17+
| tags | STRING | NULLABLE |
18+
| value | FLOAT | NULLABLE |
19+
| timestamp | TIMESTAMP | NULLABLE |
1820

19-
## License
2021

21-
See [LICENSE](LICENSE) for details.
22+
It is recommended that the BigQuery table is partitioned on the timestamp column for performance.
2223

23-
**START REMOVE THIS BEFORE RELEASE OF PROJECT**
24+
The tags field is a json string and can be easily extracted. Here is an example query:
2425

25-
Each source file must include a header comment
26-
with the Apache 2.0 license details. The copyright
27-
section in the header must be filled in with the below
28-
details.
26+
```
27+
SELECT metricname, tags, JSON_EXTRACT(tags, '$.some_label')
28+
AS some_label, value, timestamp
29+
FROM `your_gcp_project.prometheus.metrics_stream`
30+
WHERE JSON_EXTRACT(tags, '$.some_label') = "\\"target_label_value\\""
31+
```
32+
33+
## Building
2934

3035
```
31-
Copyright 2019 Kohl's Department Stores, Inc.
36+
go build
3237
```
33-
**END REMOVE THIS BEFORE RELEASE OF PROJECT**
3438

35-
## Code of Conduct
39+
## Running
3640

37-
See [CODE_OF_CONDUCT.md](.github/CODE_OF_CONDUCT.md)
38-
for details.
41+
```
42+
./bigquery_remote_storage_adapter \
43+
--googleAPIjsonkeypath=/secret/gcp_service_account.json \
44+
--googleAPIdatasetID=prometheus \
45+
--googleAPItableID=metrics_stream
46+
```
47+
48+
To show all flags:
3949

50+
```
51+
./bigquery_remote_storage_adapter -h
52+
```
53+
54+
## Configuring Prometheus
55+
56+
To configure Prometheus to send samples to this binary, add the following to your `prometheus.yml`:
57+
58+
```yaml
59+
# Remote write configuration (for Google BigQuery).
60+
remote_write:
61+
- url: "http://localhost:9201/write"
62+
63+
```

bigquerydb/client.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
Copyright 2020 Kohl's Department Stores, Inc.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package bigquerydb
15+
16+
import (
17+
"context"
18+
"encoding/json"
19+
"fmt"
20+
"io/ioutil"
21+
"math"
22+
"os"
23+
"strings"
24+
"time"
25+
26+
"cloud.google.com/go/bigquery"
27+
"github.com/go-kit/kit/log"
28+
"github.com/go-kit/kit/log/level"
29+
"github.com/prometheus/client_golang/prometheus"
30+
"github.com/prometheus/common/model"
31+
"google.golang.org/api/option"
32+
)
33+
34+
// BigqueryClient allows sending batches of Prometheus samples to Bigquery.
35+
type BigqueryClient struct {
36+
logger log.Logger
37+
client bigquery.Client
38+
datasetID string
39+
tableID string
40+
timeout time.Duration
41+
ignoredSamples prometheus.Counter
42+
}
43+
44+
// NewClient creates a new Client.
45+
func NewClient(logger log.Logger, googleAPIjsonkeypath, googleAPIdatasetID, googleAPItableID string, remoteTimeout time.Duration) *BigqueryClient {
46+
ctx := context.Background()
47+
48+
jsonFile, err := os.Open(googleAPIjsonkeypath)
49+
if err != nil {
50+
level.Error(logger).Log("err", err)
51+
os.Exit(1)
52+
}
53+
54+
byteValue, _ := ioutil.ReadAll(jsonFile)
55+
56+
var result map[string]interface{}
57+
json.Unmarshal([]byte(byteValue), &result)
58+
59+
//fmt.Println(result["type"])
60+
jsonFile.Close()
61+
62+
projectID := fmt.Sprintf("%v", result["project_id"])
63+
64+
c, err := bigquery.NewClient(ctx, projectID, option.WithCredentialsFile(googleAPIjsonkeypath))
65+
if err != nil {
66+
level.Error(logger).Log("err", err)
67+
os.Exit(1)
68+
}
69+
70+
if logger == nil {
71+
logger = log.NewNopLogger()
72+
}
73+
74+
return &BigqueryClient{
75+
logger: logger,
76+
client: *c,
77+
datasetID: googleAPIdatasetID,
78+
tableID: googleAPItableID,
79+
timeout: remoteTimeout,
80+
ignoredSamples: prometheus.NewCounter(
81+
prometheus.CounterOpts{
82+
Name: "prometheus_bigquery_ignored_samples_total",
83+
Help: "The total number of samples not sent to BigQuery due to unsupported float values (Inf, -Inf, NaN).",
84+
},
85+
),
86+
}
87+
}
88+
89+
// Item represents a row item.
90+
type Item struct {
91+
value float64
92+
metricname string
93+
timestamp time.Time
94+
tags string
95+
}
96+
97+
// Save implements the ValueSaver interface.
98+
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
99+
return map[string]bigquery.Value{
100+
"value": i.value,
101+
"metricname": i.metricname,
102+
"timestamp": i.timestamp,
103+
"tags": i.tags,
104+
}, "", nil
105+
}
106+
107+
// tagsFromMetric extracts tags from a Prometheus MetricNameLabel.
108+
func tagsFromMetric(m model.Metric) string {
109+
tags := make(map[string]interface{}, len(m)-1)
110+
for l, v := range m {
111+
if l != model.MetricNameLabel {
112+
tags[string(l)] = string(v)
113+
}
114+
}
115+
tagsmarshaled, _ := json.Marshal(tags)
116+
return string(tagsmarshaled)
117+
}
118+
119+
// Write sends a batch of samples to BigQuery via the client.
120+
func (c *BigqueryClient) Write(samples model.Samples) error {
121+
inserter := c.client.Dataset(c.datasetID).Table(c.tableID).Inserter()
122+
inserter.SkipInvalidRows = true
123+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
124+
125+
batch := make([]*Item, 0, len(samples))
126+
127+
for _, s := range samples {
128+
v := float64(s.Value)
129+
if math.IsNaN(v) || math.IsInf(v, 0) {
130+
level.Debug(c.logger).Log("msg", "cannot send to BigQuery, skipping sample", "value", v, "sample", s)
131+
c.ignoredSamples.Inc()
132+
continue
133+
}
134+
135+
batch = append(batch, &Item{
136+
value: v,
137+
metricname: string(s.Metric[model.MetricNameLabel]),
138+
timestamp: s.Timestamp.Time(),
139+
tags: tagsFromMetric(s.Metric),
140+
})
141+
142+
}
143+
144+
if err := inserter.Put(ctx, batch); err != nil {
145+
if multiError, ok := err.(bigquery.PutMultiError); ok {
146+
for _, err1 := range multiError {
147+
for _, err2 := range err1.Errors {
148+
fmt.Println(err2)
149+
}
150+
}
151+
}
152+
defer cancel()
153+
return err
154+
}
155+
defer cancel()
156+
return nil
157+
}
158+
159+
func concatLabels(labels map[string]string) string {
160+
// 0xff cannot occur in valid UTF-8 sequences, so use it
161+
// as a separator here.
162+
separator := "\xff"
163+
pairs := make([]string, 0, len(labels))
164+
for k, v := range labels {
165+
pairs = append(pairs, k+separator+v)
166+
}
167+
return strings.Join(pairs, separator)
168+
}
169+
170+
// Name identifies the client as a BigQuery client.
171+
func (c BigqueryClient) Name() string {
172+
return "bigquerydb"
173+
}
174+
175+
// Describe implements prometheus.Collector.
176+
func (c *BigqueryClient) Describe(ch chan<- *prometheus.Desc) {
177+
ch <- c.ignoredSamples.Desc()
178+
}
179+
180+
// Collect implements prometheus.Collector.
181+
func (c *BigqueryClient) Collect(ch chan<- prometheus.Metric) {
182+
ch <- c.ignoredSamples
183+
}

0 commit comments

Comments
 (0)