From 6e344faefb4ec99b4632c384c3e7d728f2754c4f Mon Sep 17 00:00:00 2001 From: Elbandi Date: Tue, 22 May 2018 22:14:30 +0200 Subject: [PATCH 1/5] Allow change block size --- gsync.go | 6 +++--- gsync_server.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/gsync.go b/gsync.go index 6725754..e478ee5 100644 --- a/gsync.go +++ b/gsync.go @@ -7,9 +7,9 @@ package gsync import "sync" -const ( +var ( // DefaultBlockSize is the default block size. - DefaultBlockSize = 6 * 1024 // 6kb + BlockSize = int64(6 * 1024) // 6kb ) // Rolling checksum is up to 16 bit length for simplicity and speed. @@ -69,7 +69,7 @@ type BlockOperation struct { var bufferPool = sync.Pool{ New: func() interface{} { - b := make([]byte, DefaultBlockSize) + b := make([]byte, BlockSize) return &b }, } diff --git a/gsync_server.go b/gsync_server.go index 3d4d3f9..4a47c78 100644 --- a/gsync_server.go +++ b/gsync_server.go @@ -115,7 +115,7 @@ func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan Blo } index := int64(o.Index) - n, err := cache.ReadAt(buffer, (index * DefaultBlockSize)) + n, err := cache.ReadAt(buffer, (index * BlockSize)) if err != nil && err != io.EOF { return errors.Wrapf(err, "failed reading cached block") } From 5fabf4d3bbcdc57728b259ad7131eb3d5d9c5322 Mon Sep 17 00:00:00 2001 From: Elbandi Date: Sat, 26 May 2018 23:40:43 +0200 Subject: [PATCH 2/5] Try reading with blocks --- gsync.go | 2 +- gsync_client.go | 143 ++++++++++++++++++------------------------------ gsync_server.go | 2 +- 3 files changed, 54 insertions(+), 93 deletions(-) diff --git a/gsync.go b/gsync.go index e478ee5..2c04d04 100644 --- a/gsync.go +++ b/gsync.go @@ -9,7 +9,7 @@ import "sync" var ( // DefaultBlockSize is the default block size. - BlockSize = int64(6 * 1024) // 6kb + BlockSize = int(6 * 1024) // 6kb ) // Rolling checksum is up to 16 bit length for simplicity and speed. diff --git a/gsync_client.go b/gsync_client.go index d47295a..3f8dd5a 100644 --- a/gsync_client.go +++ b/gsync_client.go @@ -43,7 +43,7 @@ func LookUpTable(ctx context.Context, bc <-chan BlockSignature) (map[uint32][]Bl // so this function is expected to be called once the remote blocks map is fully populated. // // The caller must make sure the concrete reader instance is not nil or this function will panic. -func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) { +func Sync(ctx context.Context, r io.Reader, shash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) { if r == nil { return nil, errors.New("gsync: reader required") } @@ -55,13 +55,15 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 } go func() { + var ( r1, r2, rhash, old uint32 - offset int64 + offset, max, delta int rolling, match bool + err error ) - - delta := make([]byte, 0) + bufferSize := 3 * BlockSize + buffer := make([]byte, bufferSize) defer func() { close(o) @@ -79,39 +81,53 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 break } - bfp := bufferPool.Get().(*[]byte) - buffer := *bfp + for offset > max-BlockSize { + if err == io.EOF { + // If EOF is reached and not match data found, we add trailing data + // to delta array. + left := max - offset + delta + for left >= BlockSize { + o <- BlockOperation{Data: append([]byte(nil), buffer[max-left:max-left+BlockSize]...)} + left -= BlockSize - n, err := r.ReadAt(buffer, offset) - if err != nil && err != io.EOF { - o <- BlockOperation{ - Error: errors.Wrapf(err, "failed reading data block"), + } + if left > 0 { + o <- BlockOperation{Data: append([]byte(nil), buffer[max-left:max]...)} + } + return } - bufferPool.Put(bfp) - // return since data corruption in the server is possible and a re-sync is required. - return + var n int + left := copy(buffer[:], buffer[offset-delta:max]) + n, err = r.Read(buffer[left:]) + if err != nil && err != io.EOF { + o <- BlockOperation{ + Error: errors.Wrapf(err, "failed reading data block"), + } + // return since data corruption in the server is possible and a re-sync is required. + return + } + offset = delta + max = left + n } - - block := buffer[:n] - // If there are no block signatures from remote server, send all data blocks if len(remote) == 0 { - if n > 0 { - o <- BlockOperation{Data: block} - offset += int64(n) - } - - if err == io.EOF { - bufferPool.Put(bfp) - return + for max-offset >= BlockSize { + o <- BlockOperation{Data: append([]byte(nil), buffer[offset:offset+BlockSize]...)} + offset += BlockSize } continue } + left := BlockSize + if max-offset < BlockSize { + // FIXME: is this called? + left = max - offset + } + block := buffer[offset:offset+left] if rolling { - new := uint32(block[n-1]) - r1, r2, rhash = rollingHash2(uint32(n), r1, r2, old, new) + new := uint32(buffer[offset+left-1]) + r1, r2, rhash = rollingHash2(uint32(left), r1, r2, old, new) } else { r1, r2, rhash = rollingHash(block) } @@ -129,9 +145,9 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 match = true // We need to send deltas before sending an index token. - if len(delta) > 0 { - send(ctx, bytes.NewReader(delta), o) - delta = make([]byte, 0) + if delta > 0 { + o <- BlockOperation{Data: append([]byte(nil), buffer[offset-delta:offset]...)} + delta = 0 } // instructs the server to copy block data at offset b.Index @@ -142,76 +158,21 @@ func Sync(ctx context.Context, r io.ReaderAt, shash hash.Hash, remote map[uint32 } if match { - if err == io.EOF { - bufferPool.Put(bfp) - break - } - rolling, match = false, false old, rhash, r1, r2 = 0, 0, 0, 0 - offset += int64(n) + offset += left } else { - if err == io.EOF { - // If EOF is reached and not match data found, we add trailing data - // to delta array. - delta = append(delta, block...) - if len(delta) > 0 { - send(ctx, bytes.NewReader(delta), o) - } - bufferPool.Put(bfp) - break - } rolling = true - old = uint32(block[0]) - delta = append(delta, block[0]) + old = uint32(buffer[offset]) + delta++ offset++ + if delta >= BlockSize { + o <- BlockOperation{Data: append([]byte(nil), buffer[offset-delta:offset-delta+BlockSize]...)} + delta -= BlockSize + } } - - // Returning this buffer to the pool here gives us 5x more speed - bufferPool.Put(bfp) } }() return o, nil } - -// send sends all deltas over the channel. Any error is reported back using the -// same channel. -func send(ctx context.Context, r io.Reader, o chan<- BlockOperation) { - for { - // Allow for cancellation. - select { - case <-ctx.Done(): - o <- BlockOperation{ - Error: ctx.Err(), - } - return - default: - // break out of the select block and continue reading - break - } - - bfp := bufferPool.Get().(*[]byte) - buffer := *bfp - defer bufferPool.Put(bfp) - - n, err := r.Read(buffer) - if err != nil && err != io.EOF { - o <- BlockOperation{ - Error: errors.Wrapf(err, "failed reading data block"), - } - return - } - - // If we don't guard against 0 bytes reads, an operation with index 0 will be sent - // and the server will duplicate block 0 at the end of the reconstructed file. - if n > 0 { - block := buffer[:n] - o <- BlockOperation{Data: block} - } - - if err == io.EOF { - break - } - } -} diff --git a/gsync_server.go b/gsync_server.go index 4a47c78..25db25e 100644 --- a/gsync_server.go +++ b/gsync_server.go @@ -115,7 +115,7 @@ func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan Blo } index := int64(o.Index) - n, err := cache.ReadAt(buffer, (index * BlockSize)) + n, err := cache.ReadAt(buffer, (index * int64(BlockSize))) if err != nil && err != io.EOF { return errors.Wrapf(err, "failed reading cached block") } From f4fa75b6d70c469c19f6c418957a9360c21e5699 Mon Sep 17 00:00:00 2001 From: Elbandi Date: Sun, 27 May 2018 00:43:02 +0200 Subject: [PATCH 3/5] Bigger read buffer --- gsync_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gsync_client.go b/gsync_client.go index 3f8dd5a..fec6366 100644 --- a/gsync_client.go +++ b/gsync_client.go @@ -62,7 +62,7 @@ func Sync(ctx context.Context, r io.Reader, shash hash.Hash, remote map[uint32][ rolling, match bool err error ) - bufferSize := 3 * BlockSize + bufferSize := 16 * BlockSize buffer := make([]byte, bufferSize) defer func() { From c59b33af58b63a9a1cc98b82052aec0b1622f636 Mon Sep 17 00:00:00 2001 From: Elbandi Date: Tue, 29 May 2018 00:08:14 +0200 Subject: [PATCH 4/5] Calculate the readed data hash --- gsync_client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/gsync_client.go b/gsync_client.go index fec6366..ee5188e 100644 --- a/gsync_client.go +++ b/gsync_client.go @@ -43,7 +43,7 @@ func LookUpTable(ctx context.Context, bc <-chan BlockSignature) (map[uint32][]Bl // so this function is expected to be called once the remote blocks map is fully populated. // // The caller must make sure the concrete reader instance is not nil or this function will panic. -func Sync(ctx context.Context, r io.Reader, shash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) { +func Sync(ctx context.Context, r io.Reader, shash hash.Hash, datahash hash.Hash, remote map[uint32][]BlockSignature) (<-chan BlockOperation, error) { if r == nil { return nil, errors.New("gsync: reader required") } @@ -109,6 +109,9 @@ func Sync(ctx context.Context, r io.Reader, shash hash.Hash, remote map[uint32][ } offset = delta max = left + n + if datahash != nil { + datahash.Write(buffer[left:max]) + } } // If there are no block signatures from remote server, send all data blocks if len(remote) == 0 { From b69e1728bc846d24ef26dae839d2e5afed156280 Mon Sep 17 00:00:00 2001 From: Andras Elso Date: Sun, 17 May 2020 20:15:11 +0200 Subject: [PATCH 5/5] Calculate written data hash --- gsync_server.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/gsync_server.go b/gsync_server.go index 25db25e..107fdce 100644 --- a/gsync_server.go +++ b/gsync_server.go @@ -86,7 +86,7 @@ func Signatures(ctx context.Context, r io.Reader, shash hash.Hash) (<-chan Block } // Apply reconstructs a file given a set of operations. The caller must close the ops channel or the context when done or there will be a deadlock. -func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan BlockOperation) error { +func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, datahash hash.Hash, ops <-chan BlockOperation) error { bfp := bufferPool.Get().(*[]byte) buffer := *bfp defer bufferPool.Put(bfp) @@ -122,7 +122,9 @@ func Apply(ctx context.Context, dst io.Writer, cache io.ReaderAt, ops <-chan Blo block = buffer[:n] } - + if datahash != nil { + datahash.Write(block) + } _, err := dst.Write(block) if err != nil { return errors.Wrapf(err, "failed writing block to destination")