Skip to content

Commit da0fd78

Browse files
authored
Add checkpoint publication related metrics (#837)
Adds publication latency histogram and attempt count (by outcome) metrics to each of the backends.
1 parent a9118ff commit da0fd78

File tree

8 files changed

+151
-27
lines changed

8 files changed

+151
-27
lines changed

storage/aws/aws.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import (
5959
"github.com/transparency-dev/tessera/internal/migrate"
6060
"github.com/transparency-dev/tessera/internal/parse"
6161
storage "github.com/transparency-dev/tessera/storage/internal"
62+
"go.opentelemetry.io/otel/metric"
6263
"golang.org/x/sync/errgroup"
6364
"k8s.io/klog/v2"
6465

@@ -301,7 +302,7 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republ
301302
case <-a.treeUpdated:
302303
case <-t.C:
303304
}
304-
if err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.publishCheckpoint); err != nil {
305+
if err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint); err != nil {
305306
klog.Warningf("publishCheckpoint: %v", err)
306307
}
307308
}
@@ -380,7 +381,7 @@ func (a *Appender) init(ctx context.Context) error {
380381
return nil
381382
}
382383

383-
func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []byte) error {
384+
func (a *Appender) updateCheckpoint(ctx context.Context, size uint64, root []byte) error {
384385
cpRaw, err := a.newCP(ctx, size, root)
385386
if err != nil {
386387
return fmt.Errorf("newCP: %v", err)
@@ -390,7 +391,7 @@ func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []by
390391
return fmt.Errorf("writeCheckpoint: %v", err)
391392
}
392393

393-
klog.V(2).Infof("Published latest checkpoint: %d, %x", size, root)
394+
klog.V(2).Infof("Created and stored latest checkpoint: %d, %x", size, root)
394395

395396
return nil
396397
}
@@ -718,14 +719,17 @@ func (lrs *logResourceStore) setCheckpoint(ctx context.Context, cpRaw []byte) er
718719
//
719720
// The location to which the tile is written is defined by the tile layout spec.
720721
func (lrs *logResourceStore) setTile(ctx context.Context, level, index, logSize uint64, tile *api.HashTile) error {
722+
start := time.Now()
721723
data, err := tile.MarshalText()
722724
if err != nil {
723725
return err
724726
}
725727
tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize))
726728
klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes))
727729

728-
return lrs.objStore.setObjectIfNoneMatch(ctx, tPath, data, logContType, logCacheControl)
730+
err = lrs.objStore.setObjectIfNoneMatch(ctx, tPath, data, logContType, logCacheControl)
731+
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("writeTile")))
732+
return err
729733
}
730734

731735
// getTiles returns the tiles with the given tile-coords for the specified log size.
@@ -1210,7 +1214,16 @@ func (s *mySQLSequencer) nextIndex(ctx context.Context) (uint64, error) {
12101214
//
12111215
// This function uses PubCoord with an exclusive lock to guarantee that only one tessera instance can attempt to publish
12121216
// a checkpoint at any given time.
1213-
func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) error {
1217+
func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) (errR error) {
1218+
start := time.Now()
1219+
defer func() {
1220+
// Detect any errors and update metrics accordingly.
1221+
// Non-error cases are explicitly handled in the body of the function below.
1222+
if errR != nil {
1223+
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("error")))
1224+
}
1225+
}()
1226+
12141227
tx, err := s.dbPool.Begin()
12151228
if err != nil {
12161229
return err
@@ -1230,6 +1243,7 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
12301243
cpAge := time.Since(time.Unix(pubAt, 0))
12311244
if cpAge < minStaleActive {
12321245
klog.V(1).Infof("publishCheckpoint: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minStaleActive)
1246+
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped")))
12331247
return nil
12341248
}
12351249

@@ -1254,6 +1268,7 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
12541268

12551269
if !shouldPublish {
12561270
klog.V(1).Infof("publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent")
1271+
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("skipped_no_growth")))
12571272
return nil
12581273
}
12591274

