diff --git a/.gitignore b/.gitignore index 485dee64..85b5d5ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ +/.rocksdb-repo .idea diff --git a/.travis.yml b/.travis.yml index 9b331467..daa33497 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,7 @@ -dist: xenial language: go go: - - 1.12.x - - 1.13.x + - '1.10' + - 1.11 - tip before_install: @@ -17,8 +16,9 @@ before_install: - sudo dpkg -i libgflags-dev_2.0-1.1ubuntu1_amd64.deb install: - - git clone https://github.com/facebook/rocksdb.git /tmp/rocksdb + - git clone https://github.com/GetStream/rocksdb.git /tmp/rocksdb - pushd /tmp/rocksdb + - git checkout add-lower-priority-c-bindings - make clean - make shared_lib -j`nproc` - sudo cp --preserve=links ./librocksdb.* /usr/lib/ @@ -27,7 +27,7 @@ install: - go get -t ./... script: - - go test -v ./ + - GODEBUG=cgocheck=2 go test -v ./ notifications: email: diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..1fba90b4 --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +.PHONY: docker-clean +docker-clean: + @docker compose down -v --remove-orphans + +.PHONY: docker-test +docker-test: + @docker compose build test && docker compose run --rm -e GODEBUG=cgocheck=2 test go test -race=1 -v ./... diff --git a/backup.go b/backup.go index 87621dd9..a6673ff8 100644 --- a/backup.go +++ b/backup.go @@ -89,7 +89,7 @@ func OpenBackupEngine(opts *Options, path string) (*BackupEngine, error) { be := C.rocksdb_backup_engine_open(opts.c, cpath, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &BackupEngine{ @@ -104,25 +104,19 @@ func (b *BackupEngine) UnsafeGetBackupEngine() unsafe.Pointer { return unsafe.Pointer(b.c) } -// CreateNewBackupFlush takes a new backup from db. If flush is set to true, -// it flushes the WAL before taking the backup. -func (b *BackupEngine) CreateNewBackupFlush(db *DB, flush bool) error { +// CreateNewBackup takes a new backup from db. +func (b *BackupEngine) CreateNewBackup(db *DB) error { var cErr *C.char - C.rocksdb_backup_engine_create_new_backup_flush(b.c, db.c, boolToChar(flush), &cErr) + C.rocksdb_backup_engine_create_new_backup(b.c, db.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil } -// CreateNewBackup takes a new backup from db. 
-func (b *BackupEngine) CreateNewBackup(db *DB) error { - return b.CreateNewBackupFlush(db, false) -} - // GetInfo gets an object that gives information about // the backups that have already been taken func (b *BackupEngine) GetInfo() *BackupEngineInfo { @@ -144,18 +138,7 @@ func (b *BackupEngine) RestoreDBFromLatestBackup(dbDir, walDir string, ro *Resto C.rocksdb_backup_engine_restore_db_from_latest_backup(b.c, cDbDir, cWalDir, ro.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) - return errors.New(C.GoString(cErr)) - } - return nil -} - -// PurgeOldBackups deletes all backups older than the latest 'n' backups -func (b *BackupEngine) PurgeOldBackups(n uint32) error { - var cErr *C.char - C.rocksdb_backup_engine_purge_old_backups(b.c, C.uint32_t(n), &cErr) - if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/cache.go b/cache.go index 866326dc..ed708d7f 100644 --- a/cache.go +++ b/cache.go @@ -9,7 +9,7 @@ type Cache struct { } // NewLRUCache creates a new LRU Cache object with the capacity given. -func NewLRUCache(capacity uint64) *Cache { +func NewLRUCache(capacity int) *Cache { return NewNativeCache(C.rocksdb_cache_create_lru(C.size_t(capacity))) } @@ -19,13 +19,13 @@ func NewNativeCache(c *C.rocksdb_cache_t) *Cache { } // GetUsage returns the Cache memory usage. -func (c *Cache) GetUsage() uint64 { - return uint64(C.rocksdb_cache_get_usage(c.c)) +func (c *Cache) GetUsage() int { + return int(C.rocksdb_cache_get_usage(c.c)) } // GetPinnedUsage returns the Cache pinned memory usage. -func (c *Cache) GetPinnedUsage() uint64 { - return uint64(C.rocksdb_cache_get_pinned_usage(c.c)) +func (c *Cache) GetPinnedUsage() int { + return int(C.rocksdb_cache_get_pinned_usage(c.c)) } // Destroy deallocates the Cache object. 
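
Editor's note (illustrative, not part of the patch): the cache.go hunk above narrows NewLRUCache and the usage getters from uint64 to plain int. The minimal sketch below shows the adjusted signatures in use; it assumes the fork is importable as github.com/GetStream/gorocksdb and is only a usage illustration, not code from this change set.

    package main

    import (
        "fmt"

        gorocksdb "github.com/GetStream/gorocksdb"
    )

    func main() {
        // Capacity, usage, and pinned usage are plain ints after this change.
        cache := gorocksdb.NewLRUCache(64 << 20) // 64 MiB
        defer cache.Destroy()

        fmt.Println("usage:", cache.GetUsage(), "pinned:", cache.GetPinnedUsage())
    }
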
diff --git a/checkpoint.go b/checkpoint.go index 4a6436d2..a7d2bf40 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -43,7 +43,7 @@ func (checkpoint *Checkpoint) CreateCheckpoint(checkpoint_dir string, log_size_f C.rocksdb_checkpoint_create(checkpoint.c, cDir, C.uint64_t(log_size_for_flush), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/checkpoint_test.go b/checkpoint_test.go index 1ea10fdb..9505740d 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -1,11 +1,10 @@ package gorocksdb import ( + "github.com/facebookgo/ensure" "io/ioutil" "os" "testing" - - "github.com/facebookgo/ensure" ) func TestCheckpoint(t *testing.T) { diff --git a/compaction_filter_test.go b/compaction_filter_test.go index 1dfcd63e..2735cf16 100644 --- a/compaction_filter_test.go +++ b/compaction_filter_test.go @@ -11,9 +11,12 @@ func TestCompactionFilter(t *testing.T) { var ( changeKey = []byte("change") changeValOld = []byte("old") - changeValNew = []byte("new") + changeValNew = cBackedBytes([]byte("new")) deleteKey = []byte("delete") ) + + defer freeCBackedBytes(changeValNew) + db := newTestDB(t, "TestCompactionFilter", func(opts *Options) { opts.SetCompactionFilter(&mockCompactionFilter{ filter: func(level int, key, val []byte) (remove bool, newVal []byte) { diff --git a/db.go b/db.go old mode 100755 new mode 100644 index 64735c61..d8b7bddb --- a/db.go +++ b/db.go @@ -32,7 +32,7 @@ func OpenDb(opts *Options, name string) (*DB, error) { defer C.free(unsafe.Pointer(cName)) db := C.rocksdb_open(opts.c, cName, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ @@ -51,7 +51,7 @@ func OpenDbWithTTL(opts *Options, name string, ttl int) (*DB, error) { defer C.free(unsafe.Pointer(cName)) db := C.rocksdb_open_with_ttl(opts.c, cName, C.int(ttl), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ @@ -70,7 +70,7 @@ func OpenDbForReadOnly(opts *Options, name string, errorIfLogFileExist bool) (*D defer C.free(unsafe.Pointer(cName)) db := C.rocksdb_open_for_read_only(opts.c, cName, boolToChar(errorIfLogFileExist), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &DB{ @@ -123,7 +123,7 @@ func OpenDbColumnFamilies( &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, nil, errors.New(C.GoString(cErr)) } @@ -185,7 +185,7 @@ func OpenDbForReadOnlyColumnFamilies( &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, nil, errors.New(C.GoString(cErr)) } @@ -211,15 +211,12 @@ func ListColumnFamilies(opts *Options, name string) ([]string, error) { defer C.free(unsafe.Pointer(cName)) cNames := C.rocksdb_list_column_families(opts.c, cName, &cLen, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } namesLen := int(cLen) names := make([]string, namesLen) - // The maximum capacity of the following two slices is limited to (2^29)-1 to remain compatible - // with 32-bit platforms. 
The size of a `*C.char` (a pointer) is 4 Byte on a 32-bit system - // and (2^29)*4 == math.MaxInt32 + 1. -- See issue golang/go#13656 - cNamesArr := (*[(1 << 29) - 1]*C.char)(unsafe.Pointer(cNames))[:namesLen:namesLen] + cNamesArr := (*[1 << 30]*C.char)(unsafe.Pointer(cNames))[:namesLen:namesLen] for i, n := range cNamesArr { names[i] = C.GoString(n) } @@ -246,7 +243,7 @@ func (db *DB) Get(opts *ReadOptions, key []byte) (*Slice, error) { ) cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -261,13 +258,13 @@ func (db *DB) GetBytes(opts *ReadOptions, key []byte) ([]byte, error) { ) cValue := C.rocksdb_get(db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } if cValue == nil { return nil, nil } - defer C.rocksdb_free(unsafe.Pointer(cValue)) + defer C.free(unsafe.Pointer(cValue)) return C.GoBytes(unsafe.Pointer(cValue), C.int(cValLen)), nil } @@ -280,7 +277,7 @@ func (db *DB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Sli ) cValue := C.rocksdb_get_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -294,7 +291,7 @@ func (db *DB) GetPinned(opts *ReadOptions, key []byte) (*PinnableSliceHandle, er ) cHandle := C.rocksdb_get_pinned(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewNativePinnableSliceHandle(cHandle), nil @@ -323,7 +320,7 @@ func (db *DB) MultiGet(opts *ReadOptions, keys ...[]byte) (Slices, error) { for i, rocksErr := range rocksErrs { if rocksErr != nil { - defer C.rocksdb_free(unsafe.Pointer(rocksErr)) + defer C.free(unsafe.Pointer(rocksErr)) err := fmt.Errorf("getting %q failed: %v", string(keys[i]), C.GoString(rocksErr)) errs = append(errs, err) } @@ -375,7 +372,7 @@ func (db *DB) MultiGetCFMultiCF(opts *ReadOptions, cfs ColumnFamilyHandles, keys for i, rocksErr := range rocksErrs { if rocksErr != nil { - defer C.rocksdb_free(unsafe.Pointer(rocksErr)) + defer C.free(unsafe.Pointer(rocksErr)) err := fmt.Errorf("getting %q failed: %v", string(keys[i]), C.GoString(rocksErr)) errs = append(errs, err) } @@ -402,7 +399,7 @@ func (db *DB) Put(opts *WriteOptions, key, value []byte) error { ) C.rocksdb_put(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -417,7 +414,7 @@ func (db *DB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byt ) C.rocksdb_put_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -431,7 +428,7 @@ func (db *DB) Delete(opts *WriteOptions, key []byte) error { ) C.rocksdb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) 
+ defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -445,7 +442,7 @@ func (db *DB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) e ) C.rocksdb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -460,7 +457,7 @@ func (db *DB) Merge(opts *WriteOptions, key []byte, value []byte) error { ) C.rocksdb_merge(db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -476,7 +473,7 @@ func (db *DB) MergeCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte, va ) C.rocksdb_merge_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -487,7 +484,7 @@ func (db *DB) Write(opts *WriteOptions, batch *WriteBatch) error { var cErr *C.char C.rocksdb_write(db.c, opts.c, batch.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -507,20 +504,6 @@ func (db *DB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator return NewNativeIterator(unsafe.Pointer(cIter)) } -func (db *DB) GetUpdatesSince(seqNumber uint64) (*WalIterator, error) { - var cErr *C.char - cIter := C.rocksdb_get_updates_since(db.c, C.uint64_t(seqNumber), nil, &cErr) - if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) - return nil, errors.New(C.GoString(cErr)) - } - return NewNativeWalIterator(unsafe.Pointer(cIter)), nil -} - -func (db *DB) GetLatestSequenceNumber() uint64 { - return uint64(C.rocksdb_get_latest_sequence_number(db.c)) -} - // NewSnapshot creates a new snapshot of the database. 
func (db *DB) NewSnapshot() *Snapshot { cSnap := C.rocksdb_create_snapshot(db.c) @@ -538,7 +521,7 @@ func (db *DB) GetProperty(propName string) string { cprop := C.CString(propName) defer C.free(unsafe.Pointer(cprop)) cValue := C.rocksdb_property_value(db.c, cprop) - defer C.rocksdb_free(unsafe.Pointer(cValue)) + defer C.free(unsafe.Pointer(cValue)) return C.GoString(cValue) } @@ -547,7 +530,7 @@ func (db *DB) GetPropertyCF(propName string, cf *ColumnFamilyHandle) string { cProp := C.CString(propName) defer C.free(unsafe.Pointer(cProp)) cValue := C.rocksdb_property_value_cf(db.c, cf.c, cProp) - defer C.rocksdb_free(unsafe.Pointer(cValue)) + defer C.free(unsafe.Pointer(cValue)) return C.GoString(cValue) } @@ -560,7 +543,7 @@ func (db *DB) CreateColumnFamily(opts *Options, name string) (*ColumnFamilyHandl defer C.free(unsafe.Pointer(cName)) cHandle := C.rocksdb_create_column_family(db.c, opts.c, cName, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewNativeColumnFamilyHandle(cHandle), nil @@ -571,7 +554,7 @@ func (db *DB) DropColumnFamily(c *ColumnFamilyHandle) error { var cErr *C.char C.rocksdb_drop_column_family(db.c, c.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -613,7 +596,8 @@ func (db *DB) GetApproximateSizes(ranges []Range) []uint64 { &cStartLens[0], &cLimits[0], &cLimitLens[0], - (*C.uint64_t)(&sizes[0])) + (*C.uint64_t)(&sizes[0]), + nil) return sizes } @@ -655,7 +639,8 @@ func (db *DB) GetApproximateSizesCF(cf *ColumnFamilyHandle, ranges []Range) []ui &cStartLens[0], &cLimits[0], &cLimitLens[0], - (*C.uint64_t)(&sizes[0])) + (*C.uint64_t)(&sizes[0]), + nil) return sizes } @@ -685,7 +670,7 @@ func (db *DB) SetOptions(keys, values []string) error { &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -746,7 +731,7 @@ func (db *DB) Flush(opts *FlushOptions) error { var cErr *C.char C.rocksdb_flush(db.c, opts.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -757,7 +742,7 @@ func (db *DB) DisableFileDeletions() error { var cErr *C.char C.rocksdb_disable_file_deletions(db.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -766,9 +751,9 @@ func (db *DB) DisableFileDeletions() error { // EnableFileDeletions enables file deletions for the database. 
func (db *DB) EnableFileDeletions(force bool) error { var cErr *C.char - C.rocksdb_enable_file_deletions(db.c, boolToChar(force), &cErr) + C.rocksdb_enable_file_deletions(db.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -780,10 +765,10 @@ func (db *DB) EnableFileDeletions(force bool) error { func (db *DB) DeleteFile(name string) { cName := C.CString(name) defer C.free(unsafe.Pointer(cName)) - C.rocksdb_delete_file(db.c, cName) + //C.rocksdb_delete_file(db.c, cName) } -// DeleteFileInRange deletes SST files that contain keys between the Range, [r.Start, r.Limit] +// DeleteFileInRange deletes SST files that contain keys between the Range, [r.Start, limitKey] func (db *DB) DeleteFileInRange(r Range) error { cStartKey := byteToChar(r.Start) cLimitKey := byteToChar(r.Limit) @@ -798,7 +783,7 @@ func (db *DB) DeleteFileInRange(r Range) error { ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -821,7 +806,7 @@ func (db *DB) DeleteFileInRangeCF(cf *ColumnFamilyHandle, r Range) error { ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -850,7 +835,7 @@ func (db *DB) IngestExternalFile(filePaths []string, opts *IngestExternalFileOpt ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -880,7 +865,7 @@ func (db *DB) IngestExternalFileCF(handle *ColumnFamilyHandle, filePaths []strin ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -895,7 +880,7 @@ func (db *DB) NewCheckpoint() (*Checkpoint, error) { db.c, &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } @@ -917,7 +902,7 @@ func DestroyDb(name string, opts *Options) error { defer C.free(unsafe.Pointer(cName)) C.rocksdb_destroy_db(opts.c, cName, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -932,7 +917,7 @@ func RepairDb(name string, opts *Options) error { defer C.free(unsafe.Pointer(cName)) C.rocksdb_repair_db(opts.c, cName, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/db_test.go b/db_test.go old mode 100755 new mode 100644 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..5d22b9a8 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,24 @@ +version: "3" + +services: + test: + build: + context: . 
+ dockerfile: docker/Dockerfile.test + image: test.gorocksdb.getstream.io + container_name: test.gorocksdb.getstream.io + profiles: + - test + command: + - go + - test + - -v + volumes: + - ~/.ssh:/root/.ssh:ro + - rocksdb.gorocksdb.getstream.io:/rocksdb + - gocache.gorocksdb.getstream.io:/root/.cache/go-build + - gopath.gorocksdb.getstream.io:/go +volumes: + rocksdb.gorocksdb.getstream.io: + gocache.gorocksdb.getstream.io: + gopath.gorocksdb.getstream.io: diff --git a/docker/Dockerfile.test b/docker/Dockerfile.test new file mode 100644 index 00000000..0c363065 --- /dev/null +++ b/docker/Dockerfile.test @@ -0,0 +1,19 @@ +FROM golang:1.16.7-buster + +ENV GOBIN /go/bin + +RUN apt-get update && apt-get install -y openssh-client libgflags-dev libsnappy-dev zlib1g-dev libbz2-dev libzstd-dev \ + liblz4-dev git-core curl wget perl bash g++ build-essential unzip + +RUN mkdir -p ~/.ssh/ && ssh-keyscan github.com > ~/.ssh/known_hosts +RUN git config --global url."git@github.com:".insteadOf "https://github.com/" + +ENV CGO_CFLAGS="-I/rocksdb/include" +ENV CGO_LDFLAGS="-L/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd" + +ADD . /gorocksdb +WORKDIR /gorocksdb + +RUN chmod +x /gorocksdb/docker/entrypoint.sh + +ENTRYPOINT ["/gorocksdb/docker/entrypoint.sh"] \ No newline at end of file diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh new file mode 100644 index 00000000..34be2eda --- /dev/null +++ b/docker/entrypoint.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +set -e + +if [ ! -f /rocksdb/librocksdb.a ] +then + rm -rf /rocksdb/* + + git clone git@github.com:GetStream/rocksdb.git /rocksdb && \ + cd /rocksdb && \ + git checkout broadwell && \ + DISABLE_JEMALLOC=1 make static_lib -j5 +fi + +cd /gorocksdb + +exec "$@" \ No newline at end of file diff --git a/dynflag.go b/dynflag.go index 18c18f40..81fe93b5 100644 --- a/dynflag.go +++ b/dynflag.go @@ -1,6 +1,8 @@ -// +build !linux !static +//go:build !embed +// +build !embed package gorocksdb -// #cgo LDFLAGS: -lrocksdb -lstdc++ -lm -ldl +// #cgo LDFLAGS: -L/Users/aditya/code/Keevo/.rocksdb-repo -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd +// #cgo CFLAGS: -I/Users/aditya/code/Keevo/.rocksdb-repo/include import "C" diff --git a/env.go b/env.go index 11e84ef8..dfdd583a 100644 --- a/env.go +++ b/env.go @@ -13,11 +13,6 @@ func NewDefaultEnv() *Env { return NewNativeEnv(C.rocksdb_create_default_env()) } -// NewMemEnv creates MemEnv for in-memory testing. -func NewMemEnv() *Env { - return NewNativeEnv(C.rocksdb_create_mem_env()) -} - // NewNativeEnv creates a Environment object. func NewNativeEnv(c *C.rocksdb_env_t) *Env { return &Env{c} @@ -38,6 +33,22 @@ func (env *Env) SetHighPriorityBackgroundThreads(n int) { C.rocksdb_env_set_high_priority_background_threads(env.c, C.int(n)) } +func (env *Env) LowerThreadPoolIOPriority() { + C.rocksdb_env_lower_thread_pool_io_priority(env.c) +} + +func (env *Env) LowerHighPriorityThreadPoolIOPriority() { + C.rocksdb_env_lower_high_priority_thread_pool_io_priority(env.c) +} + +func (env *Env) LowerThreadPoolCPUPriority() { + C.rocksdb_env_lower_thread_pool_cpu_priority(env.c) +} + +func (env *Env) LowerHighPriorityThreadPoolCPUPriority() { + C.rocksdb_env_lower_high_priority_thread_pool_cpu_priority(env.c) +} + // Destroy deallocates the Env object. 
func (env *Env) Destroy() { C.rocksdb_env_destroy(env.c) diff --git a/filter_policy.go b/filter_policy.go index a9c222b0..10c266ce 100644 --- a/filter_policy.go +++ b/filter_policy.go @@ -46,13 +46,7 @@ func (fp nativeFilterPolicy) Name() string { retur // FilterPolicy (like NewBloomFilterPolicy) that does not ignore // trailing spaces in keys. func NewBloomFilter(bitsPerKey int) FilterPolicy { - return NewNativeFilterPolicy(C.rocksdb_filterpolicy_create_bloom(C.int(bitsPerKey))) -} - -// NewBloomFilterFull returns a new filter policy created with use_block_based_builder=false -// (use full or partitioned filter). -func NewBloomFilterFull(bitsPerKey int) FilterPolicy { - return NewNativeFilterPolicy(C.rocksdb_filterpolicy_create_bloom_full(C.int(bitsPerKey))) + return NewNativeFilterPolicy(C.rocksdb_filterpolicy_create_bloom(C.double(bitsPerKey))) } // Hold references to filter policies. diff --git a/go.mod b/go.mod new file mode 100644 index 00000000..739e40ad --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module github.com/GetStream/gorocksdb + +go 1.16 + +require ( + github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c + github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect + github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect + github.com/stretchr/testify v1.7.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 00000000..76a9d22e --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c h1:8ISkoahWXwZR41ois5lSJBSVw4D0OV19Ht/JSTzvSv0= +github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c/go.mod h1:Yg+htXGokKKdzcwhuNDwVvN+uBxDGXJ7G/VN1d8fa64= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 h1:JWuenKqqX8nojtoVVWjGfOF9635RETekkoH6Cc9SX0A= +github.com/facebookgo/stack v0.0.0-20160209184415-751773369052/go.mod h1:UbMTZqLaRiH3MsBH8va0n7s1pQYcu3uTb8G4tygF4Zg= +github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpmbhCOZJ293Lz68O7PYrF2EzeiFMwCLk= +github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/gorocksdb.c b/gorocksdb.c index efebbe51..255558d1 100644 --- a/gorocksdb.c +++ b/gorocksdb.c @@ -1,5 +1,6 @@ #include "gorocksdb.h" #include "_cgo_export.h" +#include /* Base */ @@ -28,13 +29,8 @@ rocksdb_compactionfilter_t* gorocksdb_compactionfilter_create(uintptr_t idx) { /* Filter Policy */ rocksdb_filterpolicy_t* 
gorocksdb_filterpolicy_create(uintptr_t idx) { - return rocksdb_filterpolicy_create( - (void*)idx, - gorocksdb_destruct_handler, - (char* (*)(void*, const char* const*, const size_t*, int, size_t*))(gorocksdb_filterpolicy_create_filter), - (unsigned char (*)(void*, const char*, size_t, const char*, size_t))(gorocksdb_filterpolicy_key_may_match), - gorocksdb_filterpolicy_delete_filter, - (const char *(*)(void*))(gorocksdb_filterpolicy_name)); + // This function is deprecated and removed in RocksDB v10.2.1 + return NULL; } void gorocksdb_filterpolicy_delete_filter(void* state, const char* v, size_t s) { @@ -68,3 +64,238 @@ rocksdb_slicetransform_t* gorocksdb_slicetransform_create(uintptr_t idx) { (unsigned char (*)(void*, const char*, size_t))(gorocksdb_slicetransform_in_range), (const char* (*)(void*))(gorocksdb_slicetransform_name)); } + +#define DEFAULT_PAGE_ALLOC_SIZE 512 + +extern gorocksdb_many_keys_t* gorocksdb_iter_many_keys(rocksdb_iterator_t* iter, int limit, bool reverse, const gorocksdb_many_keys_filter_t* key_filter, int page_alloc_size) { + int i; + char** keys, **values; + size_t* key_sizes, *value_sizes; + size_t key_size, value_size, cmp_size; + + // todo: we malloc the prefetch size (improve it) + gorocksdb_many_keys_t* many_keys = (gorocksdb_many_keys_t*) malloc(sizeof(gorocksdb_many_keys_t)); + + int size = page_alloc_size; + if (size <= 0) { + size = DEFAULT_PAGE_ALLOC_SIZE; + } + if (limit > 0 && limit < size) { + size = limit; + } + keys = (char**) malloc(size * sizeof(char*)); + key_sizes = (size_t*) malloc(size * sizeof(size_t)); + values = (char**) malloc(size * sizeof(char*)); + value_sizes = (size_t*) malloc(size * sizeof(size_t)); + + i = 0; + while (rocksdb_iter_valid(iter)) { + // Get current key + const char* key = rocksdb_iter_key(iter, &key_size); + + // Check filter + if (key_filter->key_prefix_s > 0) { + if (key_size < key_filter->key_prefix_s) { + break; + } + if (memcmp(key_filter->key_prefix, key, key_filter->key_prefix_s) != 0) { + break; + } + } + if (key_filter->key_end_s > 0) { + cmp_size = key_size > key_filter->key_end_s ? 
key_filter->key_end_s : key_size; + int c = memcmp(key, key_filter->key_end, cmp_size); + if (c == 0 && key_filter->key_end_s == key_size) { + // keys are equals, we break + break; + } + if (reverse) { + if (c == 0 && key_filter->key_end_s > key_size) { + // key_end is bigger than key, we must stop + break; + } else if (c < 0) { + // key is smaller than key_end, we break + break; + } + } else { + if (c == 0 && key_size > key_filter->key_end_s) { + // key_end is smaller than key, we must stop + break; + } else if (c > 0) { + // key is greater than key_end, we break + break; + } + } + } + + // Store key + if (i == size) { + // realloc 2x existing size + size = size*2; + keys = (char**) realloc(keys, size * sizeof(char*)); + key_sizes = (size_t*) realloc(key_sizes, size * sizeof(size_t)); + values = (char**) realloc(values, size * sizeof(char*)); + value_sizes = (size_t*) realloc(value_sizes, size * sizeof(size_t)); + } + keys[i] = (char*) malloc(key_size * sizeof(char)); + memcpy(keys[i], key, key_size); + key_sizes[i] = key_size; + + // Get value and store it + const char* val = rocksdb_iter_value(iter, &value_size); + if (val != NULL) { + values[i] = (char*) malloc(value_size * sizeof(char)); + memcpy(values[i], val, value_size); + } else { + values[i] = NULL; + } + value_sizes[i] = value_size; + i++; + + // Next key + if (reverse) { + // Move prev + rocksdb_iter_prev(iter); + } else { + // Move next + rocksdb_iter_next(iter); + } + + // Check limit + if (limit > 0 && i == limit) { + break; + } + } + + many_keys->keys = keys; + many_keys->key_sizes = key_sizes; + many_keys->values = values; + many_keys->value_sizes = value_sizes; + many_keys->found = i; + return many_keys; +} + +extern void gorocksdb_destroy_many_keys(gorocksdb_many_keys_t* many_keys) { + int i; + for (i = 0; i < many_keys->found; i++) { + free(many_keys->keys[i]); + if (many_keys->values != NULL && many_keys->values[i] != NULL) { + free(many_keys->values[i]); + } + } + free(many_keys->keys); + free(many_keys->key_sizes); + if (many_keys->values != NULL) { + free(many_keys->values); + free(many_keys->value_sizes); + } + free(many_keys); +} + +void _seek(rocksdb_iterator_t* iter, char* to_key, size_t to_key_s, bool reverse, bool exclude_to_key) { + // seek + if (reverse) { + if (to_key_s > 0) { + rocksdb_iter_seek_for_prev(iter, to_key, to_key_s); + } else { + rocksdb_iter_seek_to_last(iter); + } + } else { + if (to_key_s > 0) { + rocksdb_iter_seek(iter, to_key, to_key_s); + } else { + rocksdb_iter_seek_to_first(iter); + } + } + // jump current? 
+ if (exclude_to_key && rocksdb_iter_valid(iter)) { + size_t key_size; + const char* key = rocksdb_iter_key(iter, &key_size); + if (to_key_s == key_size && memcmp(key, to_key, key_size) == 0) { + if (reverse) { + rocksdb_iter_prev(iter); + } else { + rocksdb_iter_next(iter); + } + } + } +} + +extern gorocksdb_many_keys_t** gorocksdb_many_search_keys(rocksdb_iterator_t* iter, const gorocksdb_keys_search_t* keys_searches, int size, int page_alloc_size) { + int i; + gorocksdb_many_keys_filter_t key_filter; + gorocksdb_many_keys_t** result = (gorocksdb_many_keys_t**) malloc(size*sizeof(gorocksdb_many_keys_t*)); + for (i=0; i < size; i++) { + _seek(iter, keys_searches[i].key_from, keys_searches[i].key_from_s, keys_searches[i].reverse, keys_searches[i].exclude_key_from); + key_filter.key_prefix = keys_searches[i].key_prefix; + key_filter.key_prefix_s = keys_searches[i].key_prefix_s; + key_filter.key_end = keys_searches[i].key_end; + key_filter.key_end_s = keys_searches[i].key_end_s; + result[i] = gorocksdb_iter_many_keys(iter, keys_searches[i].limit, keys_searches[i].reverse, &key_filter, page_alloc_size); + } + return result; +} + +extern void gorocksdb_destroy_many_many_keys(gorocksdb_many_keys_t** many_many_keys, int size) { + int i; + for (i = 0; i < size; i++) { + gorocksdb_destroy_many_keys(many_many_keys[i]); + } + free(many_many_keys); +} + +extern gorocksdb_many_keys_t** gorocksdb_many_search_keys_raw( + rocksdb_iterator_t* iter, + char** key_froms, + size_t* key_from_s, + char** key_prefixes, + size_t* key_prefix_s, + char** key_ends, + size_t* key_end_s, + int* limits, + bool* reverse, + int size, + int page_alloc_size +) { + int i; + gorocksdb_many_keys_filter_t key_filter; + gorocksdb_many_keys_t** result = (gorocksdb_many_keys_t**) malloc(size*sizeof(gorocksdb_many_keys_t*)); + for (i=0; i < size; i++) { + rocksdb_iter_seek(iter, key_froms[i], key_from_s[i]); + key_filter.key_prefix = key_prefixes[i]; + key_filter.key_prefix_s = key_prefix_s[i]; + key_filter.key_end = key_ends[i]; + key_filter.key_end_s = key_end_s[i]; + result[i] = gorocksdb_iter_many_keys(iter, limits[i], reverse[i], &key_filter, page_alloc_size); + } + return result; +} + +void gorocksdb_writebatch_put_many( + rocksdb_writebatch_t* batch, + size_t num_pairs, + char** keys, + size_t* key_sizes, + char** values, + size_t* value_sizes +) { + int i; + for (i=0; i < num_pairs; i++) { + rocksdb_writebatch_put(batch, keys[i], key_sizes[i], values[i], value_sizes[i]); + } +} + +void gorocksdb_writebatch_put_many_cf( + rocksdb_writebatch_t* batch, + rocksdb_column_family_handle_t* cf, + size_t num_pairs, + char** keys, + size_t* key_sizes, + char** values, + size_t* value_sizes +) { + int i; + for (i=0; i < num_pairs; i++) { + rocksdb_writebatch_put_cf(batch, cf, keys[i], key_sizes[i], values[i], value_sizes[i]); + } +} diff --git a/gorocksdb.h b/gorocksdb.h index 4a9968f0..07e0acb0 100644 --- a/gorocksdb.h +++ b/gorocksdb.h @@ -1,6 +1,29 @@ #include +#include #include "rocksdb/c.h" +typedef struct { + char** keys; + size_t* key_sizes; + char** values; + size_t* value_sizes; + int found; + +} gorocksdb_many_keys_t; + +// Compression types +#define rocksdb_no_compression 0 +#define rocksdb_snappy_compression 1 +#define rocksdb_zlib_compression 2 +#define rocksdb_bz2_compression 3 +#define rocksdb_lz4_compression 4 +#define rocksdb_lz4hc_compression 5 +#define rocksdb_xpress_compression 6 +#define rocksdb_zstd_compression 7 + +#define FALSE false +#define TRUE true + // This API provides convenient C wrapper functions for 
rocksdb client. /* Base */ @@ -28,3 +51,76 @@ extern void gorocksdb_mergeoperator_delete_value(void* state, const char* v, siz /* Slice Transform */ extern rocksdb_slicetransform_t* gorocksdb_slicetransform_create(uintptr_t idx); + +/* Iterate many keys */ + +typedef struct { + char* key_prefix; + size_t key_prefix_s; + char* key_end; + size_t key_end_s; + +} gorocksdb_many_keys_filter_t; + +extern gorocksdb_many_keys_t* gorocksdb_iter_many_keys(rocksdb_iterator_t* iter, int limit, bool reverse, const gorocksdb_many_keys_filter_t* key_filter, int page_alloc_size); + +extern void gorocksdb_destroy_many_keys(gorocksdb_many_keys_t* many_keys); + +/* Batch searches */ + +typedef struct { + char* key_from; + size_t key_from_s; + char* key_prefix; + size_t key_prefix_s; + char* key_end; + size_t key_end_s; + int limit; + bool reverse; + bool exclude_key_from; + +} gorocksdb_keys_search_t; + +extern gorocksdb_many_keys_t** gorocksdb_many_search_keys( + rocksdb_iterator_t* iter, + const gorocksdb_keys_search_t* keys_searches, + int size, + int page_alloc_size +); + +extern gorocksdb_many_keys_t** gorocksdb_many_search_keys_raw( + rocksdb_iterator_t* iter, + char** key_froms, + size_t* key_from_s, + char** key_prefixes, + size_t* key_prefix_s, + char** key_ends, + size_t* key_end_s, + int* limits, + bool* reverse, + int size, + int page_alloc_size +); + +extern void gorocksdb_destroy_many_many_keys(gorocksdb_many_keys_t** many_many_keys, int size); + +/* Batch PutMany */ + +void gorocksdb_writebatch_put_many( + rocksdb_writebatch_t* batch, + size_t num_pairs, + char** keys, + size_t* key_sizes, + char** values, + size_t* value_sizes +); + +void gorocksdb_writebatch_put_many_cf( + rocksdb_writebatch_t* batch, + rocksdb_column_family_handle_t* cf, + size_t num_pairs, + char** keys, + size_t* key_sizes, + char** values, + size_t* value_sizes +); diff --git a/iterator.go b/iterator.go index fefb82f1..ac6b59bd 100644 --- a/iterator.go +++ b/iterator.go @@ -2,10 +2,12 @@ package gorocksdb // #include // #include "rocksdb/c.h" +// #include "gorocksdb.h" import "C" import ( "bytes" "errors" + "reflect" "unsafe" ) @@ -14,18 +16,17 @@ import ( // // For example: // -// it := db.NewIterator(readOpts) -// defer it.Close() +// it := db.NewIterator(readOpts) +// defer it.Close() // -// it.Seek([]byte("foo")) -// for ; it.Valid(); it.Next() { -// fmt.Printf("Key: %v Value: %v\n", it.Key().Data(), it.Value().Data()) -// } -// -// if err := it.Err(); err != nil { -// return err -// } +// it.Seek([]byte("foo")) +// for ; it.Valid(); it.Next() { +// fmt.Printf("Key: %v Value: %v\n", it.Key().Data(), it.Value().Data()) +// } // +// if err := it.Err(); err != nil { +// return err +// } type Iterator struct { c *C.rocksdb_iterator_t } @@ -113,7 +114,7 @@ func (iter *Iterator) Err() error { var cErr *C.char C.rocksdb_iter_get_error(iter.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -124,3 +125,197 @@ func (iter *Iterator) Close() { C.rocksdb_iter_destroy(iter.c) iter.c = nil } + +var ManyKeysPageAllocSize int = 512 + +func (iter *Iterator) fetchNextManyKeys(reverse bool, limit int, keyPrefix, keyEnd []byte) *ManyKeys { + cKeyFilter := C.gorocksdb_many_keys_filter_t{} + if len(keyPrefix) > 0 { + cKeyPrefix := C.CString(string(keyPrefix)) + defer C.free(unsafe.Pointer(cKeyPrefix)) + cKeyFilter.key_prefix = cKeyPrefix + cKeyFilter.key_prefix_s = C.size_t(len(keyPrefix)) + } + if len(keyEnd) > 0 { + cKeyEnd := 
C.CString(string(keyEnd)) + defer C.free(unsafe.Pointer(cKeyEnd)) + cKeyFilter.key_end = cKeyEnd + cKeyFilter.key_end_s = C.size_t(len(keyEnd)) + } + return &ManyKeys{c: C.gorocksdb_iter_many_keys(iter.c, C.int(limit), C.bool(reverse), &cKeyFilter, C.int(ManyKeysPageAllocSize))} +} + +// NextManyKeys... +func (iter *Iterator) NextManyKeys(limit int, keyPrefix, keyEnd []byte) *ManyKeys { + return iter.fetchNextManyKeys(false, limit, keyPrefix, keyEnd) +} + +// NextManyKeysF... (compat) +func (iter *Iterator) NextManyKeysF(limit int, keyPrefix, keyEnd []byte) *ManyKeys { + return iter.NextManyKeys(limit, keyPrefix, keyEnd) +} + +// PrevManyKeys... +func (iter *Iterator) PrevManyKeys(limit int, keyPrefix, keyEnd []byte) *ManyKeys { + return iter.fetchNextManyKeys(true, limit, keyPrefix, keyEnd) +} + +type KeysSearch struct { + KeyFrom, + KeyPrefix, + KeyEnd []byte + Limit int + Reverse bool + ExcludeKeyFrom bool +} + +func (iter *Iterator) ManySearchKeys(searches []KeysSearch) *ManyManyKeys { + nbSearches := len(searches) + cManyKeysSearches := make([]C.gorocksdb_keys_search_t, nbSearches) + for i := range searches { + cKSearch := C.gorocksdb_keys_search_t{ + limit: C.int(searches[i].Limit), + reverse: C.bool(searches[i].Reverse), + exclude_key_from: C.bool(searches[i].ExcludeKeyFrom), + } + cKSearch.key_from = C.CString(string(searches[i].KeyFrom)) + cKSearch.key_from_s = C.size_t(len(searches[i].KeyFrom)) + if len(searches[i].KeyPrefix) > 0 { + cKSearch.key_prefix = C.CString(string(searches[i].KeyPrefix)) + cKSearch.key_prefix_s = C.size_t(len(searches[i].KeyPrefix)) + } + if len(searches[i].KeyEnd) > 0 { + cKSearch.key_end = C.CString(string(searches[i].KeyEnd)) + cKSearch.key_end_s = C.size_t(len(searches[i].KeyEnd)) + } + cManyKeysSearches[i] = cKSearch + } + cManyManyKeys := C.gorocksdb_many_search_keys(iter.c, + (*C.gorocksdb_keys_search_t)(unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&cManyKeysSearches)).Data)), + C.int(nbSearches), + C.int(ManyKeysPageAllocSize), + ) + // free + for i := range searches { + C.free(unsafe.Pointer(cManyKeysSearches[i].key_from)) + if len(searches[i].KeyPrefix) > 0 { + C.free(unsafe.Pointer(cManyKeysSearches[i].key_prefix)) + } + if len(searches[i].KeyEnd) > 0 { + C.free(unsafe.Pointer(cManyKeysSearches[i].key_end)) + } + } + return &ManyManyKeys{c: cManyManyKeys, size: nbSearches} +} + +//func (iter *Iterator) ManySearchKeysExp(searches []KeysSearch) *ManyManyKeys { +// nbSearches := len(searches) +// +// cKeyFroms := C.malloc(C.size_t(nbSearches) * C.size_t(unsafe.Sizeof(uintptr(0)))) +// defer C.free(cKeyFroms) +// cKeyPrefixes := C.malloc(C.size_t(nbSearches) * C.size_t(unsafe.Sizeof(uintptr(0)))) +// defer C.free(cKeyPrefixes) +// cKeyEnds := C.malloc(C.size_t(nbSearches) * C.size_t(unsafe.Sizeof(uintptr(0)))) +// defer C.free(cKeyEnds) +// cKeyFromSizes := make([]C.size_t, nbSearches) +// cKeyPrefixSizes := make([]C.size_t, nbSearches) +// cKeyEndSizes := make([]C.size_t, nbSearches) +// cLimits := make([]C.int, nbSearches) +// +// for i := uintptr(0); i < uintptr(nbSearches); i++ { +// search := searches[i] +// *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(cKeyFroms)) + i*unsafe.Sizeof(cKeyFroms))) = byteToChar(search.KeyFrom) +// *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(cKeyPrefixes)) + i*unsafe.Sizeof(cKeyPrefixes))) = byteToChar(search.KeyPrefix) +// *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(cKeyEnds)) + i*unsafe.Sizeof(cKeyEnds))) = byteToChar(search.KeyEnd) +// cKeyFromSizes[i] = 
C.size_t(len(searches[i].KeyFrom)) +// cKeyPrefixSizes[i] = C.size_t(len(searches[i].KeyPrefix)) +// cKeyEndSizes[i] = C.size_t(len(searches[i].KeyEnd)) +// cLimits[i] = C.int(searches[i].Limit) +// } +// cManyManyKeys := C.gorocksdb_many_search_keys_raw( +// iter.c, +// (**C.char)(unsafe.Pointer(cKeyFroms)), +// (*C.size_t)(unsafe.Pointer(&cKeyFromSizes[0])), +// (**C.char)(unsafe.Pointer(cKeyPrefixes)), +// (*C.size_t)(unsafe.Pointer(&cKeyPrefixSizes[0])), +// (**C.char)(unsafe.Pointer(cKeyEnds)), +// (*C.size_t)(unsafe.Pointer(&cKeyEndSizes[0])), +// (*C.int)(unsafe.Pointer(&cLimits[0])), +// C.int(nbSearches), +// C.int(ManyKeysPageAllocSize), +// ) +// return &ManyManyKeys{c: cManyManyKeys, size: nbSearches} +//} + +type ManyKeys struct { + c *C.gorocksdb_many_keys_t +} + +func (m *ManyKeys) Destroy() { + C.gorocksdb_destroy_many_keys(m.c) +} + +func (m *ManyKeys) Found() int { + return int(m.c.found) +} + +func (m *ManyKeys) Keys() [][]byte { + found := m.Found() + keys := make([][]byte, found) + + for i := uintptr(0); i < uintptr(found); i++ { + chars := *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.keys)) + i*unsafe.Sizeof(m.c.keys))) + size := *(*C.size_t)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.key_sizes)) + i*unsafe.Sizeof(m.c.key_sizes))) + keys[i] = charToByte(chars, size) + + } + return keys +} + +func (m *ManyKeys) Values() [][]byte { + found := m.Found() + values := make([][]byte, found) + + for i := uintptr(0); i < uintptr(found); i++ { + chars := *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.values)) + i*unsafe.Sizeof(m.c.values))) + size := *(*C.size_t)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.value_sizes)) + i*unsafe.Sizeof(m.c.value_sizes))) + values[i] = charToByte(chars, size) + } + return values +} + +func (m *ManyKeys) Each(each func(i int, key []byte, value []byte) bool) bool { + found := m.Found() + for i := uintptr(0); i < uintptr(found); i++ { + chars := *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.keys)) + i*unsafe.Sizeof(m.c.keys))) + size := *(*C.size_t)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.key_sizes)) + i*unsafe.Sizeof(m.c.key_sizes))) + key := charToByte(chars, size) + + chars = *(**C.char)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.values)) + i*unsafe.Sizeof(m.c.values))) + size = *(*C.size_t)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c.value_sizes)) + i*unsafe.Sizeof(m.c.value_sizes))) + value := charToByte(chars, size) + + if !each(int(i), key, value) { + return false + } + } + return true +} + +type ManyManyKeys struct { + c **C.gorocksdb_many_keys_t + size int +} + +func (m ManyManyKeys) Result() []*ManyKeys { + result := make([]*ManyKeys, m.size) + for i := uintptr(0); i < uintptr(m.size); i++ { + manyKeys := *(**C.gorocksdb_many_keys_t)(unsafe.Pointer(uintptr(unsafe.Pointer(m.c)) + i*unsafe.Sizeof(m.c))) + result[i] = &ManyKeys{c: manyKeys} + } + return result +} + +func (m ManyManyKeys) Destroy() { + C.gorocksdb_destroy_many_many_keys(m.c, C.int(m.size)) +} diff --git a/iterator_test.go b/iterator_test.go index 358400ba..5b20f61a 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -29,3 +29,625 @@ func TestIterator(t *testing.T) { ensure.Nil(t, iter.Err()) ensure.DeepEqual(t, actualKeys, givenKeys) } + +func TestIteratorNextManyWithKeyPrefix(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyB1")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, 
db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + var actualKeys [][]byte + var actualValues [][]byte + iter.SeekToFirst() + + manyKeys := iter.NextManyKeys(2, []byte("keyA"), nil) + for manyKeys.Found() > 0 { + for _, k := range manyKeys.Keys() { + newK := make([]byte, len(k)) + copy(newK, k) + actualKeys = append(actualKeys, newK) + } + for _, v := range manyKeys.Values() { + newV := make([]byte, len(v)) + copy(newV, v) + actualValues = append(actualValues, newV) + } + manyKeys.Destroy() + manyKeys = iter.NextManyKeys(2, []byte("keyA"), nil) + } + manyKeys.Destroy() + ensure.Nil(t, iter.Err()) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_keyA1"), []byte("val_keyA2"), []byte("val_keyA3")}) +} + +func TestIteratorNextManyWithLimit(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + iter.SeekToFirst() + manyKeys := iter.NextManyKeys(-1, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA1"), []byte("val_keyA2"), []byte("val_keyA3"), []byte("val_keyA4")}) + manyKeys.Destroy() + + iter.SeekToFirst() + manyKeys = iter.NextManyKeys(0, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA1"), []byte("val_keyA2"), []byte("val_keyA3"), []byte("val_keyA4")}) + manyKeys.Destroy() + + iter.SeekToFirst() + manyKeys = iter.NextManyKeys(2, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA1"), []byte("keyA2")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA1"), []byte("val_keyA2")}) + manyKeys.Destroy() + + iter.SeekToFirst() + manyKeys = iter.NextManyKeys(20, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA1"), []byte("val_keyA2"), []byte("val_keyA3"), []byte("val_keyA4")}) + manyKeys.Destroy() +} + +func TestIteratorNextManyWithKeyEnd(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A"), []byte("B"), []byte("C"), []byte("C10"), []byte("C11"), []byte("D")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + var actualKeys [][]byte + var actualValues [][]byte + iter.SeekToFirst() + + manyKeys := iter.NextManyKeys(2, nil, []byte("C1")) + for manyKeys.Found() > 0 { + for _, k := range manyKeys.Keys() { + newK := make([]byte, len(k)) + copy(newK, k) + actualKeys = append(actualKeys, newK) + } + for _, v := range manyKeys.Values() { + newV := make([]byte, len(v)) + copy(newV, v) + actualValues = append(actualValues, newV) + } + 
manyKeys.Destroy() + manyKeys = iter.NextManyKeys(2, nil, []byte("C1")) + } + manyKeys.Destroy() + ensure.Nil(t, iter.Err()) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("A"), []byte("B"), []byte("C")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_A"), []byte("val_B"), []byte("val_C")}) +} + +func TestIteratorNextManyWithKeyPrefixAndEnd(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA"), []byte("keyB"), []byte("keyC"), []byte("keyC1")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + var actualKeys [][]byte + var actualValues [][]byte + iter.SeekToFirst() + + manyKeys := iter.NextManyKeys(2, []byte("key"), []byte("keyC1")) + for manyKeys.Found() > 0 { + for _, k := range manyKeys.Keys() { + newK := make([]byte, len(k)) + copy(newK, k) + actualKeys = append(actualKeys, newK) + } + for _, v := range manyKeys.Values() { + newV := make([]byte, len(v)) + copy(newV, v) + actualValues = append(actualValues, newV) + } + manyKeys.Destroy() + manyKeys = iter.NextManyKeys(2, []byte("key"), []byte("keyC1")) + } + manyKeys.Destroy() + ensure.Nil(t, iter.Err()) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("keyA"), []byte("keyB"), []byte("keyC")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_keyA"), []byte("val_keyB"), []byte("val_keyC")}) +} + +func TestIteratorPrevManyWithKeyPrefix(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyB1")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + var actualKeys [][]byte + var actualValues [][]byte + + iter.SeekToLast() + manyKeys := iter.PrevManyKeys(2, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Found(), 0) + + iter.Seek([]byte("keyA3")) + manyKeys = iter.PrevManyKeys(2, []byte("keyA"), nil) + for manyKeys.Found() > 0 { + for _, k := range manyKeys.Keys() { + newK := make([]byte, len(k)) + copy(newK, k) + actualKeys = append(actualKeys, newK) + } + for _, v := range manyKeys.Values() { + newV := make([]byte, len(v)) + copy(newV, v) + actualValues = append(actualValues, newV) + } + manyKeys.Destroy() + manyKeys = iter.PrevManyKeys(2, []byte("keyA"), nil) + } + manyKeys.Destroy() + ensure.Nil(t, iter.Err()) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("keyA3"), []byte("keyA2"), []byte("keyA1")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_keyA3"), []byte("val_keyA2"), []byte("val_keyA1")}) +} + +func TestIteratorPrevManyWithLimit(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + iter.SeekToLast() + manyKeys := iter.PrevManyKeys(-1, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA4"), []byte("keyA3"), []byte("keyA2"), []byte("keyA1")}) + ensure.DeepEqual(t, manyKeys.Values(), 
[][]byte{[]byte("val_keyA4"), []byte("val_keyA3"), []byte("val_keyA2"), []byte("val_keyA1")}) + manyKeys.Destroy() + + iter.SeekToLast() + manyKeys = iter.PrevManyKeys(0, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA4"), []byte("keyA3"), []byte("keyA2"), []byte("keyA1")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA4"), []byte("val_keyA3"), []byte("val_keyA2"), []byte("val_keyA1")}) + manyKeys.Destroy() + + iter.SeekToLast() + manyKeys = iter.PrevManyKeys(2, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA4"), []byte("keyA3")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA4"), []byte("val_keyA3")}) + manyKeys.Destroy() + + iter.SeekToLast() + manyKeys = iter.PrevManyKeys(20, []byte("keyA"), nil) + ensure.DeepEqual(t, manyKeys.Keys(), [][]byte{[]byte("keyA4"), []byte("keyA3"), []byte("keyA2"), []byte("keyA1")}) + ensure.DeepEqual(t, manyKeys.Values(), [][]byte{[]byte("val_keyA4"), []byte("val_keyA3"), []byte("val_keyA2"), []byte("val_keyA1")}) + manyKeys.Destroy() +} + +func TestIteratorPrevManyWithKeyEnd(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A"), []byte("B"), []byte("C"), []byte("C11"), []byte("C12"), []byte("D")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + var actualKeys [][]byte + var actualValues [][]byte + iter.SeekToLast() + + manyKeys := iter.PrevManyKeys(2, nil, []byte("C1")) + for manyKeys.Found() > 0 { + for _, k := range manyKeys.Keys() { + newK := make([]byte, len(k)) + copy(newK, k) + actualKeys = append(actualKeys, newK) + } + for _, v := range manyKeys.Values() { + newV := make([]byte, len(v)) + copy(newV, v) + actualValues = append(actualValues, newV) + } + manyKeys.Destroy() + manyKeys = iter.PrevManyKeys(2, nil, []byte("C1")) + } + manyKeys.Destroy() + ensure.Nil(t, iter.Err()) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("D"), []byte("C12"), []byte("C11")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_D"), []byte("val_C12"), []byte("val_C11")}) +} + +func TestIteratorPrevManyWithKeyPrefixAndEnd(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA"), []byte("keyB"), []byte("keyC"), []byte("keyC1")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + var actualKeys [][]byte + var actualValues [][]byte + iter.SeekToLast() + + manyKeys := iter.PrevManyKeys(2, []byte("key"), []byte("keyA")) + for manyKeys.Found() > 0 { + for _, k := range manyKeys.Keys() { + newK := make([]byte, len(k)) + copy(newK, k) + actualKeys = append(actualKeys, newK) + } + for _, v := range manyKeys.Values() { + newV := make([]byte, len(v)) + copy(newV, v) + actualValues = append(actualValues, newV) + } + manyKeys.Destroy() + manyKeys = iter.PrevManyKeys(2, []byte("key"), []byte("keyA")) + } + manyKeys.Destroy() + ensure.Nil(t, iter.Err()) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("keyC1"), []byte("keyC"), []byte("keyB")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_keyC1"), []byte("val_keyC"), []byte("val_keyB")}) +} + +func TestIteratorManySearchKeys(t *testing.T) 
{ + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A"), []byte("B"), []byte("C"), []byte("D"), []byte("E"), []byte("F")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 3) + searches[0] = KeysSearch{KeyFrom: []byte("A"), Limit: 1000} + searches[1] = KeysSearch{KeyFrom: []byte("D"), Limit: 1000} + searches[2] = KeysSearch{KeyFrom: []byte("Z"), Limit: 1000} + + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 6) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("A"), []byte("B"), []byte("C"), []byte("D"), []byte("E"), []byte("F")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_A"), []byte("val_B"), []byte("val_C"), []byte("val_D"), []byte("val_E"), []byte("val_F")}) + ensure.DeepEqual(t, result[1].Found(), 3) + ensure.DeepEqual(t, result[1].Keys(), [][]byte{[]byte("D"), []byte("E"), []byte("F")}) + ensure.DeepEqual(t, result[1].Values(), [][]byte{[]byte("val_D"), []byte("val_E"), []byte("val_F")}) + ensure.DeepEqual(t, result[2].Found(), 0) + ensure.DeepEqual(t, result[2].Keys(), [][]byte{}) + ensure.DeepEqual(t, result[2].Values(), [][]byte{}) +} + +func TestIteratorManySearchKeysEmptyKeyFrom(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A"), []byte("B"), []byte("C"), []byte("D"), []byte("E"), []byte("F")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 4) + searches[0] = KeysSearch{Limit: 3} + searches[1] = KeysSearch{Limit: 3, ExcludeKeyFrom: true} + searches[2] = KeysSearch{Limit: 3, Reverse: true} + searches[3] = KeysSearch{Limit: 3, ExcludeKeyFrom: true, Reverse: true} + + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 3) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("A"), []byte("B"), []byte("C")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_A"), []byte("val_B"), []byte("val_C")}) + ensure.DeepEqual(t, result[0].Keys(), result[1].Keys()) + ensure.DeepEqual(t, result[0].Values(), result[1].Values()) + ensure.DeepEqual(t, result[2].Found(), 3) + ensure.DeepEqual(t, result[2].Keys(), [][]byte{[]byte("F"), []byte("E"), []byte("D")}) + ensure.DeepEqual(t, result[2].Values(), [][]byte{[]byte("val_F"), []byte("val_E"), []byte("val_D")}) + ensure.DeepEqual(t, result[2].Keys(), result[3].Keys()) + ensure.DeepEqual(t, result[2].Values(), result[3].Values()) +} + +func TestIteratorManySearchKeysReverse(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A1"), []byte("A2"), []byte("C1"), []byte("C2"), []byte("D"), []byte("E"), []byte("F")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, 
[]byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 2) + searches[0] = KeysSearch{KeyFrom: []byte("C3"), Limit: 1000, Reverse: true} + searches[1] = KeysSearch{KeyFrom: []byte("C2"), Limit: 1000, Reverse: true} + + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 4) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("C2"), []byte("C1"), []byte("A2"), []byte("A1")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_C2"), []byte("val_C1"), []byte("val_A2"), []byte("val_A1")}) + ensure.DeepEqual(t, result[1].Found(), 4) + ensure.DeepEqual(t, result[1].Keys(), [][]byte{[]byte("C2"), []byte("C1"), []byte("A2"), []byte("A1")}) + ensure.DeepEqual(t, result[1].Values(), [][]byte{[]byte("val_C2"), []byte("val_C1"), []byte("val_A2"), []byte("val_A1")}) +} + +func TestIteratorManySearchKeysExcludeKeyFrom(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A"), []byte("B"), []byte("C"), []byte("D"), []byte("E"), []byte("F")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 5) + searches[0] = KeysSearch{KeyFrom: []byte("A"), Limit: 1000, Reverse: false, ExcludeKeyFrom: true} + searches[1] = KeysSearch{KeyFrom: []byte("D"), Limit: 1000, Reverse: false, ExcludeKeyFrom: false} + searches[2] = KeysSearch{KeyFrom: []byte("A"), Limit: 1000, Reverse: true, ExcludeKeyFrom: true} + searches[3] = KeysSearch{KeyFrom: []byte("A"), Limit: 1000, Reverse: true, ExcludeKeyFrom: false} + searches[4] = KeysSearch{KeyFrom: []byte("D"), Limit: 1000, Reverse: true, ExcludeKeyFrom: false} + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 5) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("B"), []byte("C"), []byte("D"), []byte("E"), []byte("F")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_B"), []byte("val_C"), []byte("val_D"), []byte("val_E"), []byte("val_F")}) + ensure.DeepEqual(t, result[1].Found(), 3) + ensure.DeepEqual(t, result[1].Keys(), [][]byte{[]byte("D"), []byte("E"), []byte("F")}) + ensure.DeepEqual(t, result[1].Values(), [][]byte{[]byte("val_D"), []byte("val_E"), []byte("val_F")}) + ensure.DeepEqual(t, result[2].Found(), 0) + ensure.DeepEqual(t, result[2].Keys(), [][]byte{}) + ensure.DeepEqual(t, result[2].Values(), [][]byte{}) + ensure.DeepEqual(t, result[3].Found(), 1) + ensure.DeepEqual(t, result[3].Keys(), [][]byte{[]byte("A")}) + ensure.DeepEqual(t, result[3].Values(), [][]byte{[]byte("val_A")}) + ensure.DeepEqual(t, result[4].Found(), 4) + ensure.DeepEqual(t, result[4].Keys(), [][]byte{[]byte("D"), []byte("C"), []byte("B"), []byte("A")}) + ensure.DeepEqual(t, result[4].Values(), [][]byte{[]byte("val_D"), []byte("val_C"), []byte("val_B"), []byte("val_A")}) +} + +func TestIteratorManySearchKeysWithKeyPrefix(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := 
[][]byte{[]byte("A1"), []byte("A2"), []byte("B1"), []byte("C1"), []byte("D1"), []byte("D2")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 4) + searches[0] = KeysSearch{KeyFrom: []byte("A"), KeyPrefix: []byte("A"), Limit: 1000} + searches[1] = KeysSearch{KeyFrom: []byte("B"), KeyPrefix: []byte("B"), Limit: 1000} + searches[2] = KeysSearch{KeyFrom: []byte("D"), KeyPrefix: []byte("D"), Limit: 1000} + searches[3] = KeysSearch{KeyFrom: []byte("Z"), KeyPrefix: []byte("Z"), Limit: 1000} + + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 2) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("A1"), []byte("A2")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_A1"), []byte("val_A2")}) + ensure.DeepEqual(t, result[1].Found(), 1) + ensure.DeepEqual(t, result[1].Keys(), [][]byte{[]byte("B1")}) + ensure.DeepEqual(t, result[1].Values(), [][]byte{[]byte("val_B1")}) + ensure.DeepEqual(t, result[2].Found(), 2) + ensure.DeepEqual(t, result[2].Keys(), [][]byte{[]byte("D1"), []byte("D2")}) + ensure.DeepEqual(t, result[2].Values(), [][]byte{[]byte("val_D1"), []byte("val_D2")}) + ensure.DeepEqual(t, result[3].Found(), 0) + ensure.DeepEqual(t, result[3].Keys(), [][]byte{}) + ensure.DeepEqual(t, result[3].Values(), [][]byte{}) +} + +func TestIteratorManySearchKeysWithKeyEnd(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("A1"), []byte("A2"), []byte("A3"), []byte("B1"), []byte("B2"), []byte("B3")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 2) + searches[0] = KeysSearch{KeyFrom: []byte("A"), KeyEnd: []byte("A3"), Limit: 1000} + searches[1] = KeysSearch{KeyFrom: []byte("B"), KeyEnd: []byte("B2"), Limit: 1000} + + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 2) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("A1"), []byte("A2")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_A1"), []byte("val_A2")}) + ensure.DeepEqual(t, result[1].Found(), 1) + ensure.DeepEqual(t, result[1].Keys(), [][]byte{[]byte("B1")}) + ensure.DeepEqual(t, result[1].Values(), [][]byte{[]byte("val_B1")}) +} + +func TestIteratorManySearchKeysWithKeyPrefixAndEnd(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyC"), []byte("keyC0"), []byte("keyC1")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + searches := make([]KeysSearch, 2) + searches[0] = KeysSearch{KeyFrom: []byte("keyC0"), KeyPrefix: []byte("keyC"), KeyEnd: []byte("keyC1"), Limit: 1000} + searches[1] = KeysSearch{KeyFrom: []byte("k"), 
KeyPrefix: []byte("keyC"), KeyEnd: []byte("keyC1"), Limit: 1000} + + manyManyKeys := iter.ManySearchKeys(searches) + defer manyManyKeys.Destroy() + result := manyManyKeys.Result() + if len(result) != len(searches) { + t.Fatalf("result len should be %d", len(searches)) + } + ensure.DeepEqual(t, result[0].Found(), 1) + ensure.DeepEqual(t, result[0].Keys(), [][]byte{[]byte("keyC0")}) + ensure.DeepEqual(t, result[0].Values(), [][]byte{[]byte("val_keyC0")}) + ensure.DeepEqual(t, result[1].Found(), 2) + ensure.DeepEqual(t, result[1].Keys(), [][]byte{[]byte("keyC"), []byte("keyC0")}) + ensure.DeepEqual(t, result[1].Values(), [][]byte{[]byte("val_keyC"), []byte("val_keyC0")}) +} + +func TestIteratorNextManyKeysEach(t *testing.T) { + db := newTestDB(t, "TestIterator", nil) + defer db.Close() + + // insert keys + givenKeys := [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")} + wo := NewDefaultWriteOptions() + for _, k := range givenKeys { + ensure.Nil(t, db.Put(wo, k, []byte("val_"+string(k)))) + } + + ro := NewDefaultReadOptions() + iter := db.NewIterator(ro) + defer iter.Close() + + iter.SeekToFirst() + manyKeys := iter.NextManyKeys(-1, []byte("keyA"), nil) + + actualKeys := [][]byte{} + actualValues := [][]byte{} + all := manyKeys.Each(func(i int, key []byte, value []byte) bool { + actualKeys = append(actualKeys, key) + actualValues = append(actualValues, value) + return true + }) + ensure.DeepEqual(t, all, true) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("keyA1"), []byte("keyA2"), []byte("keyA3"), []byte("keyA4")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_keyA1"), []byte("val_keyA2"), []byte("val_keyA3"), []byte("val_keyA4")}) + + actualKeys = nil + actualValues = nil + limit := 2 + all = manyKeys.Each(func(i int, key []byte, value []byte) bool { + actualKeys = append(actualKeys, key) + actualValues = append(actualValues, value) + return len(actualKeys) != limit + }) + ensure.DeepEqual(t, all, false) + ensure.DeepEqual(t, actualKeys, [][]byte{[]byte("keyA1"), []byte("keyA2")}) + ensure.DeepEqual(t, actualValues, [][]byte{[]byte("val_keyA1"), []byte("val_keyA2")}) + + manyKeys.Destroy() +} diff --git a/memory_usage.go b/memory_usage.go index 7b9a6ad6..740b877d 100644 --- a/memory_usage.go +++ b/memory_usage.go @@ -42,7 +42,7 @@ func GetApproximateMemoryUsageByType(dbs []*DB, caches []*Cache) (*MemoryUsage, var cErr *C.char memoryUsage := C.rocksdb_approximate_memory_usage_create(consumers, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } @@ -55,4 +55,4 @@ func GetApproximateMemoryUsageByType(dbs []*DB, caches []*Cache) (*MemoryUsage, CacheTotal: uint64(C.rocksdb_approximate_memory_usage_get_cache_total(memoryUsage)), } return result, nil -} +} \ No newline at end of file diff --git a/merge_operator.go b/merge_operator.go index 2de7f9ab..33f83948 100644 --- a/merge_operator.go +++ b/merge_operator.go @@ -28,14 +28,6 @@ type MergeOperator interface { // internal corruption. This will be treated as an error by the library. FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) - // The name of the MergeOperator. 
- Name() string -} - -// PartialMerger implements PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, err) -// When a MergeOperator implements this interface, PartialMerge will be called in addition -// to FullMerge for compactions across levels -type PartialMerger interface { // This function performs merge(left_op, right_op) // when both the operands are themselves merge operation types // that you would have passed to a db.Merge() call in the same order @@ -50,28 +42,9 @@ type PartialMerger interface { // The library will internally keep track of the operations, and apply them in the // correct order once a base-value (a Put/Delete/End-of-Database) is seen. PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) -} -// MultiMerger implements PartialMergeMulti(key []byte, operands [][]byte) ([]byte, err) -// When a MergeOperator implements this interface, PartialMergeMulti will be called in addition -// to FullMerge for compactions across levels -type MultiMerger interface { - // PartialMerge performs merge on multiple operands - // when all of the operands are themselves merge operation types - // that you would have passed to a db.Merge() call in the same order - // (i.e.: db.Merge(key,operand[0]), followed by db.Merge(key,operand[1]), - // ... db.Merge(key, operand[n])). - // - // PartialMerge should combine them into a single merge operation. - // The return value should be constructed such that a call to - // db.Merge(key, new_value) would yield the same result as a call - // to db.Merge(key,operand[0]), followed by db.Merge(key,operand[1]), - // ... db.Merge(key, operand[n])). - // - // If it is impossible or infeasible to combine the operations, return false. - // The library will internally keep track of the operations, and apply them in the - // correct order once a base-value (a Put/Delete/End-of-Database) is seen. - PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool) + // The name of the MergeOperator. + Name() string } // NewNativeMergeOperator creates a MergeOperator object. 
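Editor's note, not part of the patch: with the separate PartialMerger and MultiMerger interfaces folded into MergeOperator above, every operator must now provide FullMerge, PartialMerge, and Name on one type. A minimal illustrative implementation (the concatMerger name is invented for this sketch):

// concatMerger concatenates operands; purely illustrative.
type concatMerger struct{}

func (concatMerger) Name() string { return "example.concat" }

// FullMerge appends every operand to the existing value.
func (concatMerger) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) {
	merged := append([]byte{}, existingValue...)
	for _, op := range operands {
		merged = append(merged, op...)
	}
	return merged, true
}

// PartialMerge folds two operands into one, so compaction can collapse a
// chain of Merge() calls before a base value is seen.
func (concatMerger) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) {
	out := append(append([]byte{}, leftOperand...), rightOperand...)
	return out, true
}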
@@ -137,22 +110,13 @@ func gorocksdb_mergeoperator_partial_merge_multi(idx int, cKey *C.char, cKeyLen success := true merger := mergeOperators.Get(idx).(mergeOperatorWrapper).mergeOperator - - // check if this MergeOperator supports partial or multi merges - switch v := merger.(type) { - case MultiMerger: - newValue, success = v.PartialMergeMulti(key, operands) - case PartialMerger: - leftOperand := operands[0] - for i := 1; i < int(cNumOperands); i++ { - newValue, success = v.PartialMerge(key, leftOperand, operands[i]) - if !success { - break - } - leftOperand = newValue + leftOperand := operands[0] + for i := 1; i < int(cNumOperands); i++ { + newValue, success = merger.PartialMerge(key, leftOperand, operands[i]) + if !success { + break } - default: - success = false + leftOperand = newValue } newValueLen := len(newValue) diff --git a/merge_operator_test.go b/merge_operator_test.go index 9dad6f78..fd7e0887 100644 --- a/merge_operator_test.go +++ b/merge_operator_test.go @@ -40,146 +40,15 @@ func TestMergeOperator(t *testing.T) { ensure.DeepEqual(t, v1.Data(), givenMerged) } -func TestPartialMergeOperator(t *testing.T) { - var ( - givenKey = []byte("hello") - startingVal = []byte("foo") - mergeVal1 = []byte("bar") - mergeVal2 = []byte("baz") - fMergeResult = []byte("foobarbaz") - pMergeResult = []byte("barbaz") - ) - - merger := &mockMergePartialOperator{ - fullMerge: func(key, existingValue []byte, operands [][]byte) ([]byte, bool) { - ensure.DeepEqual(&fatalAsError{t}, key, givenKey) - ensure.DeepEqual(&fatalAsError{t}, existingValue, startingVal) - ensure.DeepEqual(&fatalAsError{t}, operands[0], pMergeResult) - return fMergeResult, true - }, - partialMerge: func(key, leftOperand, rightOperand []byte) ([]byte, bool) { - ensure.DeepEqual(&fatalAsError{t}, key, givenKey) - ensure.DeepEqual(&fatalAsError{t}, leftOperand, mergeVal1) - ensure.DeepEqual(&fatalAsError{t}, rightOperand, mergeVal2) - return pMergeResult, true - }, - } - db := newTestDB(t, "TestMergeOperator", func(opts *Options) { - opts.SetMergeOperator(merger) - }) - defer db.Close() - - wo := NewDefaultWriteOptions() - defer wo.Destroy() - - // insert a starting value and compact to trigger merges - ensure.Nil(t, db.Put(wo, givenKey, startingVal)) - - // trigger a compaction to ensure that a merge is performed - db.CompactRange(Range{nil, nil}) - - // we expect these two operands to be passed to merge partial - ensure.Nil(t, db.Merge(wo, givenKey, mergeVal1)) - ensure.Nil(t, db.Merge(wo, givenKey, mergeVal2)) - - // trigger a compaction to ensure that a - // partial and full merge are performed - db.CompactRange(Range{nil, nil}) - - ro := NewDefaultReadOptions() - v1, err := db.Get(ro, givenKey) - defer v1.Free() - ensure.Nil(t, err) - ensure.DeepEqual(t, v1.Data(), fMergeResult) - -} - -func TestMergeMultiOperator(t *testing.T) { - var ( - givenKey = []byte("hello") - startingVal = []byte("foo") - mergeVal1 = []byte("bar") - mergeVal2 = []byte("baz") - fMergeResult = []byte("foobarbaz") - pMergeResult = []byte("barbaz") - ) - - merger := &mockMergeMultiOperator{ - fullMerge: func(key, existingValue []byte, operands [][]byte) ([]byte, bool) { - ensure.DeepEqual(&fatalAsError{t}, key, givenKey) - ensure.DeepEqual(&fatalAsError{t}, existingValue, startingVal) - ensure.DeepEqual(&fatalAsError{t}, operands[0], pMergeResult) - return fMergeResult, true - }, - partialMergeMulti: func(key []byte, operands [][]byte) ([]byte, bool) { - ensure.DeepEqual(&fatalAsError{t}, key, givenKey) - ensure.DeepEqual(&fatalAsError{t}, operands[0], 
mergeVal1) - ensure.DeepEqual(&fatalAsError{t}, operands[1], mergeVal2) - return pMergeResult, true - }, - } - db := newTestDB(t, "TestMergeOperator", func(opts *Options) { - opts.SetMergeOperator(merger) - }) - defer db.Close() - - wo := NewDefaultWriteOptions() - defer wo.Destroy() - - // insert a starting value and compact to trigger merges - ensure.Nil(t, db.Put(wo, givenKey, startingVal)) - - // trigger a compaction to ensure that a merge is performed - db.CompactRange(Range{nil, nil}) - - // we expect these two operands to be passed to merge multi - ensure.Nil(t, db.Merge(wo, givenKey, mergeVal1)) - ensure.Nil(t, db.Merge(wo, givenKey, mergeVal2)) - - // trigger a compaction to ensure that a - // partial and full merge are performed - db.CompactRange(Range{nil, nil}) - - ro := NewDefaultReadOptions() - v1, err := db.Get(ro, givenKey) - defer v1.Free() - ensure.Nil(t, err) - ensure.DeepEqual(t, v1.Data(), fMergeResult) - -} - -// Mock Objects type mockMergeOperator struct { - fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) -} - -func (m *mockMergeOperator) Name() string { return "gorocksdb.test" } -func (m *mockMergeOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { - return m.fullMerge(key, existingValue, operands) -} - -type mockMergeMultiOperator struct { - fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) - partialMergeMulti func(key []byte, operands [][]byte) ([]byte, bool) -} - -func (m *mockMergeMultiOperator) Name() string { return "gorocksdb.multi" } -func (m *mockMergeMultiOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { - return m.fullMerge(key, existingValue, operands) -} -func (m *mockMergeMultiOperator) PartialMergeMulti(key []byte, operands [][]byte) ([]byte, bool) { - return m.partialMergeMulti(key, operands) -} - -type mockMergePartialOperator struct { fullMerge func(key, existingValue []byte, operands [][]byte) ([]byte, bool) partialMerge func(key, leftOperand, rightOperand []byte) ([]byte, bool) } -func (m *mockMergePartialOperator) Name() string { return "gorocksdb.partial" } -func (m *mockMergePartialOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { +func (m *mockMergeOperator) Name() string { return "gorocksdb.test" } +func (m *mockMergeOperator) FullMerge(key, existingValue []byte, operands [][]byte) ([]byte, bool) { return m.fullMerge(key, existingValue, operands) } -func (m *mockMergePartialOperator) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) { +func (m *mockMergeOperator) PartialMerge(key, leftOperand, rightOperand []byte) ([]byte, bool) { return m.partialMerge(key, leftOperand, rightOperand) } diff --git a/options.go b/options.go index 07000215..f1259c69 100644 --- a/options.go +++ b/options.go @@ -60,15 +60,6 @@ const ( FatalInfoLogLevel = InfoLogLevel(4) ) -type WALRecoveryMode int - -const ( - TolerateCorruptedTailRecordsRecovery = WALRecoveryMode(0) - AbsoluteConsistencyRecovery = WALRecoveryMode(1) - PointInTimeRecovery = WALRecoveryMode(2) - SkipAnyCorruptedRecordsRecovery = WALRecoveryMode(3) -) - // Options represent all of the available options when opening a database with Open. 
type Options struct { c *C.rocksdb_options_t @@ -111,7 +102,7 @@ func GetOptionsFromString(base *Options, optStr string) (*Options, error) { newOpt := NewDefaultOptions() C.rocksdb_get_options_from_string(base.c, cOptStr, newOpt.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } @@ -234,7 +225,8 @@ func (opts *Options) SetParanoidChecks(value bool) { // // For example, you have a flash device with 10GB allocated for the DB, // as well as a hard drive of 2TB, you should config it to be: -// [{"/flash_path", 10GB}, {"/hard_drive", 2TB}] +// +// [{"/flash_path", 10GB}, {"/hard_drive", 2TB}] // // The system will try to guarantee data under each path is close to but // not larger than the target size. But current and future file sizes used @@ -342,7 +334,7 @@ func (opts *Options) OptimizeUniversalStyleCompaction(memtable_memory_budget uin // so you may wish to adjust this parameter to control memory usage. // Also, a larger write buffer will result in a longer recovery time // the next time the database is opened. -// Default: 64MB +// Default: 4MB func (opts *Options) SetWriteBufferSize(value int) { C.rocksdb_options_set_write_buffer_size(opts.c, C.size_t(value)) } @@ -485,17 +477,9 @@ func (opts *Options) SetLevel0StopWritesTrigger(value int) { C.rocksdb_options_set_level0_stop_writes_trigger(opts.c, C.int(value)) } -// SetMaxMemCompactionLevel sets the maximum level -// to which a new compacted memtable is pushed if it does not create overlap. -// -// We try to push to level 2 to avoid the -// relatively expensive level 0=>1 compactions and to avoid some -// expensive manifest file operations. We do not push all the way to -// the largest level since that can generate a lot of wasted disk -// space if the same key space is being repeatedly overwritten. -// Default: 2 +// SetMaxMemCompactionLevel is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetMaxMemCompactionLevel(value int) { - C.rocksdb_options_set_max_mem_compaction_level(opts.c, C.int(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetTargetFileSizeBase sets the target file size for compaction. @@ -540,17 +524,18 @@ func (opts *Options) SetMaxBytesForLevelMultiplier(value float64) { C.rocksdb_options_set_max_bytes_for_level_multiplier(opts.c, C.double(value)) } -// SetLevelCompactiondynamiclevelbytes specifies whether to pick +// SetLevelCompactionDynamicLevelBytes specifies whether to pick // target size of each level dynamically. // // We will pick a base level b >= 1. L0 will be directly merged into level b, // instead of always into level 1. Level 1 to b-1 need to be empty. // We try to pick b and its target size so that -// 1. target size is in the range of -// (max_bytes_for_level_base / max_bytes_for_level_multiplier, -// max_bytes_for_level_base] -// 2. target size of the last level (level num_levels-1) equals to extra size -// of the level. +// 1. target size is in the range of +// (max_bytes_for_level_base / max_bytes_for_level_multiplier, +// max_bytes_for_level_base] +// 2. target size of the last level (level num_levels-1) equals to extra size +// of the level. +// // At the same time max_bytes_for_level_multiplier and // max_bytes_for_level_multiplier_additional are still satisfied. 
// @@ -739,32 +724,19 @@ func (opts *Options) SetKeepLogFileNum(value int) { C.rocksdb_options_set_keep_log_file_num(opts.c, C.size_t(value)) } -// SetSoftRateLimit sets the soft rate limit. -// -// Puts are delayed 0-1 ms when any level has a compaction score that exceeds -// soft_rate_limit. This is ignored when == 0.0. -// CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not -// hold, RocksDB will set soft_rate_limit = hard_rate_limit -// Default: 0.0 (disabled) +// SetSoftRateLimit is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetSoftRateLimit(value float64) { - C.rocksdb_options_set_soft_rate_limit(opts.c, C.double(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } -// SetHardRateLimit sets the hard rate limit. -// -// Puts are delayed 1ms at a time when any level has a compaction score that -// exceeds hard_rate_limit. This is ignored when <= 1.0. -// Default: 0.0 (disabled) +// SetHardRateLimit is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetHardRateLimit(value float64) { - C.rocksdb_options_set_hard_rate_limit(opts.c, C.double(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } -// SetRateLimitDelayMaxMilliseconds sets the max time -// a put will be stalled when hard_rate_limit is enforced. -// If 0, then there is no limit. -// Default: 1000 +// SetRateLimitDelayMaxMilliseconds is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetRateLimitDelayMaxMilliseconds(value uint) { - C.rocksdb_options_set_rate_limit_delay_max_milliseconds(opts.c, C.uint(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetMaxManifestFileSize sets the maximal manifest file size until is rolled over. @@ -780,17 +752,9 @@ func (opts *Options) SetTableCacheNumshardbits(value int) { C.rocksdb_options_set_table_cache_numshardbits(opts.c, C.int(value)) } -// SetTableCacheRemoveScanCountLimit sets the count limit during a scan. -// -// During data eviction of table's LRU cache, it would be inefficient -// to strictly follow LRU because this piece of memory will not really -// be released unless its refcount falls to zero. Instead, make two -// passes: the first pass will release items with refcount = 1, -// and if not enough space releases after scanning the number of -// elements specified by this parameter, we will remove items in LRU order. -// Default: 16 +// SetTableCacheRemoveScanCountLimit is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetTableCacheRemoveScanCountLimit(value int) { - C.rocksdb_options_set_table_cache_remove_scan_count_limit(opts.c, C.int(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetArenaBlockSize sets the size of one block in arena memory allocation. @@ -810,28 +774,21 @@ func (opts *Options) SetDisableAutoCompactions(value bool) { C.rocksdb_options_set_disable_auto_compactions(opts.c, C.int(btoi(value))) } -// SetWALRecoveryMode sets the recovery mode -// -// Recovery mode to control the consistency while replaying WAL -// Default: TolerateCorruptedTailRecordsRecovery -func (opts *Options) SetWALRecoveryMode(mode WALRecoveryMode) { - C.rocksdb_options_set_wal_recovery_mode(opts.c, C.int(mode)) -} - // SetWALTtlSeconds sets the WAL ttl in seconds. // // The following two options affect how archived logs will be deleted. -// 1. If both set to 0, logs will be deleted asap and will not get into -// the archive. -// 2. 
If wal_ttl_seconds is 0 and wal_size_limit_mb is not 0, -// WAL files will be checked every 10 min and if total size is greater -// then wal_size_limit_mb, they will be deleted starting with the -// earliest until size_limit is met. All empty files will be deleted. -// 3. If wal_ttl_seconds is not 0 and wall_size_limit_mb is 0, then -// WAL files will be checked every wal_ttl_seconds / 2 and those that -// are older than wal_ttl_seconds will be deleted. -// 4. If both are not 0, WAL files will be checked every 10 min and both -// checks will be performed with ttl being first. +// 1. If both set to 0, logs will be deleted asap and will not get into +// the archive. +// 2. If wal_ttl_seconds is 0 and wal_size_limit_mb is not 0, +// WAL files will be checked every 10 min and if total size is greater +// then wal_size_limit_mb, they will be deleted starting with the +// earliest until size_limit is met. All empty files will be deleted. +// 3. If wal_ttl_seconds is not 0 and wall_size_limit_mb is 0, then +// WAL files will be checked every wal_ttl_seconds / 2 and those that +// are older than wal_ttl_seconds will be deleted. +// 4. If both are not 0, WAL files will be checked every 10 min and both +// checks will be performed with ttl being first. +// // Default: 0 func (opts *Options) SetWALTtlSeconds(value uint64) { C.rocksdb_options_set_WAL_ttl_seconds(opts.c, C.uint64_t(value)) @@ -846,13 +803,6 @@ func (opts *Options) SetWalSizeLimitMb(value uint64) { C.rocksdb_options_set_WAL_size_limit_MB(opts.c, C.uint64_t(value)) } -// SetEnablePipelinedWrite enables pipelined write -// -// Default: false -func (opts *Options) SetEnablePipelinedWrite(value bool) { - C.rocksdb_options_set_enable_pipelined_write(opts.c, boolToChar(value)) -} - // SetManifestPreallocationSize sets the number of bytes // to preallocate (via fallocate) the manifest files. // @@ -864,11 +814,9 @@ func (opts *Options) SetManifestPreallocationSize(value int) { C.rocksdb_options_set_manifest_preallocation_size(opts.c, C.size_t(value)) } -// SetPurgeRedundantKvsWhileFlush enable/disable purging of -// duplicate/deleted keys when a memtable is flushed to storage. -// Default: true +// SetPurgeRedundantKvsWhileFlush is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetPurgeRedundantKvsWhileFlush(value bool) { - C.rocksdb_options_set_purge_redundant_kvs_while_flush(opts.c, boolToChar(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetAllowMmapReads enable/disable mmap reads for reading sst tables. @@ -902,12 +850,9 @@ func (opts *Options) SetIsFdCloseOnExec(value bool) { C.rocksdb_options_set_is_fd_close_on_exec(opts.c, boolToChar(value)) } -// SetSkipLogErrorOnRecovery enable/disable skipping of -// log corruption error on recovery (If client is ok with -// losing most recent changes) -// Default: false +// SetSkipLogErrorOnRecovery is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetSkipLogErrorOnRecovery(value bool) { - C.rocksdb_options_set_skip_log_error_on_recovery(opts.c, boolToChar(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetStatsDumpPeriodSec sets the stats dump period in seconds. @@ -939,13 +884,9 @@ func (opts *Options) SetDbWriteBufferSize(value int) { C.rocksdb_options_set_db_write_buffer_size(opts.c, C.size_t(value)) } -// SetAccessHintOnCompactionStart specifies the file access pattern -// once a compaction is started. -// -// It will be applied to all input files of a compaction. 
-// Default: NormalCompactionAccessPattern +// SetAccessHintOnCompactionStart is deprecated and has been removed in RocksDB v10.2.1 func (opts *Options) SetAccessHintOnCompactionStart(value CompactionAccessPattern) { - C.rocksdb_options_set_access_hint_on_compaction_start(opts.c, C.int(value)) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetUseAdaptiveMutex enable/disable adaptive mutex, which spins @@ -991,7 +932,7 @@ func (opts *Options) SetFIFOCompactionOptions(value *FIFOCompactionOptions) { // GetStatisticsString returns the statistics as a string. func (opts *Options) GetStatisticsString() string { sString := C.rocksdb_options_statistics_get_string(opts.c) - defer C.rocksdb_free(unsafe.Pointer(sString)) + defer C.free(unsafe.Pointer(sString)) return C.GoString(sString) } @@ -1036,7 +977,9 @@ func (opts *Options) SetInplaceUpdateNumLocks(value int) { // If <=0, it won't allocate from huge page but from malloc. // Users are responsible to reserve huge pages for it to be allocated. For // example: -// sysctl -w vm.nr_hugepages=20 +// +// sysctl -w vm.nr_hugepages=20 +// // See linux doc Documentation/vm/hugetlbpage.txt // If there isn't enough free huge page available, it will fall back to // malloc. @@ -1104,7 +1047,8 @@ func (opts *Options) SetMemtableVectorRep() { // bucketCount: number of fixed array buckets // skiplistHeight: the max height of the skiplist // skiplistBranchingFactor: probabilistic size ratio between adjacent -// link lists in the skiplist +// +// link lists in the skiplist func (opts *Options) SetHashSkipListRep(bucketCount int, skiplistHeight, skiplistBranchingFactor int32) { C.rocksdb_options_set_hash_skip_list_rep(opts.c, C.size_t(bucketCount), C.int32_t(skiplistHeight), C.int32_t(skiplistBranchingFactor)) } @@ -1127,16 +1071,33 @@ func (opts *Options) SetHashLinkListRep(bucketCount int) { // a linear search is used. // // keyLen: plain table has optimization for fix-sized keys, -// which can be specified via keyLen. +// +// which can be specified via keyLen. +// // bloomBitsPerKey: the number of bits used for bloom filer per prefix. You -// may disable it by passing a zero. +// +// may disable it by passing a zero. +// // hashTableRatio: the desired utilization of the hash table used for prefix -// hashing. hashTableRatio = number of prefixes / #buckets -// in the hash table +// +// hashing. hashTableRatio = number of prefixes / #buckets +// in the hash table +// // indexSparseness: inside each prefix, need to build one index record for how -// many keys for binary search inside each hash bucket. +// +// many keys for binary search inside each hash bucket. 
func (opts *Options) SetPlainTableFactory(keyLen uint32, bloomBitsPerKey int, hashTableRatio float64, indexSparseness int) { - C.rocksdb_options_set_plain_table_factory(opts.c, C.uint32_t(keyLen), C.int(bloomBitsPerKey), C.double(hashTableRatio), C.size_t(indexSparseness)) + C.rocksdb_options_set_plain_table_factory( + opts.c, + C.uint32_t(keyLen), + C.int(bloomBitsPerKey), + C.double(hashTableRatio), + C.size_t(indexSparseness), + 0, // huge_page_tlb_size (default 0) + C.char(0), // encoding_type (default nil) + C.uchar(0), // full_scan_mode (default false) + C.uchar(0), // store_index_in_file (default false) + ) } // SetCreateIfMissingColumnFamilies specifies whether the column families @@ -1166,44 +1127,15 @@ func (opts *Options) SetAllowIngestBehind(value bool) { C.rocksdb_options_set_allow_ingest_behind(opts.c, boolToChar(value)) } -// SetMemTablePrefixBloomSizeRatio sets memtable_prefix_bloom_size_ratio -// if prefix_extractor is set and memtable_prefix_bloom_size_ratio is not 0, -// create prefix bloom for memtable with the size of -// write_buffer_size * memtable_prefix_bloom_size_ratio. -// If it is larger than 0.25, it is sanitized to 0.25. -// -// Default: 0 (disable) -func (opts *Options) SetMemTablePrefixBloomSizeRatio(value float64) { - C.rocksdb_options_set_memtable_prefix_bloom_size_ratio(opts.c, C.double(value)) -} - -// SetOptimizeFiltersForHits sets optimize_filters_for_hits -// This flag specifies that the implementation should optimize the filters -// mainly for cases where keys are found rather than also optimize for keys -// missed. This would be used in cases where the application knows that -// there are very few misses or the performance in the case of misses is not -// important. -// -// For now, this flag allows us to not store filters for the last level i.e -// the largest level which contains data of the LSM store. For keys which -// are hits, the filters in this level are not useful because we will search -// for the data anyway. NOTE: the filters in other levels are still useful -// even for key hit because they tell us whether to look in that level or go -// to the higher level. -// -// Default: false -func (opts *Options) SetOptimizeFiltersForHits(value bool) { - C.rocksdb_options_set_optimize_filters_for_hits(opts.c, C.int(btoi(value))) -} - // Destroy deallocates the Options object. func (opts *Options) Destroy() { C.rocksdb_options_destroy(opts.c) if opts.ccmp != nil { C.rocksdb_comparator_destroy(opts.ccmp) } - // don't destroy the opts.cst here, it has already been - // associated with a PrefixExtractor and this will segfault + if opts.cst != nil { + C.rocksdb_slicetransform_destroy(opts.cst) + } if opts.ccf != nil { C.rocksdb_compactionfilter_destroy(opts.ccf) } diff --git a/options_block_based_table.go b/options_block_based_table.go index 80244132..345a10f9 100644 --- a/options_block_based_table.go +++ b/options_block_based_table.go @@ -56,14 +56,6 @@ func (opts *BlockBasedTableOptions) SetCacheIndexAndFilterBlocks(value bool) { C.rocksdb_block_based_options_set_cache_index_and_filter_blocks(opts.c, boolToChar(value)) } -// SetCacheIndexAndFilterBlocksWithHighPriority sets cache index and filter -// blocks with high priority (if cache_index_and_filter_blocks is enabled). -// If set to true, depending on implementation of block cache, -// index and filter blocks may be less likely to be evicted than data blocks. 
-func (opts *BlockBasedTableOptions) SetCacheIndexAndFilterBlocksWithHighPriority(value bool) { - C.rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(opts.c, boolToChar(value)) -} - // SetPinL0FilterAndIndexBlocksInCache sets cache_index_and_filter_blocks. // If is true and the below is true (hash_index_allow_collision), then // filter and index blocks are stored in the cache, but a reference is @@ -73,15 +65,6 @@ func (opts *BlockBasedTableOptions) SetPinL0FilterAndIndexBlocksInCache(value bo C.rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(opts.c, boolToChar(value)) } -// SetPinTopLevelIndexAndFilter set that if cache_index_and_filter_blocks is true, then -// the top-level index of partitioned filter and index blocks are stored in -// the cache, but a reference is held in the "table reader" object so the -// blocks are pinned and only evicted from cache when the table reader is -// freed. This is not limited to l0 in LSM tree. -func (opts *BlockBasedTableOptions) SetPinTopLevelIndexAndFilter(value bool) { - C.rocksdb_block_based_options_set_pin_top_level_index_and_filter(opts.c, boolToChar(value)) -} - // SetBlockSize sets the approximate size of user data packed per block. // Note that the block size specified here corresponds opts uncompressed data. // The actual size of the unit read from disk may be smaller if @@ -111,39 +94,6 @@ func (opts *BlockBasedTableOptions) SetBlockRestartInterval(blockRestartInterval C.rocksdb_block_based_options_set_block_restart_interval(opts.c, C.int(blockRestartInterval)) } -// SetIndexBlockRestartInterval is the same as SetBlockRestartInterval but used for the index block. -// Default: 1 -func (opts *BlockBasedTableOptions) SetIndexBlockRestartInterval(indexBlockRestartInterval int) { - C.rocksdb_block_based_options_set_index_block_restart_interval(opts.c, C.int(indexBlockRestartInterval)) -} - -// SetMetadataBlockSize sets the block size for partitioned metadata. -// Currently applied to indexes when -// kTwoLevelIndexSearch is used and to filters when partition_filters is used. -// Note: Since in the current implementation the filters and index partitions -// are aligned, an index/filter block is created when either index or filter -// block size reaches the specified limit. -// Note: this limit is currently applied to only index blocks; a filter -// partition is cut right after an index block is cut -// Default: 4096 -func (opts *BlockBasedTableOptions) SetMetadataBlockSize(metadataBlockSize uint64) { - C.rocksdb_block_based_options_set_metadata_block_size(opts.c, C.uint64_t(metadataBlockSize)) -} - -// SetPartitionFilters sets using partitioned full filters for each SST file. -// This option is incompatible with block-based filters. -// Note: currently this option requires kTwoLevelIndexSearch to be set as well. -// Default: false -func (opts *BlockBasedTableOptions) SetPartitionFilters(value bool) { - C.rocksdb_block_based_options_set_partition_filters(opts.c, boolToChar(value)) -} - -// SetUseDeltaEncoding sets using delta encoding to compress keys in blocks. -// ReadOptions::pin_data requires this option to be disabled. -func (opts *BlockBasedTableOptions) SetUseDeltaEncoding(value bool) { - C.rocksdb_block_based_options_set_use_delta_encoding(opts.c, boolToChar(value)) -} - // SetFilterPolicy sets the filter policy opts reduce disk reads. // Many applications will benefit from passing the result of // NewBloomFilterPolicy() here. 
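Editor's note, not part of the patch: a sketch of how the remaining BlockBasedTableOptions setters are typically wired together. The cache capacity and bloom bits-per-key values are arbitrary, and NewBloomFilterPolicy / SetBlockBasedTableFactory are assumed from elsewhere in this package:

// exampleBlockBasedOptions shows one plausible table configuration.
func exampleBlockBasedOptions() *Options {
	bbto := NewDefaultBlockBasedTableOptions()
	bbto.SetBlockCache(NewLRUCache(64 << 20)) // 64 MiB uncompressed block cache
	bbto.SetFilterPolicy(NewBloomFilterPolicy(10))
	bbto.SetCacheIndexAndFilterBlocks(true)

	opts := NewDefaultOptions()
	opts.SetBlockBasedTableFactory(bbto)
	return opts
}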
@@ -175,12 +125,9 @@ func (opts *BlockBasedTableOptions) SetBlockCache(cache *Cache) { C.rocksdb_block_based_options_set_block_cache(opts.c, cache.c) } -// SetBlockCacheCompressed sets the cache for compressed blocks. -// If nil, rocksdb will not use a compressed block cache. -// Default: nil +// SetBlockCacheCompressed is deprecated and has been removed in RocksDB v10.2.1 func (opts *BlockBasedTableOptions) SetBlockCacheCompressed(cache *Cache) { - opts.compCache = cache - C.rocksdb_block_based_options_set_block_cache_compressed(opts.c, cache.c) + // This function is deprecated and removed in RocksDB v10.2.1 } // SetWholeKeyFiltering specify if whole keys in the filter (not just prefixes) @@ -191,35 +138,6 @@ func (opts *BlockBasedTableOptions) SetWholeKeyFiltering(value bool) { C.rocksdb_block_based_options_set_whole_key_filtering(opts.c, boolToChar(value)) } -// SetFormatVersion sets the format version. -// We currently have five versions: -// 0 -- This version is currently written out by all RocksDB's versions by -// default. Can be read by really old RocksDB's. Doesn't support changing -// checksum (default is CRC32). -// 1 -- Can be read by RocksDB's versions since 3.0. Supports non-default -// checksum, like xxHash. It is written by RocksDB when -// BlockBasedTableOptions::checksum is something other than kCRC32c. (version -// 0 is silently upconverted) -// 2 -- Can be read by RocksDB's versions since 3.10. Changes the way we -// encode compressed blocks with LZ4, BZip2 and Zlib compression. If you -// don't plan to run RocksDB before version 3.10, you should probably use -// this. -// 3 -- Can be read by RocksDB's versions since 5.15. Changes the way we -// encode the keys in index blocks. If you don't plan to run RocksDB before -// version 5.15, you should probably use this. -// This option only affects newly written tables. When reading existing -// tables, the information about version is read from the footer. -// 4 -- Can be read by RocksDB's versions since 5.16. Changes the way we -// encode the values in index blocks. If you don't plan to run RocksDB before -// version 5.16 and you are using index_block_restart_interval > 1, you should -// probably use this as it would reduce the index size. -// This option only affects newly written tables. When reading existing -// tables, the information about version is read from the footer. -// Default: 2 -func (opts *BlockBasedTableOptions) SetFormatVersion(version int) { - C.rocksdb_block_based_options_set_format_version(opts.c, C.int(version)) -} - // SetIndexType sets the index type used for this table. // kBinarySearch: // A space efficient index block that is optimized for diff --git a/options_read.go b/options_read.go index 6a37cc48..a047c7ce 100644 --- a/options_read.go +++ b/options_read.go @@ -1,5 +1,6 @@ package gorocksdb +// #include // #include "rocksdb/c.h" import "C" import "unsafe" @@ -23,7 +24,8 @@ const ( // ReadOptions represent all of the available options when reading from a // database. type ReadOptions struct { - c *C.rocksdb_readoptions_t + c *C.rocksdb_readoptions_t + cIterateUpperBound *C.char } // NewDefaultReadOptions creates a default ReadOptions object. @@ -33,7 +35,7 @@ func NewDefaultReadOptions() *ReadOptions { // NewNativeReadOptions creates a ReadOptions object. func NewNativeReadOptions(c *C.rocksdb_readoptions_t) *ReadOptions { - return &ReadOptions{c} + return &ReadOptions{c: c} } // UnsafeGetReadOptions returns the underlying c read options object. 
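Editor's note, not part of the patch: ReadOptions now owns a C-allocated copy of the iterate-upper-bound key (set and freed in the hunks that follow), so the options object must outlive the iterator and be destroyed afterwards. A hedged usage sketch, assuming a db opened elsewhere:

// exampleUpperBoundScan iterates ["keyA", "keyD") and stops before "keyD".
func exampleUpperBoundScan(db *DB) {
	ro := NewDefaultReadOptions()
	defer ro.Destroy() // also releases the C copy of the upper-bound key

	ro.SetIterateUpperBound([]byte("keyD"))

	it := db.NewIterator(ro)
	defer it.Close()
	for it.Seek([]byte("keyA")); it.Valid(); it.Next() {
		k, v := it.Key(), it.Value()
		// ... use k.Data() and v.Data() ...
		k.Free()
		v.Free()
	}
}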
@@ -48,17 +50,6 @@ func (opts *ReadOptions) SetVerifyChecksums(value bool) { C.rocksdb_readoptions_set_verify_checksums(opts.c, boolToChar(value)) } -// SetPrefixSameAsStart Enforce that the iterator only iterates over the same -// prefix as the seek. -// This option is effective only for prefix seeks, i.e. prefix_extractor is -// non-null for the column family and total_order_seek is false. Unlike -// iterate_upper_bound, prefix_same_as_start only works within a prefix -// but in both directions. -// Default: false -func (opts *ReadOptions) SetPrefixSameAsStart(value bool) { - C.rocksdb_readoptions_set_prefix_same_as_start(opts.c, boolToChar(value)) -} - // SetFillCache specify whether the "data block"/"index block"/"filter block" // read for this iteration should be cached in memory? // Callers may wish to set this field to false for bulk scans. @@ -104,9 +95,15 @@ func (opts *ReadOptions) SetTailing(value bool) { // implemented. // Default: nullptr func (opts *ReadOptions) SetIterateUpperBound(key []byte) { - cKey := byteToChar(key) + C.free(unsafe.Pointer(opts.cIterateUpperBound)) + if key == nil { + opts.cIterateUpperBound = nil + } else { + opts.cIterateUpperBound = cByteSlice(key) + } + cKeyLen := C.size_t(len(key)) - C.rocksdb_readoptions_set_iterate_upper_bound(opts.c, cKey, cKeyLen) + C.rocksdb_readoptions_set_iterate_upper_bound(opts.c, opts.cIterateUpperBound, cKeyLen) } // SetPinData specifies the value of "pin_data". If true, it keeps the blocks @@ -132,5 +129,6 @@ func (opts *ReadOptions) SetReadaheadSize(value uint64) { // Destroy deallocates the ReadOptions object. func (opts *ReadOptions) Destroy() { C.rocksdb_readoptions_destroy(opts.c) - opts.c = nil + C.free(unsafe.Pointer(opts.cIterateUpperBound)) + *opts = ReadOptions{} } diff --git a/slice.go b/slice.go index 707a1f2e..b450daa3 100644 --- a/slice.go +++ b/slice.go @@ -51,7 +51,7 @@ func (s *Slice) Exists() bool { // Free frees the slice data. func (s *Slice) Free() { if !s.freed { - C.rocksdb_free(unsafe.Pointer(s.data)) + C.free(unsafe.Pointer(s.data)) s.freed = true } } diff --git a/slice_transform.go b/slice_transform.go index 8b9b2362..e66e4d84 100644 --- a/slice_transform.go +++ b/slice_transform.go @@ -23,11 +23,6 @@ func NewFixedPrefixTransform(prefixLen int) SliceTransform { return NewNativeSliceTransform(C.rocksdb_slicetransform_create_fixed_prefix(C.size_t(prefixLen))) } -// NewNoopPrefixTransform creates a new no-op prefix transform. -func NewNoopPrefixTransform() SliceTransform { - return NewNativeSliceTransform(C.rocksdb_slicetransform_create_noop()) -} - // NewNativeSliceTransform creates a SliceTransform object. 
func NewNativeSliceTransform(c *C.rocksdb_slicetransform_t) SliceTransform { return nativeSliceTransform{c} diff --git a/slice_transform_test.go b/slice_transform_test.go index d60c7326..1c551183 100644 --- a/slice_transform_test.go +++ b/slice_transform_test.go @@ -35,13 +35,6 @@ func TestFixedPrefixTransformOpen(t *testing.T) { defer db.Close() } -func TestNewNoopPrefixTransform(t *testing.T) { - db := newTestDB(t, "TestNewNoopPrefixTransform", func(opts *Options) { - opts.SetPrefixExtractor(NewNoopPrefixTransform()) - }) - defer db.Close() -} - type testSliceTransform struct { initiated bool } diff --git a/sst_file_writer.go b/sst_file_writer.go index 54f2c139..0f4689c2 100644 --- a/sst_file_writer.go +++ b/sst_file_writer.go @@ -30,7 +30,7 @@ func (w *SSTFileWriter) Open(path string) error { defer C.free(unsafe.Pointer(cPath)) C.rocksdb_sstfilewriter_open(w.c, cPath, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -44,7 +44,7 @@ func (w *SSTFileWriter) Add(key, value []byte) error { var cErr *C.char C.rocksdb_sstfilewriter_add(w.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -55,7 +55,7 @@ func (w *SSTFileWriter) Finish() error { var cErr *C.char C.rocksdb_sstfilewriter_finish(w.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/staticflag_linux.go b/staticflag_linux.go index 3af044ef..6653c498 100644 --- a/staticflag_linux.go +++ b/staticflag_linux.go @@ -2,5 +2,5 @@ package gorocksdb -// #cgo LDFLAGS: -l:librocksdb.a -l:libstdc++.a -lm -ldl +// #cgo LDFLAGS: -l:librocksdb.a -l:libstdc++.a -l:libz.a -l:libbz2.a -l:libsnappy.a -lm import "C" diff --git a/transaction.go b/transaction.go index 67c9ef09..49a04bd7 100644 --- a/transaction.go +++ b/transaction.go @@ -26,7 +26,7 @@ func (transaction *Transaction) Commit() error { ) C.rocksdb_transaction_commit(transaction.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -40,7 +40,7 @@ func (transaction *Transaction) Rollback() error { C.rocksdb_transaction_rollback(transaction.c, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -57,7 +57,7 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -74,7 +74,7 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl transaction.c, opts.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -91,7 +91,7 @@ func (transaction *Transaction) Put(key, value []byte) error { transaction.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) if cErr != nil { - defer 
C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -105,7 +105,7 @@ func (transaction *Transaction) Delete(key []byte) error { ) C.rocksdb_transaction_delete(transaction.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil diff --git a/transactiondb.go b/transactiondb.go index cfdeac9c..f5d2fd70 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -30,7 +30,7 @@ func OpenTransactionDb( db := C.rocksdb_transactiondb_open( opts.c, transactionDBOpts.c, cName, &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return &TransactionDB{ @@ -83,7 +83,7 @@ func (db *TransactionDB) Get(opts *ReadOptions, key []byte) (*Slice, error) { db.c, opts.c, cKey, C.size_t(len(key)), &cValLen, &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } return NewSlice(cValue, cValLen), nil @@ -100,7 +100,7 @@ func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { db.c, opts.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -114,7 +114,7 @@ func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { ) C.rocksdb_transactiondb_delete(db.c, opts.c, cKey, C.size_t(len(key)), &cErr) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return errors.New(C.GoString(cErr)) } return nil @@ -129,7 +129,7 @@ func (db *TransactionDB) NewCheckpoint() (*Checkpoint, error) { db.c, &cErr, ) if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) + defer C.free(unsafe.Pointer(cErr)) return nil, errors.New(C.GoString(cErr)) } diff --git a/util.go b/util.go index b6637ec3..f05cdcdd 100644 --- a/util.go +++ b/util.go @@ -1,7 +1,7 @@ package gorocksdb - // #include +// #include import "C" import ( "reflect" @@ -32,6 +32,19 @@ func charToByte(data *C.char, len C.size_t) []byte { return value } +// cBackedBytes returs a copy of the same byte slice which is backed by +// malloced memory. This should be freed using freeCBackedBytes. +func cBackedBytes(data []byte) []byte { + return charToByte(cByteSlice(data), C.size_t(len(data))) +} + +// freeCBackedBytes frees a byte slice created by cBackedBytes +func freeCBackedBytes(data []byte) { + sH := (*reflect.SliceHeader)(unsafe.Pointer(&data)) + C.free(unsafe.Pointer(sH.Data)) + +} + // byteToChar returns *C.char from byte slice. 
func byteToChar(b []byte) *C.char { var c *C.char diff --git a/wal_iterator.go b/wal_iterator.go deleted file mode 100755 index 7805d7c9..00000000 --- a/wal_iterator.go +++ /dev/null @@ -1,49 +0,0 @@ -package gorocksdb - -// #include -// #include "rocksdb/c.h" -import "C" -import ( - "errors" - "unsafe" -) - -type WalIterator struct { - c *C.rocksdb_wal_iterator_t -} - -func NewNativeWalIterator(c unsafe.Pointer) *WalIterator { - return &WalIterator{(*C.rocksdb_wal_iterator_t)(c)} -} - -func (iter *WalIterator) Valid() bool { - return C.rocksdb_wal_iter_valid(iter.c) != 0 -} - -func (iter *WalIterator) Next() { - C.rocksdb_wal_iter_next(iter.c) -} - -func (iter *WalIterator) Err() error { - var cErr *C.char - C.rocksdb_wal_iter_status(iter.c, &cErr) - if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) - return errors.New(C.GoString(cErr)) - } - return nil -} - -func (iter *WalIterator) Destroy() { - C.rocksdb_wal_iter_destroy(iter.c) - iter.c = nil -} - -// C.rocksdb_wal_iter_get_batch in the official rocksdb c wrapper has memory leak -// see https://github.com/facebook/rocksdb/pull/5515 -// https://github.com/facebook/rocksdb/issues/5536 -func (iter *WalIterator) GetBatch() (*WriteBatch, uint64) { - var cSeq C.uint64_t - cB := C.rocksdb_wal_iter_get_batch(iter.c, &cSeq) - return NewNativeWriteBatch(cB), uint64(cSeq) -} diff --git a/write_batch.go b/write_batch.go index f894427b..33c1043a 100644 --- a/write_batch.go +++ b/write_batch.go @@ -1,6 +1,7 @@ package gorocksdb // #include "rocksdb/c.h" +// #include "gorocksdb.h" import "C" import ( "errors" @@ -9,7 +10,8 @@ import ( // WriteBatch is a batching of Puts, Merges and Deletes. type WriteBatch struct { - c *C.rocksdb_writebatch_t + c *C.rocksdb_writebatch_t + charsSlices []charsSlice } // NewWriteBatch create a WriteBatch object. @@ -19,7 +21,7 @@ func NewWriteBatch() *WriteBatch { // NewNativeWriteBatch create a WriteBatch object. func NewNativeWriteBatch(c *C.rocksdb_writebatch_t) *WriteBatch { - return &WriteBatch{c} + return &WriteBatch{c: c} } // WriteBatchFrom creates a write batch from a serialized WriteBatch. @@ -41,10 +43,40 @@ func (wb *WriteBatch) PutCF(cf *ColumnFamilyHandle, key, value []byte) { C.rocksdb_writebatch_put_cf(wb.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value))) } -// Append a blob of arbitrary size to the records in this batch. 
-func (wb *WriteBatch) PutLogData(blob []byte) { - cBlob := byteToChar(blob) - C.rocksdb_writebatch_put_log_data(wb.c, cBlob, C.size_t(len(blob))) +// PutMany queues many key and value pairs +func (wb *WriteBatch) PutMany(keys, values [][]byte) error { + if len(keys) != len(values) { + return errors.New("Number of keys and values should be the same") + } + numPairs := C.size_t(len(keys)) + cKeys, cKeySizes := byteSlicesToCSlices(keys) + cValues, cValueSizes := byteSlicesToCSlices(values) + wb.charsSlices = append(wb.charsSlices, cKeys, cValues) + C.gorocksdb_writebatch_put_many( + wb.c, + numPairs, + cKeys.c(), cKeySizes.c(), + cValues.c(), cValueSizes.c(), + ) + return nil +} + +// PutManyCF queues many key and value pairs in a column family +func (wb *WriteBatch) PutManyCF(cf *ColumnFamilyHandle, keys, values [][]byte) error { + if len(keys) != len(values) { + return errors.New("Number of keys and values should be the same") + } + numPairs := C.size_t(len(keys)) + cKeys, cKeySizes := byteSlicesToCSlices(keys) + cValues, cValueSizes := byteSlicesToCSlices(values) + wb.charsSlices = append(wb.charsSlices, cKeys, cValues) + C.gorocksdb_writebatch_put_many_cf( + wb.c, cf.c, + numPairs, + cKeys.c(), cKeySizes.c(), + cValues.c(), cValueSizes.c(), + ) + return nil } // Merge queues a merge of "value" with the existing value of "key". @@ -119,6 +151,9 @@ func (wb *WriteBatch) Clear() { func (wb *WriteBatch) Destroy() { C.rocksdb_writebatch_destroy(wb.c) wb.c = nil + for _, slice := range wb.charsSlices { + slice.Destroy() + } } // WriteBatchRecordType describes the type of a batch record. diff --git a/write_batch_test.go b/write_batch_test.go index 72eeb36e..f1c7a918 100644 --- a/write_batch_test.go +++ b/write_batch_test.go @@ -53,6 +53,43 @@ func TestWriteBatch(t *testing.T) { ensure.True(t, v1.Data() == nil) } +func TestWriteBatchPutMany(t *testing.T) { + db := newTestDB(t, "TestWriteBatchPutMany", nil) + defer db.Close() + + var ( + key1 = []byte("key1") + val1 = []byte("val1") + key2 = []byte("key22") + val2 = []byte("val22") + ) + wo := NewDefaultWriteOptions() + defer wo.Destroy() + + // create and fill the write batch + keys := [][]byte{key1, key2} + values := [][]byte{val1, val2} + wb := NewWriteBatch() + defer wb.Destroy() + wb.PutMany(keys, values) + // ensure.DeepEqual(t, wb.Count(), 2) + + // perform the batch + ensure.Nil(t, db.Write(wo, wb)) + + // check changes + ro := NewDefaultReadOptions() + v1, err := db.Get(ro, key1) + defer v1.Free() + ensure.Nil(t, err) + ensure.DeepEqual(t, v1.Data(), val1) + + v2, err := db.Get(ro, key2) + defer v2.Free() + ensure.Nil(t, err) + ensure.DeepEqual(t, v2.Data(), val2) +} + func TestWriteBatchIterator(t *testing.T) { db := newTestDB(t, "TestWriteBatchIterator", nil) defer db.Close()
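Editor's note, not part of the patch: TestWriteBatchPutMany above covers the default column family; a hedged sketch of the column-family variant follows, assuming db and cf are opened elsewhere:

// examplePutManyCF batches several key/value pairs into one column family.
func examplePutManyCF(db *DB, cf *ColumnFamilyHandle) error {
	wo := NewDefaultWriteOptions()
	defer wo.Destroy()

	wb := NewWriteBatch()
	defer wb.Destroy() // also frees the C-side key/value buffers queued below

	keys := [][]byte{[]byte("k1"), []byte("k2")}
	values := [][]byte{[]byte("v1"), []byte("v2")}
	if err := wb.PutManyCF(cf, keys, values); err != nil {
		return err // keys and values have different lengths
	}
	return db.Write(wo, wb)
}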