Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
"io"
"math/rand"
"net/http"
Expand All @@ -23,6 +24,8 @@ import (
)

func TestV2E2E(t *testing.T) {
goleak.VerifyNone(t)

dir := t.TempDir()
totalSeries := atomic.NewInt32(0)
mut := sync.Mutex{}
Expand Down Expand Up @@ -61,8 +64,7 @@ func TestV2E2E(t *testing.T) {
q, err := prom.NewQueue("test", cc, dir, 10000, 1*time.Second, 1*time.Hour, prometheus.NewRegistry(), "alloy", log.NewLogfmtLogger(os.Stderr))

require.NoError(t, err)
go q.Start()
defer q.Stop()
go q.Start(context.Background())

metricCount := 100
sends := 2
Expand Down Expand Up @@ -96,6 +98,8 @@ func TestV2E2E(t *testing.T) {
require.Eventually(t, func() bool {
return totalSeries.Load() == int32(metricCount*sends)
}, 50*time.Second, 1*time.Second)
q.Stop()
srv.Close()
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
Expand Down
64 changes: 30 additions & 34 deletions filequeue/filequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,17 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/walqueue/types"
"github.com/vladopajic/go-actor/actor"
)

var _ actor.Worker = (*queue)(nil)
var _ types.FileStorage = (*queue)(nil)

// queue represents an on-disk queue. This is a list implemented as files ordered by id with a name pattern: <id>.committed
// Each file contains a byte buffer and an optional metatdata map.
type queue struct {
self actor.Actor
directory string
maxID int
logger log.Logger
dataQueue actor.Mailbox[types.Data]
dataQueue *types.Mailbox[types.Data]
// Out is where to send data when pulled from queue, it is assumed that it will
// block until ready for another record.
out func(ctx context.Context, dh types.DataHandle)
Expand Down Expand Up @@ -65,7 +62,7 @@ func NewQueue(directory string, out func(ctx context.Context, dh types.DataHandl
maxID: currentMaxID,
logger: logger,
out: out,
dataQueue: actor.NewMailbox[types.Data](),
dataQueue: types.NewMailbox[types.Data](),
files: make([]string, 0),
}

Expand All @@ -77,24 +74,22 @@ func NewQueue(directory string, out func(ctx context.Context, dh types.DataHandl
return q, nil
}

func (q *queue) Start() {
q.self = actor.New(q)
q.self.Start()
q.dataQueue.Start()
func (q *queue) Start(ctx context.Context) {
go q.run(ctx)
}

func (q *queue) Stop() {
q.self.Stop()
q.dataQueue.Stop()
q.dataQueue.Close()
}

// Store will add records to the dataQueue that will add the data to the filesystem. This is an unbuffered channel.
// Its possible in the future we would want to make it a buffer of 1, but so far it hasnt been an issue in testing.
func (q *queue) Store(ctx context.Context, meta map[string]string, data []byte) error {
return q.dataQueue.Send(ctx, types.Data{
d := types.Data{
Meta: meta,
Data: data,
})
}
return q.dataQueue.Send(ctx, d)
}

// get returns the data of the file or an error if something wrong went on.
Expand All @@ -112,8 +107,8 @@ func get(logger log.Logger, name string) (map[string]string, []byte, error) {
return r.Meta, r.Data, nil
}

// DoWork allows most of the queue to be single threaded with work only coming in and going out via mailboxes(channels).
func (q *queue) DoWork(ctx actor.Context) actor.WorkerStatus {
// run allows most of the queue to be single threaded with work only coming in and going out via mailboxes(channels).
func (q *queue) run(ctx context.Context) {
// Queue up our existing items, we cant do this earlier since the actor isnt started.
for _, name := range q.files {
q.out(ctx, types.DataHandle{
Expand All @@ -125,26 +120,27 @@ func (q *queue) DoWork(ctx actor.Context) actor.WorkerStatus {
}
// We only want to process existing files once.
q.files = nil
select {
case <-ctx.Done():
return actor.WorkerEnd
case item, ok := <-q.dataQueue.ReceiveC():
if !ok {
return actor.WorkerEnd
}
name, err := q.add(item.Meta, item.Data)
if err != nil {
level.Error(q.logger).Log("msg", "error adding item - dropping data", "err", err)
return actor.WorkerContinue
for {
select {
case <-ctx.Done():
return
case item, ok := <-q.dataQueue.Receive():
if !ok {
return
}
name, err := q.add(item.Meta, item.Data)
if err != nil {
level.Error(q.logger).Log("msg", "error adding item - dropping data", "err", err)
continue
}
// The idea is that this will callee will block/process until the callee is ready for another file.
q.out(ctx, types.DataHandle{
Name: name,
Pop: func() (map[string]string, []byte, error) {
return get(q.logger, name)
},
})
}
// The idea is that this will callee will block/process until the callee is ready for another file.
q.out(ctx, types.DataHandle{
Name: name,
Pop: func() (map[string]string, []byte, error) {
return get(q.logger, name)
},
})
return actor.WorkerContinue
}
}

Expand Down
60 changes: 27 additions & 33 deletions filequeue/filequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/go-kit/log"
"github.com/grafana/walqueue/types"
"github.com/stretchr/testify/require"
"github.com/vladopajic/go-actor/actor"
"go.uber.org/goleak"
)

Expand All @@ -20,15 +19,15 @@ func TestFileQueue(t *testing.T) {

dir := t.TempDir()
log := log.NewNopLogger()
mbx := actor.NewMailbox[types.DataHandle]()
mbx.Start()
defer mbx.Stop()
mbx := types.NewMailbox[types.DataHandle]()
defer mbx.Close()
q, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx.Send(ctx, dh)
}, log)
require.NoError(t, err)
q.Start()
q.Start(context.Background())
defer q.Stop()

err = q.Store(context.Background(), nil, []byte("test"))

require.NoError(t, err)
Expand All @@ -43,23 +42,23 @@ func TestFileQueue(t *testing.T) {
select {
case <-timer.C:
return
case <-mbx.ReceiveC():
case <-mbx.Receive():
require.True(t, false)
}

}

func TestMetaFileQueue(t *testing.T) {
defer goleak.VerifyNone(t)

dir := t.TempDir()
log := log.NewNopLogger()
mbx := actor.NewMailbox[types.DataHandle]()
mbx.Start()
defer mbx.Stop()
mbx := types.NewMailbox[types.DataHandle]()
defer mbx.Close()
q, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx.Send(ctx, dh)
}, log)
q.Start()
q.Start(context.Background())
defer q.Stop()
require.NoError(t, err)
err = q.Store(context.Background(), map[string]string{"name": "bob"}, []byte("test"))
Expand All @@ -77,13 +76,12 @@ func TestCorruption(t *testing.T) {

dir := t.TempDir()
log := log.NewNopLogger()
mbx := actor.NewMailbox[types.DataHandle]()
mbx.Start()
defer mbx.Stop()
mbx := types.NewMailbox[types.DataHandle]()
defer mbx.Close()
q, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx.Send(ctx, dh)
}, log)
q.Start()
q.Start(context.Background())
defer q.Stop()
require.NoError(t, err)

Expand Down Expand Up @@ -120,13 +118,12 @@ func TestFileDeleted(t *testing.T) {

dir := t.TempDir()
log := log.NewNopLogger()
mbx := actor.NewMailbox[types.DataHandle]()
mbx.Start()
defer mbx.Stop()
mbx := types.NewMailbox[types.DataHandle]()
defer mbx.Close()
q, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx.Send(ctx, dh)
}, log)
q.Start()
q.Start(context.Background())
defer q.Stop()
require.NoError(t, err)

Expand Down Expand Up @@ -166,13 +163,12 @@ func TestOtherFiles(t *testing.T) {

dir := t.TempDir()
log := log.NewNopLogger()
mbx := actor.NewMailbox[types.DataHandle]()
mbx.Start()
defer mbx.Stop()
mbx := types.NewMailbox[types.DataHandle]()
defer mbx.Close()
q, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx.Send(ctx, dh)
}, log)
q.Start()
q.Start(context.Background())
defer q.Stop()
require.NoError(t, err)

Expand All @@ -189,12 +185,12 @@ func TestResuming(t *testing.T) {

dir := t.TempDir()
log := log.NewNopLogger()
mbx := actor.NewMailbox[types.DataHandle]()
mbx.Start()
mbx := types.NewMailbox[types.DataHandle]()
defer mbx.Close()
q, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx.Send(ctx, dh)
}, log)
q.Start()
q.Start(context.Background())
require.NoError(t, err)

err = q.Store(context.Background(), nil, []byte("first"))
Expand All @@ -205,17 +201,15 @@ func TestResuming(t *testing.T) {

require.NoError(t, err)
time.Sleep(1 * time.Second)
mbx.Stop()
q.Stop()

mbx2 := actor.NewMailbox[types.DataHandle]()
mbx2.Start()
defer mbx2.Stop()
mbx2 := types.NewMailbox[types.DataHandle]()
defer mbx2.Close()
q2, err := NewQueue(dir, func(ctx context.Context, dh types.DataHandle) {
_ = mbx2.Send(ctx, dh)
}, log)
require.NoError(t, err)
q2.Start()
q2.Start(context.Background())
defer q2.Stop()
err = q2.Store(context.Background(), nil, []byte("third"))

Expand All @@ -233,14 +227,14 @@ func TestResuming(t *testing.T) {
require.True(t, string(buf) == "third")
}

func getHandle(t *testing.T, mbx actor.MailboxReceiver[types.DataHandle]) (map[string]string, []byte, error) {
timer := time.NewTicker(5 * time.Second)
func getHandle(t *testing.T, mbx *types.Mailbox[types.DataHandle]) (map[string]string, []byte, error) {
timer := time.NewTicker(50 * time.Second)
select {
case <-timer.C:
require.True(t, false)
// This is only here to satisfy the linting.
return nil, nil, nil
case item, ok := <-mbx.ReceiveC():
case item, ok := <-mbx.Receive():
require.True(t, ok)
return item.Pop()
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ require (
github.com/prometheus/prometheus v0.55.1
github.com/stretchr/testify v1.10.0
github.com/tinylib/msgp v1.2.4
github.com/vladopajic/go-actor v0.9.1-0.20241115212052-39d92aec6093
go.uber.org/atomic v1.11.0
go.uber.org/goleak v1.3.0
golang.design/x/chann v0.1.2
)

require (
Expand All @@ -26,11 +26,11 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect
Expand Down
7 changes: 3 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deneonet/benc v1.1.2 h1:JNJSnA53zVLjt4Bz1HwxG4tQg475LP+kd8rgUuV4tc4=
Expand All @@ -32,8 +33,6 @@ github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4A
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
Expand Down Expand Up @@ -104,8 +103,6 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tinylib/msgp v1.2.4 h1:yLFeUGostXXSGW5vxfT5dXG/qzkn4schv2I7at5+hVU=
github.com/tinylib/msgp v1.2.4/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/vladopajic/go-actor v0.9.1-0.20241115212052-39d92aec6093 h1:wZ2W9ei2PSGeeUWb0dZOnX7s+hIDLqwkRnpU30ejEpw=
github.com/vladopajic/go-actor v0.9.1-0.20241115212052-39d92aec6093/go.mod h1:qcIpcfAXGBaWKyBtzyrhEhPjlKpw4nK83rbyZONymcg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
Expand All @@ -122,6 +119,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.design/x/chann v0.1.2 h1:eHF9wjuQnpp+j4ryWhyxC/pFuYzbvMAkudA/I5ALovY=
golang.design/x/chann v0.1.2/go.mod h1:Rh5KhCAp+0qu9+FfKPymHpu8onmjl89sFwMeiw3SK14=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
Loading