@@ -1269,6 +1284,8 @@ func (s *mySQLSequencer) publishCheckpoint(ctx context.Context, minStaleActive,
12691284
if err := tx.Commit(); err != nil {
12701285
return err
12711286
}
1287+
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint")))
1288+
publishCount.Add(ctx, 1)
12721289
tx = nil
12731290

12741291
return nil

storage/aws/aws_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ func TestPublishTree(t *testing.T) {
441441
if err := storage.init(ctx); err != nil {
442442
t.Fatalf("storage.init: %v", err)
443443
}
444-
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.publishCheckpoint); err != nil {
444+
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
445445
t.Fatalf("publishTree: %v", err)
446446
}
447447
cpOld := []byte("bananas")
@@ -451,7 +451,7 @@ func TestPublishTree(t *testing.T) {
451451
updatesSeen := 0
452452
for _, d := range test.attempts {
453453
time.Sleep(d)
454-
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.publishCheckpoint); err != nil {
454+
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
455455
t.Fatalf("publishTree: %v", err)
456456
}
457457
cpNew, err := m.getObject(ctx, layout.CheckpointPath)
@@ -507,7 +507,7 @@ func TestGarbageCollect(t *testing.T) {
507507
if err != nil {
508508
t.Fatalf("newAppender: %v", err)
509509
}
510-
if err := appender.publishCheckpoint(ctx, 0, []byte("")); err != nil {
510+
if err := appender.updateCheckpoint(ctx, 0, []byte("")); err != nil {
511511
t.Fatalf("publishCheckpoint: %v", err)
512512
}
513513

@@ -631,7 +631,7 @@ func TestGarbageCollectOption(t *testing.T) {
631631
if err != nil {
632632
t.Fatalf("newAppender: %v", err)
633633
}
634-
if err := appender.publishCheckpoint(ctx, 0, []byte("")); err != nil {
634+
if err := appender.updateCheckpoint(ctx, 0, []byte("")); err != nil {
635635
t.Fatalf("publishCheckpoint: %v", err)
636636
}
637637

storage/aws/otel.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,46 @@ package aws
1717
import (
1818
"go.opentelemetry.io/otel"
1919
"go.opentelemetry.io/otel/attribute"
20+
"go.opentelemetry.io/otel/metric"
21+
"k8s.io/klog/v2"
2022
)
2123

2224
const name = "github.com/transparency-dev/tessera/storage/aws"
2325

2426
var (
27+
meter = otel.Meter(name)
2528
tracer = otel.Tracer(name)
2629
)
2730

2831
var (
32+
errorTypeKey = attribute.Key("error.type")
2933
objectPathKey = attribute.Key("tessera.objectPath")
34+
opNameKey = attribute.Key("op_name")
35+
36+
opsHistogram metric.Int64Histogram
37+
publishCount metric.Int64Counter
38+
39+
// Custom histogram buckets as we're interested in low-millis up to low-seconds.
40+
histogramBuckets = []float64{0, 1, 2, 5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1200, 1400, 1600, 1800, 2000, 2500, 3000, 4000, 5000, 6000, 8000, 10000}
3041
)
42+
43+
func init() {
44+
var err error
45+
46+
opsHistogram, err = meter.Int64Histogram(
47+
"tessera.appender.ops.duration",
48+
metric.WithDescription("Duration of calls to storage operations"),
49+
metric.WithUnit("ms"),
50+
metric.WithExplicitBucketBoundaries(histogramBuckets...))
51+
if err != nil {
52+
klog.Exitf("Failed to create opsHistogram metric: %v", err)
53+
}
54+
55+
publishCount, err = meter.Int64Counter(
56+
"tessera.appender.checkpoint.publication.counter",
57+
metric.WithDescription("Number of checkpoint publication attempts by result"),
58+
metric.WithUnit("{call}"))
59+
if err != nil {
60+
klog.Exitf("Failed to create checkpoint publication counter metric: %v", err)
61+
}
62+
}

storage/gcp/gcp.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ import (
4545
database "cloud.google.com/go/spanner/admin/database/apiv1"
4646
adminpb "cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
4747
"cloud.google.com/go/spanner/apiv1/spannerpb"
48+
"go.opentelemetry.io/otel/attribute"
49+
"go.opentelemetry.io/otel/metric"
4850

4951
gcs "cloud.google.com/go/storage"
5052
"github.com/google/go-cmp/cmp"
@@ -349,14 +351,9 @@ func (a *Appender) publishCheckpointJob(ctx context.Context, pubInterval, republ
349351
case <-a.cpUpdated:
350352
case <-t.C:
351353
}
352-
func() {
353-
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.publishCheckpointJob")
354-
defer span.End()
355-
356-
if err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.publishCheckpoint); err != nil {
357-
klog.Warningf("publishCheckpoint failed: %v", err)
358-
}
359-
}()
354+
if err := a.sequencer.publishCheckpoint(ctx, pubInterval, republishInterval, a.updateCheckpoint); err != nil {
355+
klog.Warningf("publishCheckpoint failed: %v", err)
356+
}
360357
}
361358
}
362359

@@ -427,8 +424,8 @@ func (a *Appender) init(ctx context.Context) error {
427424
return nil
428425
}
429426

430-
func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []byte) error {
431-
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.publishCheckpoint")
427+
func (a *Appender) updateCheckpoint(ctx context.Context, size uint64, root []byte) error {
428+
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.updateCheckpoint")
432429
defer span.End()
433430
span.SetAttributes(treeSizeKey.Int64(otel.Clamp64(size)))
434431

@@ -441,7 +438,7 @@ func (a *Appender) publishCheckpoint(ctx context.Context, size uint64, root []by
441438
return fmt.Errorf("writeCheckpoint: %v", err)
442439
}
443440

444-
klog.V(2).Infof("Published latest checkpoint: %d, %x", size, root)
441+
klog.V(2).Infof("Created and stored latest checkpoint: %d, %x", size, root)
445442

446443
return nil
447444

@@ -473,16 +470,23 @@ func (lrs *logResourceStore) getCheckpoint(ctx context.Context) ([]byte, error)
473470
//
474471
// The location to which the tile is written is defined by the tile layout spec.
475472
func (s *logResourceStore) setTile(ctx context.Context, level, index uint64, partial uint8, data []byte) error {
473+
start := time.Now()
474+
476475
tPath := layout.TilePath(level, index, partial)
477-
return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl)
476+
err := s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl)
477+
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("writeTile")))
478+
return err
478479
}
479480

480481
// getTile retrieves the raw tile from the provided location.
481482
//
482483
// The location to which the tile is written is defined by the tile layout spec.
483484
func (s *logResourceStore) getTile(ctx context.Context, level, index uint64, partial uint8) ([]byte, error) {
485+
start := time.Now()
486+
484487
tPath := layout.TilePath(level, index, partial)
485488
d, _, err := s.objStore.getObject(ctx, tPath)
489+
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("readTile")))
486490
return d, err
487491
}
488492

@@ -641,6 +645,7 @@ func (a *Appender) updateEntryBundles(ctx context.Context, fromSeq uint64, entri
641645
return nil
642646
}
643647

648+
numAdded := uint64(0)
644649
bundleIndex, entriesInBundle := fromSeq/layout.EntryBundleWidth, fromSeq%layout.EntryBundleWidth
645650
bundleWriter := &bytes.Buffer{}
646651
if entriesInBundle > 0 {
@@ -674,6 +679,8 @@ func (a *Appender) updateEntryBundles(ctx context.Context, fromSeq uint64, entri
674679
return fmt.Errorf("bundleWriter.Write: %v", err)
675680
}
676681
entriesInBundle++
682+
fromSeq++
683+
numAdded++
677684
if entriesInBundle == layout.EntryBundleWidth {
678685
// This bundle is full, so we need to write it out...
679686
klog.V(1).Infof("In-memory bundle idx %d is full, attempting write to GCS", bundleIndex)
@@ -1010,7 +1017,17 @@ func (s *spannerCoordinator) nextIndex(ctx context.Context) (uint64, error) {
10101017
// This function uses PubCoord with an exclusive lock to guarantee that only one tessera instance can attempt to publish
10111018
// a checkpoint at any given time.
10121019
func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActive, minStaleRepub time.Duration, f func(context.Context, uint64, []byte) error) error {
1020+
ctx, span := tracer.Start(ctx, "tessera.storage.gcp.publishCheckpoint")
1021+
defer span.End()
1022+
1023+
// outcomeAttrs is used to track any attributes which need to be attached to metrics based on the outcome of the attempt to publish.
1024+
var outcomeAttrs []attribute.KeyValue
1025+
start := time.Now()
1026+
10131027
if _, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
1028+
// Reset outcome attributes from any prior transaction attempts.
1029+
outcomeAttrs = []attribute.KeyValue{}
1030+
10141031
pRow, err := txn.ReadRowWithOptions(ctx, "PubCoord", spanner.Key{0}, []string{"publishedAt", "size"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
10151032
if err != nil {
10161033
return fmt.Errorf("failed to read PubCoord: %w", err)
@@ -1024,6 +1041,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
10241041
cpAge := time.Since(pubAt)
10251042
if cpAge < minStaleActive {
10261043
klog.V(1).Infof("publishCheckpoint: last checkpoint published %s ago (< required %s), not publishing new checkpoint", cpAge, minStaleActive)
1044+
outcomeAttrs = append(outcomeAttrs, errorTypeKey.String("skipped"))
10271045
return nil
10281046
}
10291047

@@ -1051,6 +1069,7 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
10511069

10521070
if !shouldPublish {
10531071
klog.V(1).Infof("publishCheckpoint: skipping publish because tree hasn't grown and previous checkpoint is too recent")
1072+
outcomeAttrs = append(outcomeAttrs, errorTypeKey.String("skipped_no_growth"))
10541073
return nil
10551074
}
10561075

@@ -1065,8 +1084,11 @@ func (s *spannerCoordinator) publishCheckpoint(ctx context.Context, minStaleActi
10651084

10661085
return nil
10671086
}); err != nil {
1087+
publishCount.Add(ctx, 1, metric.WithAttributes(errorTypeKey.String("error")))
10681088
return err
10691089
}
1090+
opsHistogram.Record(ctx, time.Since(start).Milliseconds(), metric.WithAttributes(opNameKey.String("publishCheckpoint")))
1091+
publishCount.Add(ctx, 1, metric.WithAttributes(outcomeAttrs...))
10701092
return nil
10711093
}
10721094

storage/gcp/gcp_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ func TestPublishTree(t *testing.T) {
426426
t.Fatalf("storage.init: %v", err)
427427
}
428428

429-
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.publishCheckpoint); err != nil {
429+
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
430430
t.Fatalf("publishTree: %v", err)
431431
}
432432
cpOld := []byte("bananas")
@@ -436,7 +436,7 @@ func TestPublishTree(t *testing.T) {
436436
updatesSeen := 0
437437
for _, d := range test.attempts {
438438
time.Sleep(d)
439-
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.publishCheckpoint); err != nil {
439+
if err := s.publishCheckpoint(ctx, test.publishInterval, test.republishInterval, storage.updateCheckpoint); err != nil {
440440
t.Fatalf("publishTree: %v", err)
441441
}
442442
cpNew, _, err := m.getObject(ctx, layout.CheckpointPath)
@@ -484,7 +484,7 @@ func TestGarbageCollect(t *testing.T) {
484484
if err != nil {
485485
t.Fatalf("newAppender: %v", err)
486486
}
487-
if err := appender.publishCheckpoint(ctx, 0, []byte("")); err != nil {
487+
if err := appender.updateCheckpoint(ctx, 0, []byte("")); err != nil {
488488
t.Fatalf("publishCheckpoint: %v", err)
489489
}
490490

@@ -598,7 +598,7 @@ func TestGarbageCollectOption(t *testing.T) {
598598
if err != nil {
599599
t.Fatalf("newAppender: %v", err)
600600
}
601-
if err := appender.publishCheckpoint(ctx, 0, []byte("")); err != nil {
601+
if err := appender.updateCheckpoint(ctx, 0, []byte("")); err != nil {
602602
t.Fatalf("publishCheckpoint: %v", err)
603603
}
604604

0 commit comments

Comments
 (0)