8561: Fix test_arvados_node_match
[arvados.git] / services / keepstore / keepstore.go
index 3e360e1799117e80e773e1e5c58fa3b5560b07ef..3850e993fc511216d4967b1c757109ced844a591 100644 (file)
@@ -1,9 +1,7 @@
 package main
 
 import (
-       "bufio"
        "bytes"
-       "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -14,7 +12,6 @@ import (
        "os"
        "os/signal"
        "strings"
-       "sync"
        "syscall"
        "time"
 )
@@ -36,10 +33,6 @@ const BlockSize = 64 * 1024 * 1024
 // in order to permit writes.
 const MinFreeKilobytes = BlockSize / 1024
 
-// Until #6221 is resolved, never_delete must be true.
-// However, allow it to be false in testing with TestDataManagerToken
-const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
-
 // ProcMounts /proc/mounts
 var ProcMounts = "/proc/mounts"
 
@@ -62,6 +55,10 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
 var maxBuffers = 128
 var bufs *bufferPool
 
@@ -86,6 +83,7 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -114,95 +112,16 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+type volumeSet []Volume
+
 var (
        flagSerializeIO bool
        flagReadonly    bool
+       volumes         volumeSet
 )
 
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
-       }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs = append(*vs, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
-       })
-       return nil
-}
-
 func (vs *volumeSet) String() string {
-       s := "["
-       for i, v := range *vs {
-               if i > 0 {
-                       s = s + " "
-               }
-               s = s + v.String()
-       }
-       return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
-       added := 0
-       f, err := os.Open(ProcMounts)
-       if err != nil {
-               log.Fatalf("opening %s: %s", ProcMounts, err)
-       }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", ProcMounts, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               vs.Set(keepdir)
-               flagReadonly = flagReadonlyWas
-               added++
-       }
-       return added
+       return fmt.Sprintf("%+v", (*vs)[:])
 }
 
 // TODO(twp): continue moving as much code as possible out of main
@@ -219,7 +138,6 @@ func main() {
                listen               string
                blobSigningKeyFile   string
                permissionTTLSec     int
-               volumes              volumeSet
                pidfile              string
        )
        flag.StringVar(
@@ -242,8 +160,9 @@ func main() {
                &neverDelete,
                "never-delete",
                true,
-               "If set, nothing will be deleted. HTTP 405 will be returned "+
-                       "for valid DELETE requests.")
+               "If true, nothing will be deleted. "+
+                       "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
+                       "You should leave this option alone unless you can afford to lose data.")
        flag.StringVar(
                &blobSigningKeyFile,
                "permission-key-file",
@@ -276,14 +195,6 @@ func main() {
                "readonly",
                false,
                "Do not write, delete, or touch anything on the following volumes.")
-       flag.Var(
-               &volumes,
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &volumes,
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
@@ -294,6 +205,11 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Interval after a block is trashed during which it can be recovered using an /untrash request")
 
        flag.Parse()
 
@@ -328,7 +244,7 @@ func main() {
        }
 
        if len(volumes) == 0 {
-               if volumes.Discover() == 0 {
+               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
                        log.Fatal("No volumes found.")
                }
        }
@@ -348,8 +264,9 @@ func main() {
                }
        }
 
-       if neverDelete != true && dataManagerToken != TestDataManagerToken {
-               log.Fatal("never_delete must be true, see #6221")
+       if neverDelete != true {
+               log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
+                       "been extensively tested. You should leave this option alone unless you can afford to lose data.")
        }
 
        if blobSigningKeyFile != "" {
@@ -393,7 +310,6 @@ func main() {
        keepClient := &keepclient.KeepClient{
                Arvados:       nil,
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }