diff --git a/connectors/memory/memory.go b/connectors/memory/memory.go index ca833031..d82a2604 100644 --- a/connectors/memory/memory.go +++ b/connectors/memory/memory.go @@ -65,6 +65,8 @@ type partitionRange struct { end int } +const defaultRangeLimit = 200 + // remove deletes the values referenced by the partitionRange. Since this function modifies // the data stored in the in-memory connector, a write lock must be held when calling // this function. @@ -492,6 +494,11 @@ func (c *Connector) Range(_ context.Context, ei *dosa.EntityInfo, columnConditio partitionRange.start += offset } } + + if limit == dosa.AdaptiveRangeLimit { + limit = defaultRangeLimit + } + slice := partitionRange.values() token = "" if len(slice) > limit { diff --git a/connectors/memory/memory_test.go b/connectors/memory/memory_test.go index 9dd97703..70109089 100644 --- a/connectors/memory/memory_test.go +++ b/connectors/memory/memory_test.go @@ -665,6 +665,13 @@ func TestConnector_Range(t *testing.T) { assert.NoError(t, err) assert.Empty(t, data) assert.Empty(t, token) + + // Test with adaptive limits + data, _, _ = sut.Range(context.TODO(), clusteredEi, map[string][]*dosa.Condition{ + "f1": {{Op: dosa.Eq, Value: dosa.FieldValue("data")}}, + "c1": {{Op: dosa.Eq, Value: dosa.FieldValue(int64(1))}}, + }, dosa.All(), "", 200) + assert.Len(t, data, idcount) } func TestConnector_TUUIDs(t *testing.T) { diff --git a/connectors/random/random.go b/connectors/random/random.go index d538778e..f9a058e0 100644 --- a/connectors/random/random.go +++ b/connectors/random/random.go @@ -31,8 +31,9 @@ import ( ) const ( - maxBlobSize = 32 - maxStringSize = 64 + maxBlobSize = 32 + maxStringSize = 64 + defaultRangeLimit = 200 ) // Connector is a connector implementation for testing @@ -149,6 +150,9 @@ func (c *Connector) MultiRemove(ctx context.Context, ei *dosa.EntityInfo, multiV // Range returns a random set of data, and a random continuation token func (c *Connector) Range(ctx context.Context, ei *dosa.EntityInfo, columnConditions map[string][]*dosa.Condition, minimumFields []string, token string, limit int) ([]map[string]dosa.FieldValue, string, error) { + if limit == dosa.AdaptiveRangeLimit { + limit = defaultRangeLimit + } vals := make([]map[string]dosa.FieldValue, limit) for inx := range vals { vals[inx] = Data(ei, minimumFields) diff --git a/connectors/random/random_test.go b/connectors/random/random_test.go index 04ef04e9..1fe538d2 100644 --- a/connectors/random/random_test.go +++ b/connectors/random/random_test.go @@ -121,6 +121,12 @@ func TestRandom_Range(t *testing.T) { assert.NoError(t, err) } +func TestRandom_RangeAdaptiveLimits(t *testing.T) { + vals, _, err := sut.Range(ctx, testInfo, testConditions, minimumFields, "", dosa.AdaptiveRangeLimit) + assert.Len(t, vals, 200) + assert.NoError(t, err) +} + func TestRandom_Scan(t *testing.T) { vals, _, err := sut.Scan(ctx, testInfo, minimumFields, "", 32) assert.NotNil(t, vals) diff --git a/pager.go b/pager.go index 8b99d237..73bf730a 100644 --- a/pager.go +++ b/pager.go @@ -32,7 +32,7 @@ type pager struct { } func addLimitTokenString(w io.Writer, limit int, token string) { - if limit > 0 { + if limit == AdaptiveRangeLimit || limit > 0 { fmt.Fprintf(w, " limit %d", limit) } if token != "" { diff --git a/range.go b/range.go index bb5bc708..a89a8215 100644 --- a/range.go +++ b/range.go @@ -30,6 +30,14 @@ import ( "github.com/pkg/errors" ) +const ( + // AdaptiveRangeLimit is a sentinel value that is used to indicate an intent + // to range over data in a partition as fast as possible. The server will + // determine an appropriate limit to use to range over the partition as fast + // as possible while ensuring the server remains healthy. + AdaptiveRangeLimit = -1 +) + // RangeOp is used to specify constraints to Range calls type RangeOp struct { pager diff --git a/range_test.go b/range_test.go index 4eb6bfd1..cf1449c8 100644 --- a/range_test.go +++ b/range_test.go @@ -69,6 +69,12 @@ var rangeTestCases = []struct { stringer: " limit 10", converted: " limit 10", }, + { + descript: "empty with adaptive limit", + rop: NewRangeOp(&AllTypes{}).Limit(AdaptiveRangeLimit), + stringer: " limit -1", + converted: " limit -1", + }, { descript: "empty with token", rop: NewRangeOp(&AllTypes{}).Offset("toketoketoke"),