Merge branch '9857-cwl-acceptlist-re' refs #9857
[arvados.git] / services / keepstore / volume_unix.go
index 1be622c23f141d48f5d896d484fb8d62b95a2655..5982fb0484eae0a37ef09e24b9526492c5b0459f 100644 (file)
@@ -23,10 +23,6 @@ type unixVolumeAdder struct {
 }
 
 func (vs *unixVolumeAdder) Set(value string) error {
-       if trashLifetime <= 0 {
-               log.Print("Missing required configuration parameter: trash-lifetime")
-               //return ErrNotImplemented
-       }
        if dirs := strings.Split(value, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
@@ -142,9 +138,8 @@ func (v *UnixVolume) Touch(loc string) error {
                return e
        }
        defer unlockfile(f)
-       now := time.Now().Unix()
-       utime := syscall.Utimbuf{now, now}
-       return syscall.Utime(p, &utime)
+       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
 }
 
 // Mtime returns the stored timestamp for the given locator.
@@ -185,26 +180,24 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
        return stat, err
 }
 
-// Get retrieves a block identified by the locator string "loc", and
-// returns its contents as a byte slice.
-//
-// Get returns a nil buffer IFF it returns a non-nil error.
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
+// Get retrieves a block, copies it to the given slice, and returns
+// the number of bytes copied.
+func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return nil, v.translateError(err)
+               return 0, v.translateError(err)
        }
-       buf := bufs.Get(int(stat.Size()))
+       if stat.Size() > int64(len(buf)) {
+               return 0, TooLongError
+       }
+       var read int
+       size := int(stat.Size())
        err = v.getFunc(path, func(rdr io.Reader) error {
-               _, err = io.ReadFull(rdr, buf)
+               read, err = io.ReadFull(rdr, buf[:size])
                return err
        })
-       if err != nil {
-               bufs.Put(buf)
-               return nil, err
-       }
-       return buf, nil
+       return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
@@ -313,7 +306,7 @@ var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
-       var lastErr error = nil
+       var lastErr error
        rootdir, err := os.Open(v.root)
        if err != nil {
                return err
@@ -359,22 +352,25 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().Unix(),
+                               " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                }
                blockdir.Close()
        }
 }
 
-// Delete deletes the block data from the unix storage
+// Trash trashes the block data from the unix storage
+// If trashLifetime == 0, the block is deleted
+// Else, the block is renamed as path/{loc}.trash.{deadline},
+// where deadline = now + trashLifetime
 func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
-       // and Delete() because either (a) the file will be deleted and Touch()
+       // and Trash() because either (a) the file will be trashed and Touch()
        // will signal to the caller that the file is not present (and needs to
        // be re-written), or (b) Touch() will update the file's timestamp and
-       // Delete() will read the correct up-to-date timestamp and choose not to
-       // delete the file.
+       // Trash() will read the correct up-to-date timestamp and choose not to
+       // trash the file.
 
        if v.readonly {
                return MethodDisabledError
@@ -401,18 +397,50 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
-       } else {
-               if time.Since(fi.ModTime()) < blobSignatureTTL {
-                       return nil
-               }
+       } else if time.Since(fi.ModTime()) < blobSignatureTTL {
+               return nil
+       }
+
+       if trashLifetime == 0 {
+               return os.Remove(p)
        }
-       return os.Remove(p)
+       return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
 }
 
 // Untrash moves block from trash back into store
-// TBD
-func (v *UnixVolume) Untrash(loc string) error {
-       return nil
+// Look for path/{loc}.trash.{deadline} in storage,
+// and rename the first such file as path/{loc}
+func (v *UnixVolume) Untrash(loc string) (err error) {
+       if v.readonly {
+               return MethodDisabledError
+       }
+
+       files, err := ioutil.ReadDir(v.blockDir(loc))
+       if err != nil {
+               return err
+       }
+
+       if len(files) == 0 {
+               return os.ErrNotExist
+       }
+
+       foundTrash := false
+       prefix := fmt.Sprintf("%v.trash.", loc)
+       for _, f := range files {
+               if strings.HasPrefix(f.Name(), prefix) {
+                       foundTrash = true
+                       err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+                       if err == nil {
+                               break
+                       }
+               }
+       }
+
+       if foundTrash == false {
+               return os.ErrNotExist
+       }
+
+       return
 }
 
 // blockDir returns the fully qualified directory name for the directory
@@ -476,11 +504,14 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
-// Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
+// Writable returns false if all future Put, Mtime, and Delete calls
+// are expected to fail.
 func (v *UnixVolume) Writable() bool {
        return !v.readonly
 }
 
+// Replication returns the number of replicas promised by the
+// underlying device (currently assumed to be 1).
 func (v *UnixVolume) Replication() int {
        return 1
 }
@@ -506,3 +537,50 @@ func (v *UnixVolume) translateError(err error) error {
                return err
        }
 }
+
+var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
+
+// EmptyTrash walks hierarchy looking for {hash}.trash.*
+// and deletes those with deadline < now.
+func (v *UnixVolume) EmptyTrash() {
+       var bytesDeleted, bytesInTrash int64
+       var blocksDeleted, blocksInTrash int
+
+       err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       return nil
+               }
+               if info.Mode().IsDir() {
+                       return nil
+               }
+               matches := unixTrashLocRegexp.FindStringSubmatch(path)
+               if len(matches) != 3 {
+                       return nil
+               }
+               deadline, err := strconv.ParseInt(matches[2], 10, 64)
+               if err != nil {
+                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       return nil
+               }
+               bytesInTrash += info.Size()
+               blocksInTrash++
+               if deadline > time.Now().Unix() {
+                       return nil
+               }
+               err = os.Remove(path)
+               if err != nil {
+                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return nil
+               }
+               bytesDeleted += info.Size()
+               blocksDeleted++
+               return nil
+       })
+
+       if err != nil {
+               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+       }
+
+       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+}