Merge branch 'master' into 9187-crunchv2-dispatching
[arvados.git] / services / keepstore / volume_unix.go
index 98c31d1eab6d0c18f3d242daf898c3d25345e490..7aff85e59a4357acb1e27ce5386756feb96fa0e1 100644 (file)
@@ -181,26 +181,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, err
+               return 0, v.translateError(err)
+       }
+       if stat.Size() > int64(len(buf)) {
+               return 0, TooLongError
        }
-       buf := bufs.Get(int(stat.Size()))
+       var read int
+       size := int(stat.Size())
        err = v.getFunc(path, func(rdr io.Reader) error {
-               _, err = io.ReadFull(rdr, buf)
+               read, err = io.ReadFull(rdr, buf[:size])
                return err
        })
-       if err != nil {
-               bufs.Put(buf)
-               return nil, err
-       }
-       return buf, nil
+       return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
@@ -209,7 +207,7 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 func (v *UnixVolume) Compare(loc string, expect []byte) error {
        path := v.blockPath(loc)
        if _, err := v.stat(path); err != nil {
-               return err
+               return v.translateError(err)
        }
        return v.getFunc(path, func(rdr io.Reader) error {
                return compareReaderWithBuf(rdr, expect, loc[:32])
@@ -292,6 +290,7 @@ func (v *UnixVolume) Status() *VolumeStatus {
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 // IndexTo writes (to the given Writer) a list of blocks found on this
 // volume which begin with the specified prefix. If the prefix is an
@@ -348,6 +347,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if !strings.HasPrefix(name, prefix) {
                                continue
                        }
+                       if !blockFileRe.MatchString(name) {
+                               continue
+                       }
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
@@ -358,15 +360,18 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
        }
 }
 
-// Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+// Trash trashes the block data from the unix storage
+// If trashLifetime == 0, the block is deleted
+// Else, the block is renamed as path/{loc}.trash.{deadline},
+// where deadline = now + trashLifetime
+func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
-       // and Delete() because either (a) the file will be deleted and Touch()
+       // and Trash() because either (a) the file will be trashed and Touch()
        // will signal to the caller that the file is not present (and needs to
        // be re-written), or (b) Touch() will update the file's timestamp and
-       // Delete() will read the correct up-to-date timestamp and choose not to
-       // delete the file.
+       // Trash() will read the correct up-to-date timestamp and choose not to
+       // trash the file.
 
        if v.readonly {
                return MethodDisabledError
@@ -398,7 +403,47 @@ func (v *UnixVolume) Delete(loc string) error {
                        return nil
                }
        }
-       return os.Remove(p)
+
+       if trashLifetime == 0 {
+               return os.Remove(p)
+       }
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
+}
+
+// Untrash moves block from trash back into store
+// Look for path/{loc}.trash.{deadline} in storage,
+// and rename the first such file as path/{loc}
+func (v *UnixVolume) Untrash(loc string) (err error) {
+       if v.readonly {
+               return MethodDisabledError
+       }
+
+       files, err := ioutil.ReadDir(v.blockDir(loc))
+       if err != nil {
+               return err
+       }
+
+       if len(files) == 0 {
+               return os.ErrNotExist
+       }
+
+       foundTrash := false
+       prefix := fmt.Sprintf("%v.trash.", loc)
+       for _, f := range files {
+               if strings.HasPrefix(f.Name(), prefix) {
+                       foundTrash = true
+                       err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       if err == nil {
+                               break
+                       }
+               }
+       }
+
+       if foundTrash == false {
+               return os.ErrNotExist
+       }
+
+       return
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -479,3 +524,63 @@ func lockfile(f *os.File) error {
 func unlockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
 }
+
+// Where appropriate, translate a more specific filesystem error to an
+// error recognized by handlers, like os.ErrNotExist.
+func (v *UnixVolume) translateError(err error) error {
+       switch err.(type) {
+       case *os.PathError:
+               // stat() returns a PathError if the parent directory
+               // (not just the file itself) is missing
+               return os.ErrNotExist
+       default:
+               return err
+       }
+}
+
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+
+// EmptyTrash walks hierarchy looking for {hash}.trash.*
+// and deletes those with deadline < now.
+func (v *UnixVolume) EmptyTrash() {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+
+       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       return nil
+               }
+               if info.Mode().IsDir() {
+                       return nil
+               }
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
+               if len(matches) != 3 {
+                       return nil
+               }
+               deadline, err := strconv.ParseInt(matches[2], 10, 64)
+               if err != nil {
+                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       return nil
+               }
+               bytesInTrash += info.Size()
+               blocksInTrash++
+               if deadline > time.Now().Unix() {
+                       return nil
+               }
+               err = os.Remove(path)
+               if err != nil {
+                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return nil
+               }
+               bytesDeleted += info.Size()
+               blocksDeleted++
+               return nil
+       })
+
+       if err != nil {
+               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+       }
+
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}