8554: Add EmptyTrash api to Volume and implementation in volume_unix. Add emptyTrash...
authorradhika <radhika@curoverse.com>
Wed, 9 Mar 2016 16:29:57 +0000 (11:29 -0500)
committerradhika <radhika@curoverse.com>
Wed, 9 Mar 2016 16:29:57 +0000 (11:29 -0500)
services/keepstore/azure_blob_volume.go
services/keepstore/keepstore.go
services/keepstore/s3_volume.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go

index 7dfb84d109957af05f101f01c4dbc94e074457d3..d096dc69c3a9ee6be3ab9a960bc63e69ab202641 100644 (file)
@@ -395,3 +395,10 @@ var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
        return keepBlockRegexp.MatchString(s)
 }
+
+// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// and deletes them from the volume.
+// TBD
+func (v *AzureBlobVolume) EmptyTrash() error {
+       return nil
+}
index 104ae89ba061c9554f355c4e8871c50ad461d9a7..086162368fe58ae238a6d939887bd96b9f20c00a 100644 (file)
@@ -55,9 +55,13 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
-// trashLifetime is the time duration after a block is trashed
+// trashLifetime is the time duration in seconds after a block is trashed
 // during which it can be recovered using an /untrash request
-var trashLifetime time.Duration
+var trashLifetime int
+
+// Interval in seconds at which the emptyTrash goroutine will check
+// and delete expired trashed blocks. Default is once a day.
+var trashCheckInterval int
 
 var maxBuffers = 128
 var bufs *bufferPool
@@ -205,11 +209,16 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
-       flag.DurationVar(
+       flag.IntVar(
                &trashLifetime,
                "trash-lifetime",
-               0*time.Second,
+               0,
                "Interval in seconds after a block is trashed during which it can be recovered using an /untrash request")
+       flag.IntVar(
+               &trashCheckInterval,
+               "trash-check-interval",
+               24*60*60,
+               "Interval in seconds at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
 
        flag.Parse()
 
@@ -321,10 +330,14 @@ func main() {
        trashq = NewWorkQueue()
        go RunTrashWorker(trashq)
 
+       // Start emptyTrash goroutine
+       go emptyTrash(trashCheckInterval)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
+               doneEmptyingTrash <- true
                s := <-sig
                log.Println("caught signal:", s)
                listener.Close()
@@ -336,3 +349,25 @@ func main() {
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
 }
+
+// Channel to stop emptying trash
+var doneEmptyingTrash = make(chan bool)
+
+// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
+func emptyTrash(trashCheckInterval int) {
+       ticker := time.NewTicker(time.Duration(trashCheckInterval) * time.Second)
+
+       for {
+               select {
+               case <-ticker.C:
+                       for _, v := range volumes {
+                               if v.Writable() {
+                                       v.EmptyTrash()
+                               }
+                       }
+               case <-doneEmptyingTrash:
+                       ticker.Stop()
+                       return
+               }
+       }
+}
index 7d9ba8ab9ef33bf46888566c6d0c6ae333dba9ae..5bcab1d398fcb4ef77b58ce03f5b6a8a6e07457c 100644 (file)
@@ -260,6 +260,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        return nil
 }
 
+// Trash a Keep block.
 func (v *S3Volume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
@@ -321,3 +322,10 @@ func (v *S3Volume) translateError(err error) error {
        }
        return err
 }
+
+// EmptyTrash looks for trashed blocks that exceeded trashLifetime
+// and deletes them from the volume.
+// TBD
+func (v *S3Volume) EmptyTrash() error {
+       return nil
+}
index 58710c04b269a57af236fbb36f5a6aaa61d9b256..bec1ee6f7fc9a8b4f94a19c1b0d3a09f2a6003f4 100644 (file)
@@ -204,6 +204,10 @@ type Volume interface {
        // underlying device. It will be passed on to clients in
        // responses to PUT requests.
        Replication() int
+
+       // EmptyTrash looks for trashed blocks that exceeded trashLifetime
+       // and deletes them from the volume.
+       EmptyTrash() error
 }
 
 // A VolumeManager tells callers which volumes can read, which volumes
index 5810411c89bca1d3a31b3d03f65748456fec78db..c614a08521a723ac0c583bacbb854ddfc2c2516c 100644 (file)
@@ -78,6 +78,9 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
        testPutFullBlock(t, factory)
 
        testTrashUntrash(t, factory)
+       testEmptyTrashTrashLifetime0s(t, factory)
+       testEmptyTrashTrashLifetime3600s(t, factory)
+       testEmptyTrashTrashLifetime1s(t, factory)
 }
 
 // Put a test block, get it and verify content
@@ -710,7 +713,7 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
                trashLifetime = 0
        }()
 
