8936: update go tests to use a blob-signature-ttl different than 1s.
[arvados.git] / services / keepstore / keepstore.go
index c0b8aec7975d978ae0f980b416840d9226a87c04..b17cc79e17d5bf540894801b648f5d60d4d5bdc9 100644 (file)
@@ -1,9 +1,7 @@
 package main
 
 import (
-       "bufio"
        "bytes"
-       "errors"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -26,41 +24,51 @@ import (
 
 // Default TCP address on which to listen for requests.
 // Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+const DefaultAddr = ":25107"
 
 // A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
 
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
 // in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+const MinFreeKilobytes = BlockSize / 1024
 
-var PROC_MOUNTS = "/proc/mounts"
+// ProcMounts /proc/mounts
+var ProcMounts = "/proc/mounts"
 
-// enforce_permissions controls whether permission signatures
+// enforcePermissions controls whether permission signatures
 // should be enforced (affecting GET and DELETE requests).
 // Initialized by the -enforce-permissions flag.
-var enforce_permissions bool
+var enforcePermissions bool
 
-// blob_signature_ttl is the time duration for which new permission
+// blobSignatureTTL is the time duration for which new permission
 // signatures (returned by PUT requests) will be valid.
 // Initialized by the -permission-ttl flag.
-var blob_signature_ttl time.Duration
+var blobSignatureTTL time.Duration
 
-// data_manager_token represents the API token used by the
+// dataManagerToken represents the API token used by the
 // Data Manager, and is required on certain privileged operations.
 // Initialized by the -data-manager-token-file flag.
-var data_manager_token string
+var dataManagerToken string
 
-// never_delete can be used to prevent the DELETE handler from
+// neverDelete can be used to prevent the DELETE handler from
 // actually deleting anything.
-var never_delete = false
+var neverDelete = true
+
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashLifetime time.Duration
+
+// trashCheckInterval is the time duration at which the emptyTrash goroutine
+// will check and delete expired trashed blocks. Default is one day.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashCheckInterval time.Duration
 
 var maxBuffers = 128
 var bufs *bufferPool
 
-// ==========
-// Error types.
+// KeepError types.
 //
 type KeepError struct {
        HTTPCode int
@@ -81,6 +89,7 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -109,91 +118,16 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+type volumeSet []Volume
+
 var (
        flagSerializeIO bool
        flagReadonly    bool
+       volumes         volumeSet
 )
 
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
-       }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       *vs = append(*vs, &UnixVolume{
-               root:      value,
-               serialize: flagSerializeIO,
-               readonly:  flagReadonly,
-       })
-       return nil
-}
-
 func (vs *volumeSet) String() string {
-       s := "["
-       for i, v := range *vs {
-               if i > 0 {
-                       s = s + " "
-               }
-               s = s + v.String()
-       }
-       return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
-       added := 0
-       f, err := os.Open(PROC_MOUNTS)
-       if err != nil {
-               log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
-       }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               vs.Set(keepdir)
-               flagReadonly = flagReadonlyWas
-               added++
-       }
-       return added
+       return fmt.Sprintf("%+v", (*vs)[:])
 }
 
 // TODO(twp): continue moving as much code as possible out of main
