From 66753fb70cee727033fb2f2dc2a900953ea3173b Mon Sep 17 00:00:00 2001 From: Melvin Junhee Woo Date: Mon, 21 Sep 2020 13:29:26 +0900 Subject: [PATCH] file.go: Added SaveToFileConcurrentWithTimeout --- file.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 75 insertions(+), 4 deletions(-) diff --git a/file.go b/file.go index bab5484..2ed4918 100644 --- a/file.go +++ b/file.go @@ -1,7 +1,9 @@ package fastcache import ( + "context" "encoding/binary" + "errors" "fmt" "io" "io/ioutil" @@ -9,6 +11,7 @@ import ( "path/filepath" "regexp" "runtime" + "time" "github.com/golang/snappy" ) @@ -60,7 +63,7 @@ func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error { if concurrency <= 0 || concurrency > gomaxprocs { concurrency = gomaxprocs } - if err := c.save(tmpDir, concurrency); err != nil { + if err := c.save(tmpDir, concurrency, context.Background()); err != nil { return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err) } @@ -76,6 +79,67 @@ func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error { return nil } +// SaveToFileConcurrentWithTimeout basically does the same things which SaveToFileConcurrent does. +// +// The difference is that SaveToFileConcurrentWithTimeout accepts timeout parameter to +// limit the time for saving the data when cached data is too huge. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFileConcurrent. +func (c *Cache) SaveToFileConcurrentWithTimeout(filePath string, concurrency int, timeout time.Duration) error { + ctx, cancelFn := context.WithTimeout(context.Background(), timeout) + defer cancelFn() + + // Create dir if it doesn't exist. + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("cannot stat %q: %s", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("cannot create dir %q: %s", dir, err) + } + } + + // Save cache data into a temporary directory. + tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.") + if err != nil { + return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err) + } + defer func() { + if tmpDir != "" { + _ = os.RemoveAll(tmpDir) + } + }() + gomaxprocs := runtime.GOMAXPROCS(-1) + if concurrency <= 0 || concurrency > gomaxprocs { + concurrency = gomaxprocs + } + timedOut := false + if err := c.save(tmpDir, concurrency, ctx); err != nil && err != ErrTimeout { + return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err) + } else if err == ErrTimeout { + timedOut = true + } + + // Remove old filePath contents, since os.Rename may return + // error if filePath dir exists. + if err := os.RemoveAll(filePath); err != nil { + return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err) + } + if err := os.Rename(tmpDir, filePath); err != nil { + return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err) + } + tmpDir = "" + + // To notice the caller that saving has been stopped due to the time limit. + if timedOut { + return ErrTimeout + } + return nil +} + // LoadFromFile loads cache data from the given filePath. // // See SaveToFile* for saving cache data to file. @@ -95,7 +159,7 @@ func LoadFromFileOrNew(filePath string, maxBytes int) *Cache { return New(maxBytes) } -func (c *Cache) save(dir string, workersCount int) error { +func (c *Cache) save(dir string, workersCount int, ctx context.Context) error { if err := saveMetadata(c, dir); err != nil { return err } @@ -105,7 +169,7 @@ func (c *Cache) save(dir string, workersCount int) error { results := make(chan error) for i := 0; i < workersCount; i++ { go func(workerNum int) { - results <- saveBuckets(c.buckets[:], workCh, dir, workerNum) + results <- saveBuckets(c.buckets[:], workCh, dir, workerNum, ctx) }(i) } // Feed workers with work @@ -223,7 +287,9 @@ func loadMetadata(dir string) (uint64, error) { var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`) -func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error { +var ErrTimeout = errors.New("spent allowed time") + +func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int, ctx context.Context) error { dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum) dataFile, err := os.Create(dataPath) if err != nil { @@ -234,6 +300,11 @@ func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) }() zw := snappy.NewBufferedWriter(dataFile) for bucketNum := range workCh { + select { + case <-ctx.Done(): + return ErrTimeout + default: + } if err := writeUint64(zw, uint64(bucketNum)); err != nil { return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err) }