8554: Untrash returns os.ErrNotExist when no trash found for the given locator and...
authorradhika <radhika@curoverse.com>
Sun, 13 Mar 2016 12:52:49 +0000 (08:52 -0400)
committerradhika <radhika@curoverse.com>
Sun, 13 Mar 2016 12:52:49 +0000 (08:52 -0400)
services/keepstore/keepstore.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix.go

index 086162368fe58ae238a6d939887bd96b9f20c00a..9473690130bd36f967c387499594eda80c95614e 100644 (file)
@@ -11,6 +11,7 @@ import (
        "net/http"
        "os"
        "os/signal"
+       "regexp"
        "strings"
        "syscall"
        "time"
@@ -55,13 +56,15 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
-// trashLifetime is the time duration in seconds after a block is trashed
+// trashLifetime is the time duration after a block is trashed
 // during which it can be recovered using an /untrash request
-var trashLifetime int
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashLifetime time.Duration
 
-// Interval in seconds at which the emptyTrash goroutine will check
-// and delete expired trashed blocks. Default is once a day.
-var trashCheckInterval int
+// trashCheckInterval is the time duration at which the emptyTrash goroutine
+// will check and delete expired trashed blocks. Default is one day.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashCheckInterval time.Duration
 
 var maxBuffers = 128
 var bufs *bufferPool
@@ -124,6 +127,8 @@ var (
        volumes         volumeSet
 )
 
+var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32}).trash.(\d+)$`)
+
 func (vs *volumeSet) String() string {
        return fmt.Sprintf("%+v", (*vs)[:])
 }
@@ -209,16 +214,16 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
-       flag.IntVar(
+       flag.DurationVar(
                &trashLifetime,
                "trash-lifetime",
-               0,
-               "Interval in seconds after a block is trashed during which it can be recovered using an /untrash request")
-       flag.IntVar(
+               0*time.Second,
+               "Time duration after a block is trashed during which it can be recovered using an /untrash request")
+       flag.DurationVar(
                &trashCheckInterval,
                "trash-check-interval",
-               24*60*60,
-               "Interval in seconds at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
+               24*time.Hour,
+               "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
 
        flag.Parse()
 
@@ -331,15 +336,16 @@ func main() {
        go RunTrashWorker(trashq)
 
        // Start emptyTrash goroutine
-       go emptyTrash(trashCheckInterval)
+       doneEmptyingTrash := make(chan bool)
+       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
 
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
-               doneEmptyingTrash <- true
                s := <-sig
                log.Println("caught signal:", s)
+               doneEmptyingTrash <- true
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
@@ -350,12 +356,9 @@ func main() {
        srv.Serve(listener)
 }
 
-// Channel to stop emptying trash
-var doneEmptyingTrash = make(chan bool)
-
 // At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
-func emptyTrash(trashCheckInterval int) {
-       ticker := time.NewTicker(time.Duration(trashCheckInterval) * time.Second)
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+       ticker := time.NewTicker(trashCheckInterval)
 
        for {
                select {
index c614a08521a723ac0c583bacbb854ddfc2c2516c..5b0016f2772239a5f200cda3a6feff567280739c 100644 (file)
@@ -78,7 +78,6 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
        testPutFullBlock(t, factory)
 
        testTrashUntrash(t, factory)
-       testEmptyTrashTrashLifetime0s(t, factory)
        testEmptyTrashTrashLifetime3600s(t, factory)
        testEmptyTrashTrashLifetime1s(t, factory)
 }
@@ -710,10 +709,10 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
        defer func() {
-               trashLifetime = 0
+               trashLifetime = 0 * time.Second
        }()
 
-       trashLifetime = 3600
+       trashLifetime = 3600 * time.Second
 
        // put block and backdate it
        v.PutRaw(TestHash, TestBlock)
@@ -762,40 +761,6 @@ func testTrashUntrash(t TB, factory TestableVolumeFactory) {
        bufs.Put(buf)
 }
 
-// With trashLifetime == 0, perform:
-// Trash an old block - which either raises ErrNotImplemented or succeeds to delete it
-// Untrash - which either raises ErrNotImplemented or is a no-op for the deleted block
-// Get - which must fail to find the block, since it was deleted and hence not untrashed
-func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
-       v := factory(t)
-       defer v.Teardown()
-       defer func() {
-               trashLifetime = 0
-               doneEmptyingTrash <- true
-       }()
-
-       trashLifetime = 0
-       trashCheckInterval = 1
-
-       go emptyTrash(trashCheckInterval)
-
-       // Trash old block; since trashLifetime = 0, Trash actually deletes the block
-       err := trashUntrashOldBlock(t, v, 0)
-
-       // Get it; for writable volumes, this should not find the block since it was deleted
-       buf, err := v.Get(TestHash)
-       if err != nil {
-               if !os.IsNotExist(err) {
-                       t.Errorf("os.IsNotExist(%v) should have been true", err)
-               }
-       } else {
-               if bytes.Compare(buf, TestBlock) != 0 {
-                       t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
-               }
-               bufs.Put(buf)
-       }
-}
-
 // With large trashLifetime, perform:
 // Run emptyTrash goroutine with much smaller trashCheckInterval
 // Trash an old block - which either raises ErrNotImplemented or succeeds
@@ -804,15 +769,17 @@ func testEmptyTrashTrashLifetime0s(t TB, factory TestableVolumeFactory) {
 func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
+
+       doneEmptyingTrash := make(chan bool)
        defer func() {
-               trashLifetime = 0
+               trashLifetime = 0 * time.Second
                doneEmptyingTrash <- true
        }()
 
-       trashLifetime = 3600
-       trashCheckInterval = 1
+       trashLifetime = 3600 * time.Second
+       trashCheckInterval = 1 * time.Second
 
-       go emptyTrash(trashCheckInterval)
+       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
 
        // Trash old block
        err := trashUntrashOldBlock(t, v, 2)
@@ -840,17 +807,19 @@ func testEmptyTrashTrashLifetime3600s(t TB, factory TestableVolumeFactory) {
 func testEmptyTrashTrashLifetime1s(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
+
+       doneEmptyingTrash := make(chan bool)
        defer func() {
-               trashLifetime = 0
+               trashLifetime = 0 * time.Second
                doneEmptyingTrash <- true
        }()
 
        volumes = append(volumes, v)
 
-       trashLifetime = 1
-       trashCheckInterval = 1
+       trashLifetime = 1 * time.Second
+       trashCheckInterval = 1 * time.Second
 
-       go emptyTrash(trashCheckInterval)
+       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
 
        // Trash old block and untrash a little after first trashCheckInterval
        err := trashUntrashOldBlock(t, v, 3)
@@ -905,11 +874,11 @@ func trashUntrashOldBlock(t TB, v TestableVolume, untrashAfter int) error {
                }
        }
 
-       // Untrash after give wait time
+       // Untrash after give wait time; it may have been deleted by emptyTrash goroutine
        time.Sleep(time.Duration(untrashAfter) * time.Second)
        err = v.Untrash(TestHash)
        if err != nil {
-               if err != ErrNotImplemented && err != MethodDisabledError {
+               if err != ErrNotImplemented && err != MethodDisabledError && err != os.ErrNotExist {
                        t.Fatal(err)
                }
        }
index eca0aeed9297b3737d5658ad1dfb6a0d2319b53b..9183a398f34ca89cc40eb6b6c5f1c2122f831a84 100644 (file)
@@ -409,7 +409,7 @@ func (v *UnixVolume) Trash(loc string) error {
        if trashLifetime == 0 {
                return os.Remove(p)
        }
-       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Unix()+int64(trashLifetime)))
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
 }
 
 // Untrash moves block from trash back into store
@@ -421,11 +421,17 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
        }
 
        prefix := fmt.Sprintf("%v.trash.", loc)
-       files, _ := ioutil.ReadDir(v.blockDir(loc))
+       files, err := ioutil.ReadDir(v.blockDir(loc))
+       if err != nil {
+               return err
+       }
+       if len(files) == 0 {
+               return os.ErrNotExist
+       }
        for _, f := range files {
                if strings.HasPrefix(f.Name(), prefix) {
                        err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
-                       if err != nil {
+                       if err == nil {
                                break
                        }
                }
@@ -526,8 +532,6 @@ func (v *UnixVolume) translateError(err error) error {
        }
 }
 
-var trashRegexp = regexp.MustCompile(`.*([0-9a-fA-F]{32}).trash.(\d+)`)
-
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() error {
@@ -536,15 +540,17 @@ func (v *UnixVolume) EmptyTrash() error {
 
        err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
-                       log.Printf("EmptyTrash error for %v: %v", path, err)
-               } else if !info.Mode().IsDir() {
-                       matches := trashRegexp.FindStringSubmatch(path)
+                       return err
+               }
+
+               if !info.Mode().IsDir() {
+                       matches := trashLocRegexp.FindStringSubmatch(path)
                        if len(matches) == 3 {
-                               deadline, err := strconv.Atoi(matches[2])
+                               deadline, err := strconv.ParseInt(matches[2], 10, 64)
                                if err != nil {
                                        log.Printf("EmptyTrash error for %v: %v", matches[1], err)
                                } else {
-                                       if int64(deadline) < time.Now().Unix() {
+                                       if int64(deadline) <= time.Now().Unix() {
                                                err = os.Remove(path)
                                                if err != nil {
                                                        log.Printf("Error deleting %v: %v", matches[1], err)
@@ -566,10 +572,9 @@ func (v *UnixVolume) EmptyTrash() error {
 
        if err != nil {
                log.Printf("EmptyTrash error for %v: %v", v.String(), err)
-       } else {
-               log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v",
-                       v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
        }
 
+       log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v", v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
+
        return nil
 }