Skip to content

Commit fac867d

Browse files
refactored pruning (#81)
1 parent d15901b commit fac867d

File tree

11 files changed: 84 additions and 70 deletions.

db/batchDeleter/batchDeleter.go

Lines changed: 23 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -61,7 +61,7 @@ type Config struct {
6161

6262
type BatchDeleter struct {
6363
pruner db.Pruner
64-
deleteQueue chan []int
64+
deleteQueue chan db.RecordToDelete
6565
deleteWorkers semaphore.Interface
6666
wg sync.WaitGroup
6767
measures *Measures
@@ -100,7 +100,7 @@ func NewBatchDeleter(config Config, logger log.Logger, metricsRegistry provider.
100100

101101
measures := NewMeasures(metricsRegistry)
102102
workers := semaphore.New(config.MaxWorkers)
103-
queue := make(chan []int, config.QueueSize)
103+
queue := make(chan db.RecordToDelete, config.QueueSize)
104104
stop := make(chan struct{}, 1)
105105

106106
return &BatchDeleter{
@@ -137,25 +137,32 @@ func (d *BatchDeleter) getRecordsToDelete(ticker <-chan time.Time) {
137137
close(d.deleteQueue)
138138
return
139139
case <-ticker:
140-
vals, err := d.pruner.GetRecordIDs(d.config.Shard, d.config.GetLimit, time.Now().Unix())
140+
vals, err := d.pruner.GetRecordsToDelete(d.config.Shard, d.config.GetLimit, time.Now().Unix())
141141
if err != nil {
142142
logging.Error(d.logger, emperror.Context(err)...).Log(logging.MessageKey(),
143143
"Failed to get record IDs from the database", logging.ErrorKey(), err.Error())
144144
// just in case
145-
vals = []int{}
145+
// vals = []int{}
146146
}
147-
logging.Debug(d.logger).Log(logging.MessageKey(), "got record ids", "record ids", vals)
148-
i := 0
149-
for i < len(vals) {
150-
endVal := i + d.config.MaxBatchSize
151-
if endVal > len(vals) {
152-
endVal = len(vals)
153-
}
154-
d.deleteQueue <- vals[i:endVal]
147+
logging.Debug(d.logger).Log(logging.MessageKey(), "got records", "records", vals)
148+
// i := 0
149+
// for i < len(vals) {
150+
// endVal := i + d.config.MaxBatchSize
151+
// if endVal > len(vals) {
152+
// endVal = len(vals)
153+
// }
154+
// d.deleteQueue <- vals[i:endVal]
155+
// if d.measures != nil {
156+
// d.measures.DeletingQueue.Add(1.0)
157+
// }
158+
// i = endVal
159+
// }
160+
161+
for _, i := range vals {
162+
d.deleteQueue <- i
155163
if d.measures != nil {
156164
d.measures.DeletingQueue.Add(1.0)
157165
}
158-
i = endVal
159166
}
160167
}
161168
}
@@ -178,13 +185,13 @@ func (d *BatchDeleter) delete() {
178185
}
179186
}
180187

181-
func (d *BatchDeleter) deleteWorker(records []int) {
188+
func (d *BatchDeleter) deleteWorker(record db.RecordToDelete) {
182189
defer d.deleteWorkers.Release()
183-
err := d.pruner.PruneRecords(records)
190+
err := d.pruner.DeleteRecord(d.config.Shard, record.DeathDate, record.RecordID)
184191
if err != nil {
185192
logging.Error(d.logger, emperror.Context(err)...).Log(logging.MessageKey(),
186193
"Failed to delete records from the database", logging.ErrorKey(), err.Error())
187194
return
188195
}
189-
logging.Info(d.logger).Log(logging.MessageKey(), "Successfully deleted records", "records", records)
196+
logging.Debug(d.logger).Log(logging.MessageKey(), "Successfully deleted record", "record id", record.RecordID)
190197
}

db/batchDeleter/batchDeleter_test.go

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -122,10 +122,10 @@ func TestNewBatchDeleter(t *testing.T) {
122122

123123
func TestGetRecordsToDeleteSuccess(t *testing.T) {
124124
assert := assert.New(t)
125-
vals := []int{1, 2, 3, 4, 5}
125+
vals := []db.RecordToDelete{{DeathDate: 1, RecordID: 2}, {DeathDate: 3, RecordID: 4}}
126126
pruner := new(mockPruner)
127-
pruner.On("GetRecordIDs", mock.Anything, mock.Anything, mock.Anything).Return(vals, nil).Once()
128-
queue := make(chan []int, 2)
127+
pruner.On("GetRecordsToDelete", mock.Anything, mock.Anything, mock.Anything).Return(vals, nil).Once()
128+
queue := make(chan db.RecordToDelete, 2)
129129
tickerChan := make(chan time.Time, 1)
130130
stopChan := make(chan struct{}, 1)
131131
p := xmetricstest.NewProvider(nil, Metrics)
@@ -164,8 +164,8 @@ func TestGetRecordsToDeleteSuccess(t *testing.T) {
164164
func TestGetRecordsToDeleteError(t *testing.T) {
165165
assert := assert.New(t)
166166
pruner := new(mockPruner)
167-
pruner.On("GetRecordIDs", mock.Anything, mock.Anything, mock.Anything).Return([]int{}, errors.New("test error")).Once()
168-
queue := make(chan []int, 2)
167+
pruner.On("GetRecordsToDelete", mock.Anything, mock.Anything, mock.Anything).Return([]db.RecordToDelete{}, errors.New("test error")).Once()
168+
queue := make(chan db.RecordToDelete, 2)
169169
tickerChan := make(chan time.Time, 1)
170170
stopChan := make(chan struct{}, 1)
171171
p := xmetricstest.NewProvider(nil, Metrics)
@@ -203,10 +203,10 @@ func TestGetRecordsToDeleteError(t *testing.T) {
203203

204204
func TestDelete(t *testing.T) {
205205
assert := assert.New(t)
206-
vals := []int{4, 2}
206+
vals := db.RecordToDelete{DeathDate: 111, RecordID: 88888}
207207
pruner := new(mockPruner)
208-
pruner.On("PruneRecords", vals).Return(nil).Once()
209-
queue := make(chan []int, 2)
208+
pruner.On("DeleteRecord", 0, vals.DeathDate, vals.RecordID).Return(nil).Once()
209+
queue := make(chan db.RecordToDelete, 2)
210210
p := xmetricstest.NewProvider(nil, Metrics)
211211
measures := NewMeasures(p)
212212

db/batchDeleter/mocks_test.go

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,19 +18,20 @@
1818
package batchDeleter
1919

2020
import (
21+
"github.com/Comcast/codex/db"
2122
"github.com/stretchr/testify/mock"
2223
)
2324

2425
type mockPruner struct {
2526
mock.Mock
2627
}
2728

28-
func (p *mockPruner) GetRecordIDs(shard int, limit int, deathDate int64) ([]int, error) {
29+
func (p *mockPruner) GetRecordsToDelete(shard int, limit int, deathDate int64) ([]db.RecordToDelete, error) {
2930
args := p.Called(shard, limit, deathDate)
30-
return args.Get(0).([]int), args.Error(1)
31+
return args.Get(0).([]db.RecordToDelete), args.Error(1)
3132
}
3233

33-
func (p *mockPruner) PruneRecords(records []int) error {
34-
args := p.Called(records)
34+
func (p *mockPruner) DeleteRecord(shard int, deathdate int64, recordID int64) error {
35+
args := p.Called(shard, deathdate, recordID)
3536
return args.Error(0)
3637
}

db/db.go

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,11 @@ type Record struct {
2020
KID string `json:"kid" bson:"kid" gorm:"Column:kid"`
2121
}
2222

23+
type RecordToDelete struct {
24+
DeathDate int64 `json:"deathdate" bson:"deathdate"`
25+
RecordID int64 `json:"recordid" bson:"recordid"`
26+
}
27+
2328
// set Record's table name to be `events`
2429
func (Record) TableName() string {
2530
return "events"
@@ -30,8 +35,9 @@ type Inserter interface {
3035
}
3136

3237
type Pruner interface {
33-
GetRecordIDs(shard int, limit int, deathDate int64) ([]int, error)
34-
PruneRecords(records []int) error
38+
GetRecordsToDelete(shard int, limit int, deathDate int64) ([]RecordToDelete, error)
39+
// PruneRecords(records []int) error
40+
DeleteRecord(shard int, deathdate int64, recordID int64) error
3541
}
3642

3743
type RecordGetter interface {

db/postgresql/db.go

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -272,15 +272,15 @@ func (c *Connection) GetRecordsOfType(deviceID string, limit int, eventType db.E
272272
return deviceInfo, nil
273273
}
274274

275-
func (c *Connection) GetRecordIDs(shard int, limit int, deathDate int64) ([]int, error) {
276-
ids, err := c.finder.findRecordIDs(limit, shard, deathDate)
275+
func (c *Connection) GetRecordsToDelete(shard int, limit int, deathDate int64) ([]db.RecordToDelete, error) {
276+
recordsToDelete, err := c.finder.findRecordsToDelete(limit, shard, deathDate)
277277
if err != nil {
278278
c.measures.SQLQueryFailureCount.With(db.TypeLabel, db.ReadType).Add(1.0)
279-
return []int{}, emperror.WrapWith(err, "Getting record IDs from database failed", "shard", shard, "death date", deathDate)
279+
return []db.RecordToDelete{}, emperror.WrapWith(err, "Getting record IDs from database failed", "shard", shard, "death date", deathDate)
280280
}
281-
c.measures.SQLReadRecords.Add(float64(len(ids)))
281+
c.measures.SQLReadRecords.Add(float64(len(recordsToDelete)))
282282
c.measures.SQLQuerySuccessCount.With(db.TypeLabel, db.ReadType).Add(1.0)
283-
return ids, nil
283+
return recordsToDelete, nil
284284
}
285285

286286
// GetBlacklist returns a list of blacklisted devices
@@ -294,13 +294,13 @@ func (c *Connection) GetBlacklist() (list []blacklist.BlackListedItem, err error
294294
return
295295
}
296296

297-
// PruneRecords removes records past their deathdate.
298-
func (c *Connection) PruneRecords(records []int) error {
299-
rowsAffected, err := c.deleter.delete(&db.Record{}, len(records), "record_id IN (?)", records)
297+
// DeleteRecord removes a record.
298+
func (c *Connection) DeleteRecord(shard int, deathDate int64, recordID int64) error {
299+
rowsAffected, err := c.deleter.delete(&db.Record{}, 1, "shard = ? AND death_date = ? AND record_id = ?", shard, deathDate, recordID)
300300
c.measures.SQLDeletedRecords.Add(float64(rowsAffected))
301301
if err != nil {
302302
c.measures.SQLQueryFailureCount.With(db.TypeLabel, db.DeleteType).Add(1.0)
303-
return emperror.WrapWith(err, "Prune records failed", "record ids", records)
303+
return emperror.WrapWith(err, "Prune records failed", "record id", recordID)
304304
}
305305
c.measures.SQLQuerySuccessCount.With(db.TypeLabel, db.DeleteType).Add(1.0)
306306
return nil

db/postgresql/db_test.go

Lines changed: 10 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -177,7 +177,7 @@ func TestGetRecordIDs(t *testing.T) {
177177
tests := []struct {
178178
description string
179179
deviceID string
180-
expectedRecordIDs []int
180+
expectedRecords []db.RecordToDelete
181181
expectedSuccessMetric float64
182182
expectedFailureMetric float64
183183
expectedErr error
@@ -186,15 +186,15 @@ func TestGetRecordIDs(t *testing.T) {
186186
{
187187
description: "Success",
188188
deviceID: "1234",
189-
expectedRecordIDs: []int{12345},
189+
expectedRecords: []db.RecordToDelete{{DeathDate: 222, RecordID: 12345}},
190190
expectedSuccessMetric: 1.0,
191191
expectedErr: nil,
192192
expectedCalls: 1,
193193
},
194194
{
195195
description: "Get Error",
196196
deviceID: "1234",
197-
expectedRecordIDs: []int{},
197+
expectedRecords: []db.RecordToDelete{},
198198
expectedFailureMetric: 1.0,
199199
expectedErr: errors.New("test Get error"),
200200
expectedCalls: 1,
@@ -212,12 +212,12 @@ func TestGetRecordIDs(t *testing.T) {
212212
finder: mockObj,
213213
}
214214
if tc.expectedCalls > 0 {
215-
mockObj.On("findRecordIDs", mock.Anything, mock.Anything, mock.Anything).Return(tc.expectedRecordIDs, tc.expectedErr).Times(tc.expectedCalls)
215+
mockObj.On("findRecordsToDelete", mock.Anything, mock.Anything, mock.Anything).Return(tc.expectedRecords, tc.expectedErr).Times(tc.expectedCalls)
216216
}
217217
p.Assert(t, SQLQuerySuccessCounter)(xmetricstest.Value(0.0))
218218
p.Assert(t, SQLQueryFailureCounter)(xmetricstest.Value(0.0))
219219

220-
records, err := dbConnection.GetRecordIDs(0, 0, time.Now().Unix())
220+
records, err := dbConnection.GetRecordsToDelete(0, 0, time.Now().Unix())
221221
mockObj.AssertExpectations(t)
222222
p.Assert(t, SQLQuerySuccessCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedSuccessMetric))
223223
p.Assert(t, SQLQueryFailureCounter, db.TypeLabel, db.ReadType)(xmetricstest.Value(tc.expectedFailureMetric))
@@ -226,7 +226,7 @@ func TestGetRecordIDs(t *testing.T) {
226226
} else {
227227
assert.Contains(err.Error(), tc.expectedErr.Error())
228228
}
229-
assert.Equal(tc.expectedRecordIDs, records)
229+
assert.Equal(tc.expectedRecords, records)
230230
})
231231
}
232232
}
@@ -269,7 +269,7 @@ func TestPruneRecords(t *testing.T) {
269269
p.Assert(t, SQLQueryFailureCounter)(xmetricstest.Value(0.0))
270270
p.Assert(t, SQLDeletedRecordsCounter)(xmetricstest.Value(0.0))
271271

272-
err := dbConnection.PruneRecords([]int{3, 5})
272+
err := dbConnection.DeleteRecord(0, 0, 0)
273273
mockObj.AssertExpectations(t)
274274
p.Assert(t, SQLQuerySuccessCounter, db.TypeLabel, db.DeleteType)(xmetricstest.Value(tc.expectedSuccessMetric))
275275
p.Assert(t, SQLQueryFailureCounter, db.TypeLabel, db.DeleteType)(xmetricstest.Value(tc.expectedFailureMetric))
@@ -480,9 +480,9 @@ func TestImplementsInterfaces(t *testing.T) {
480480
assert := assert.New(t)
481481
dbConn = &Connection{}
482482
_, ok := dbConn.(db.Inserter)
483-
assert.True(ok)
483+
assert.True(ok, "not an inserter")
484484
_, ok = dbConn.(db.Pruner)
485-
assert.True(ok)
485+
assert.True(ok, "not a pruner")
486486
_, ok = dbConn.(db.RecordGetter)
487-
assert.True(ok)
487+
assert.True(ok, "not an record getter")
488488
}

db/postgresql/executer.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ import (
3434
type (
3535
finder interface {
3636
findRecords(out *[]db.Record, limit int, where ...interface{}) error
37-
findRecordIDs(limit int, shard int, deathDate int64) ([]int, error)
37+
findRecordsToDelete(limit int, shard int, deathDate int64) ([]db.RecordToDelete, error)
3838
}
3939
findList interface {
4040
findBlacklist(out *[]blacklist.BlackListedItem) error
@@ -65,11 +65,11 @@ func (b *dbDecorator) findRecords(out *[]db.Record, limit int, where ...interfac
6565
return db.Error
6666
}
6767

68-
func (b *dbDecorator) findRecordIDs(limit int, shard int, deathDate int64) ([]int, error) {
68+
func (b *dbDecorator) findRecordsToDelete(limit int, shard int, deathDate int64) ([]db.RecordToDelete, error) {
6969
var (
70-
out []int
70+
out []db.RecordToDelete
7171
)
72-
db := b.Raw("SELECT record_id from devices.events WHERE shard = ? AND death_date < ? LIMIT ?", shard, deathDate, limit).Pluck("record_id", &out)
72+
db := b.Raw("SELECT death_date, record_id from devices.events WHERE shard = ? AND death_date < ? LIMIT ?", shard, deathDate, limit).Scan(&out)
7373
// db := b.Order("birth_date desc").Limit(limit).Find(&records, where...).Pluck("record_id", out)
7474
return out, db.Error
7575
}

db/postgresql/mocks_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -37,9 +37,9 @@ func (f *mockFinder) findRecords(out *[]db.Record, limit int, where ...interface
3737
return args.Error(0)
3838
}
3939

40-
func (f *mockFinder) findRecordIDs(limit int, shard int, deathDate int64) ([]int, error) {
40+
func (f *mockFinder) findRecordsToDelete(limit int, shard int, deathDate int64) ([]db.RecordToDelete, error) {
4141
args := f.Called(limit, shard, deathDate)
42-
return args.Get(0).([]int), args.Error(1)
42+
return args.Get(0).([]db.RecordToDelete), args.Error(1)
4343
}
4444

4545
type mockMultiInsert struct {

db/retry/retry.go

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -138,10 +138,10 @@ type RetryUpdateService struct {
138138
config retryConfig
139139
}
140140

141-
func (ru RetryUpdateService) GetRecordIDs(shard int, limit int, deathDate int64) ([]int, error) {
141+
func (ru RetryUpdateService) GetRecordsToDelete(shard int, limit int, deathDate int64) ([]db.RecordToDelete, error) {
142142
var (
143143
err error
144-
recordIDs []int
144+
recordIDs []db.RecordToDelete
145145
)
146146

147147
retries := ru.config.retries
@@ -156,7 +156,7 @@ func (ru RetryUpdateService) GetRecordIDs(shard int, limit int, deathDate int64)
156156
ru.config.sleep(sleepTime)
157157
sleepTime = sleepTime * ru.config.intervalMult
158158
}
159-
if recordIDs, err = ru.pruner.GetRecordIDs(shard, limit, deathDate); err == nil {
159+
if recordIDs, err = ru.pruner.GetRecordsToDelete(shard, limit, deathDate); err == nil {
160160
break
161161
}
162162
}
@@ -165,7 +165,7 @@ func (ru RetryUpdateService) GetRecordIDs(shard int, limit int, deathDate int64)
165165
return recordIDs, err
166166
}
167167

168-
func (ru RetryUpdateService) PruneRecords(records []int) error {
168+
func (ru RetryUpdateService) DeleteRecord(shard int, deathdate int64, recordID int64) error {
169169
var err error
170170

171171
retries := ru.config.retries
@@ -180,7 +180,7 @@ func (ru RetryUpdateService) PruneRecords(records []int) error {
180180
ru.config.sleep(sleepTime)
181181
sleepTime = sleepTime * ru.config.intervalMult
182182
}
183-
if err = ru.pruner.PruneRecords(records); err == nil {
183+
if err = ru.pruner.DeleteRecord(shard, deathdate, recordID); err == nil {
184184
break
185185
}
186186
}

db/retry/retry_mocks_test.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -36,13 +36,13 @@ type mockPruner struct {
3636
mock.Mock
3737
}
3838

39-
func (p *mockPruner) GetRecordIDs(shard int, limit int, deathDate int64) ([]int, error) {
39+
func (p *mockPruner) GetRecordsToDelete(shard int, limit int, deathDate int64) ([]db.RecordToDelete, error) {
4040
args := p.Called(shard, limit, deathDate)
41-
return args.Get(0).([]int), args.Error(1)
41+
return args.Get(0).([]db.RecordToDelete), args.Error(1)
4242
}
4343

44-
func (p *mockPruner) PruneRecords(records []int) error {
45-
args := p.Called(records)
44+
func (p *mockPruner) DeleteRecord(shard int, deathdate int64, recordID int64) error {
45+
args := p.Called(shard, deathdate, recordID)
4646
return args.Error(0)
4747
}
4848

0 commit comments

Comments (0)