@@ -202,59 +136,60 @@ func (vs *volumeSet) Discover() int {
 // permission arguments).
 
 func main() {
-       log.Println("Keep started: pid", os.Getpid())
+       log.Println("keepstore starting, pid", os.Getpid())
+       defer log.Println("keepstore exiting, pid", os.Getpid())
 
        var (
-               data_manager_token_file string
-               listen                  string
-               blob_signing_key_file   string
-               permission_ttl_sec      int
-               volumes                 volumeSet
-               pidfile                 string
+               dataManagerTokenFile string
+               listen               string
+               blobSigningKeyFile   string
+               permissionTTLSec     int
+               pidfile              string
        )
        flag.StringVar(
-               &data_manager_token_file,
+               &dataManagerTokenFile,
                "data-manager-token-file",
                "",
                "File with the API token used by the Data Manager. All DELETE "+
                        "requests or GET /index requests must carry this token.")
        flag.BoolVar(
-               &enforce_permissions,
+               &enforcePermissions,
                "enforce-permissions",
                false,
                "Enforce permission signatures on requests.")
        flag.StringVar(
                &listen,
                "listen",
-               DEFAULT_ADDR,
+               DefaultAddr,
                "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
        flag.BoolVar(
-               &never_delete,
+               &neverDelete,
                "never-delete",
-               false,
-               "If set, nothing will be deleted. HTTP 405 will be returned "+
-                       "for valid DELETE requests.")
+               true,
+               "If true, nothing will be deleted. "+
+                       "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
+                       "You should leave this option alone unless you can afford to lose data.")
        flag.StringVar(
-               &blob_signing_key_file,
+               &blobSigningKeyFile,
                "permission-key-file",
                "",
                "Synonym for -blob-signing-key-file.")
        flag.StringVar(
-               &blob_signing_key_file,
+               &blobSigningKeyFile,
                "blob-signing-key-file",
                "",
                "File containing the secret key for generating and verifying "+
                        "blob permission signatures.")
        flag.IntVar(
-               &permission_ttl_sec,
+               &permissionTTLSec,
                "permission-ttl",
                0,
                "Synonym for -blob-signature-ttl.")
        flag.IntVar(
-               &permission_ttl_sec,
+               &permissionTTLSec,
                "blob-signature-ttl",
                int(time.Duration(2*7*24*time.Hour).Seconds()),
-               "Lifetime of blob permission signatures. "+
+               "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
                        "See services/api/config/application.default.yml.")
        flag.BoolVar(
                &flagSerializeIO,
@@ -266,34 +201,61 @@ func main() {
                "readonly",
                false,
                "Do not write, delete, or touch anything on the following volumes.")
-       flag.Var(
-               &volumes,
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &volumes,
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
                "",
-               "Path to write pid file")
+               "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
        flag.IntVar(
                &maxBuffers,
                "max-buffers",
                maxBuffers,
-               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
+               fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Time duration after a block is trashed during which it can be recovered using an /untrash request")
+       flag.DurationVar(
+               &trashCheckInterval,
+               "trash-check-interval",
+               24*time.Hour,
+               "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
 
        flag.Parse()
 
        if maxBuffers < 0 {
                log.Fatal("-max-buffers must be greater than zero.")
        }
-       bufs = newBufferPool(maxBuffers, BLOCKSIZE)
+       bufs = newBufferPool(maxBuffers, BlockSize)
+
+       if pidfile != "" {
+               f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
+               if err != nil {
+                       log.Fatalf("open pidfile (%s): %s", pidfile, err)
+               }
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Truncate(0)
+               if err != nil {
+                       log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+               }
+               _, err = fmt.Fprint(f, os.Getpid())
+               if err != nil {
+                       log.Fatalf("write pidfile (%s): %s", pidfile, err)
+               }
+               err = f.Sync()
+               if err != nil {
+                       log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+               }
+               defer f.Close()
+               defer os.Remove(pidfile)
+       }
 
        if len(volumes) == 0 {
-               if volumes.Discover() == 0 {
+               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
                        log.Fatal("No volumes found.")
                }
        }
@@ -305,31 +267,37 @@ func main() {
        // Initialize data manager token and permission key.
        // If these tokens are specified but cannot be read,
        // raise a fatal error.
-       if data_manager_token_file != "" {
-               if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
-                       data_manager_token = strings.TrimSpace(string(buf))
+       if dataManagerTokenFile != "" {
+               if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
+                       dataManagerToken = strings.TrimSpace(string(buf))
                } else {
                        log.Fatalf("reading data manager token: %s\n", err)
                }
        }
-       if blob_signing_key_file != "" {
-               if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil {
+
+       if neverDelete != true {
+               log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
+                       "been extensively tested. You should leave this option alone unless you can afford to lose data.")
+       }
+
+       if blobSigningKeyFile != "" {
+               if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
                        PermissionSecret = bytes.TrimSpace(buf)
                } else {
                        log.Fatalf("reading permission key: %s\n", err)
                }
        }
 
-       blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
+       blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
 
        if PermissionSecret == nil {
-               if enforce_permissions {
+               if enforcePermissions {
                        log.Fatal("-enforce-permissions requires a permission key")
                } else {
                        log.Println("Running without a PermissionSecret. Block locators " +
                                "returned by this server will not be signed, and will be rejected " +
                                "by a server that enforces permissions.")
-                       log.Println("To fix this, use the -permission-key-file flag " +
+                       log.Println("To fix this, use the -blob-signing-key-file flag " +
                                "to specify the file containing the permission key.")
                }
        }
@@ -353,7 +321,6 @@ func main() {
        keepClient := &keepclient.KeepClient{
                Arvados:       nil,
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -365,33 +332,42 @@ func main() {
        trashq = NewWorkQueue()
        go RunTrashWorker(trashq)
 
+       // Start emptyTrash goroutine
+       doneEmptyingTrash := make(chan bool)
+       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
                s := <-sig
                log.Println("caught signal:", s)
+               doneEmptyingTrash <- true
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
+       signal.Notify(term, syscall.SIGINT)
 
-       if pidfile != "" {
-               f, err := os.Create(pidfile)
-               if err == nil {
-                       fmt.Fprint(f, os.Getpid())
-                       f.Close()
-               } else {
-                       log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
-               }
-       }
-
-       // Start listening for requests.
+       log.Println("listening at", listen)
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
+}
 
-       log.Println("shutting down")
-
-       if pidfile != "" {
-               os.Remove(pidfile)
+// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+       ticker := time.NewTicker(trashCheckInterval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       for _, v := range volumes {
+                               if v.Writable() {
+                                       v.EmptyTrash()
+                               }
+                       }
+               case <-doneEmptyingTrash:
+                       ticker.Stop()
+                       return
+               }
        }
 }