-       trashLifetime = 3600 * time.Second
+       trashLifetime = 3600
 
        // put block and backdate it
        v.PutRaw(TestHash, TestBlock)
@@ -758,3 +761,157 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        }
        bufs.Put(buf)
 }
+
+// With trashLifetime == 0, perform:
+// Trash an old block - which either raises ErrNotImplemented or succeeds to delete it
+// Untrash - which either raises ErrNotImplemented or is a no-op for the deleted block
+// Get - which must fail to find the block, since it was deleted and hence not untrashed
+func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+       defer func() {
+               trashLifetime = 0
+               doneEmptyingTrash <- true
+       }()
+
+       trashLifetime = 0
+       trashCheckInterval = 1
+
+       go emptyTrash(trashCheckInterval)
+
+       // Trash old block; since trashLifetime = 0, Trash actually deletes the block
+       err := trashUntrashOldBlock(t, v, 0)
+
+       // Get it; for writable volumes, this should not find the block since it was deleted
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               if !os.IsNotExist(err) {
+                       t.Errorf("os.IsNotExist(%v) should have been true", err)
+               }
+       } else {
+               if bytes.Compare(buf, TestBlock) != 0 {
+                       t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+               }
+               bufs.Put(buf)
+       }
+}
+
+// With large trashLifetime, perform:
+// Run emptyTrash goroutine with much smaller trashCheckInterval
+// Trash an old block - which either raises ErrNotImplemented or succeeds
+// Untrash - which either raises ErrNotImplemented or succeeds
+// Get - which must find the block
+func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+       defer func() {
+               trashLifetime = 0
+               doneEmptyingTrash <- true
+       }()
+
+       trashLifetime = 3600
+       trashCheckInterval = 1
+
+       go emptyTrash(trashCheckInterval)
+
+       // Trash old block
+       err := trashUntrashOldBlock(t, v, 2)
+
+       // Get is expected to succeed after untrash before EmptyTrash
+       // It is still found on readonly volumes
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               if !os.IsNotExist(err) {
+                       t.Errorf("os.IsNotExist(%v) should have been true", err)
+               }
+       } else {
+               if bytes.Compare(buf, TestBlock) != 0 {
+                       t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+               }
+               bufs.Put(buf)
+       }
+}
+
+// With trashLifetime = 1, perform:
+// Run emptyTrash goroutine
+// Trash an old block - which either raises ErrNotImplemented or succeeds
+// Untrash - after emptyTrash goroutine ticks, and hence does not actually untrash
+// Get - which must fail to find the block
+func testEmptyTrashTrashLifetime1s(t TB, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+       defer func() {
+               trashLifetime = 0
+               doneEmptyingTrash <- true
+       }()
+
+       volumes = append(volumes, v)
+
+       trashLifetime = 1
+       trashCheckInterval = 1
+
+       go emptyTrash(trashCheckInterval)
+
+       // Trash old block and untrash a little after first trashCheckInterval
+       err := trashUntrashOldBlock(t, v, 3)
+
+       // Get is expected to fail due to EmptyTrash before Untrash
+       // It is still found on readonly volumes
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               if !os.IsNotExist(err) {
+                       t.Errorf("os.IsNotExist(%v) should have been true", err)
+               }
+       } else {
+               if bytes.Compare(buf, TestBlock) != 0 {
+                       t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+               }
+               bufs.Put(buf)
+       }
+}
+
+// Put a block, backdate it, trash it, untrash it after the untrashAfter seconds
+func trashUntrashOldBlock(t TB, v TestableVolume, untrashAfter int) error {
+       // put block and backdate it
+       v.PutRaw(TestHash, TestBlock)
+       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               t.Fatal(err)
+       }
+       if bytes.Compare(buf, TestBlock) != 0 {
+               t.Fatalf("Got data %+q, expected %+q", buf, TestBlock)
+       }
+       bufs.Put(buf)
+
+       // Trash
+       err = v.Trash(TestHash)
+       if err != nil {
+               if err != ErrNotImplemented && err != MethodDisabledError {
+                       t.Fatal(err)
+               } else {
+                       // To test emptyTrash goroutine effectively, we need to give the
+                       // ticker a couple rounds, adding some sleep time to the test.
+                       // This delay is unnecessary for volumes that are currently
+                       // not yet supporting trashLifetime > 0 (this case is already
+                       // covered in the testTrashUntrash already)
+                       return err
+               }
+       } else {
+               _, err = v.Get(TestHash)
+               if err == nil || !os.IsNotExist(err) {
+                       t.Fatalf("os.IsNotExist(%v) should have been true", err)
+               }
+       }
+
+       // Untrash after give wait time
+       time.Sleep(time.Duration(untrashAfter) * time.Second)
+       err = v.Untrash(TestHash)
+       if err != nil {
+               if err != ErrNotImplemented && err != MethodDisabledError {
+                       t.Fatal(err)
+               }
+       }
+       return err
+}
index 53ffeef0bba186d7f995e6e6afb00feb194c5e7f..508c7fa24f4b7a385220ff1ef55e4a166ff2d9f8 100644 (file)
@@ -223,3 +223,7 @@ func (v *MockVolume) Writable() bool {
 func (v *MockVolume) Replication() int {
        return 1
 }
+
+func (v *MockVolume) EmptyTrash() error {
+       return nil
+}
index 27ee2422b681bf006d0c4d48e3f49fcfe97fcb20..eca0aeed9297b3737d5658ad1dfb6a0d2319b53b 100644 (file)
@@ -23,9 +23,6 @@ type unixVolumeAdder struct {
 }
 
 func (vs *unixVolumeAdder) Set(value string) error {
-       if trashLifetime != 0 {
-               return ErrNotImplemented
-       }
        if dirs := strings.Split(value, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
@@ -365,22 +362,22 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        }
 }
 
-// Delete deletes the block data from the unix storage
+// Trash trashes the block data from the unix storage
+// If trashLifetime == 0, the block is deleted
+// Else, the block is renamed as path/{loc}.trash.{deadline},
+// where deadline = now + trashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
-       // and Delete() because either (a) the file will be deleted and Touch()
+       // and Trash() because either (a) the file will be trashed and Touch()
        // will signal to the caller that the file is not present (and needs to
        // be re-written), or (b) Touch() will update the file's timestamp and
-       // Delete() will read the correct up-to-date timestamp and choose not to
-       // delete the file.
+       // Trash() will read the correct up-to-date timestamp and choose not to
+       // trash the file.
 
        if v.readonly {
                return MethodDisabledError
        }
-       if trashLifetime != 0 {
-               return ErrNotImplemented
-       }
        if v.locker != nil {
                v.locker.Lock()
                defer v.locker.Unlock()
@@ -408,11 +405,21 @@ func (v *UnixVolume) Trash(loc string) error {
                        return nil
                }
        }
-       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+
+       if trashLifetime == 0 {
+               return os.Remove(p)
+       }
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Unix()+int64(trashLifetime)))
 }
 
 // Untrash moves block from trash back into store
+// Look for path/{loc}.trash.{deadline} in storage,
+// and rename the first such file as path/{loc}
 func (v *UnixVolume) Untrash(loc string) (err error) {
+       if v.readonly {
+               return MethodDisabledError
+       }
+
        prefix := fmt.Sprintf("%v.trash.", loc)
        files, _ := ioutil.ReadDir(v.blockDir(loc))
        for _, f := range files {
@@ -423,7 +430,8 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                        }
                }
        }
-       return err
+
+       return
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -517,3 +525,51 @@ func (v *UnixVolume) translateError(err error) error {
                return err
        }
 }
+
+var trashRegexp = regexp.MustCompile(`.*([0-9a-fA-F]{32}).trash.(\d+)`)
+
+// EmptyTrash walks hierarchy looking for {hash}.trash.*
+// and deletes those with deadline < now.
+func (v *UnixVolume) EmptyTrash() error {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+
+       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash error for %v: %v", path, err)
+               } else if !info.Mode().IsDir() {
+                       matches := trashRegexp.FindStringSubmatch(path)
+                       if len(matches) == 3 {
+                               deadline, err := strconv.Atoi(matches[2])
+                               if err != nil {
+                                       log.Printf("EmptyTrash error for %v: %v", matches[1], err)
+                               } else {
+                                       if int64(deadline) < time.Now().Unix() {
+                                               err = os.Remove(path)
+                                               if err != nil {
+                                                       log.Printf("Error deleting %v: %v", matches[1], err)
+                                                       bytesInTrash += info.Size()
+                                                       blocksInTrash++
+                                               } else {
+                                                       bytesDeleted += info.Size()
+                                                       blocksDeleted++
+                                               }
+                                       } else {
+                                               bytesInTrash += info.Size()
+                                               blocksInTrash++
+                                       }
+                               }
+                       }
+               }
+               return nil
+       })
+
+       if err != nil {
+               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+       } else {
+               log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v",
+                       v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
+       }
+
+       return nil
+}