9343: Do not check env vars when setting up Keep client for pull requests.
[arvados.git] / services / keepstore / keepstore.go
index 3e360e1799117e80e773e1e5c58fa3b5560b07ef..819d52fe0adecd71670ab89d57f1967b64368b4a 100644 (file)
@@ -1,11 +1,11 @@
 package main
 
 import (
-       "bufio"
        "bytes"
-       "errors"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
@@ -14,7 +14,6 @@ import (
        "os"
        "os/signal"
        "strings"
-       "sync"
        "syscall"
        "time"
 )
@@ -36,10 +35,6 @@ const BlockSize = 64 * 1024 * 1024
 // in order to permit writes.
 const MinFreeKilobytes = BlockSize / 1024
 
-// Until #6221 is resolved, never_delete must be true.
-// However, allow it to be false in testing with TestDataManagerToken
-const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
-
 // ProcMounts /proc/mounts
 var ProcMounts = "/proc/mounts"
 
@@ -62,6 +57,16 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashLifetime time.Duration
+
+// trashCheckInterval is the time duration at which the emptyTrash goroutine
+// will check and delete expired trashed blocks. Default is one day.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashCheckInterval time.Duration
+
 var maxBuffers = 128
 var bufs *bufferPool
 
@@ -86,6 +91,8 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+       ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
@@ -114,95 +121,16 @@ var KeepVM VolumeManager
 var pullq *WorkQueue
 var trashq *WorkQueue
 
+type volumeSet []Volume
+
 var (
        flagSerializeIO bool
        flagReadonly    bool
+       volumes         volumeSet
 )
 
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
-       if dirs := strings.Split(value, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
-       }
-       if len(value) == 0 || value[0] != '/' {
-               return errors.New("Invalid volume: must begin with '/'.")
-       }
-       if _, err := os.Stat(value); err != nil {
-               return err
-       }
-       var locker sync.Locker
-       if flagSerializeIO {
-               locker = &sync.Mutex{}
-       }
-       *vs = append(*vs, &UnixVolume{
-               root:     value,
-               locker:   locker,
-               readonly: flagReadonly,
-       })
-       return nil
-}
-
 func (vs *volumeSet) String() string {
-       s := "["
-       for i, v := range *vs {
-               if i > 0 {
-                       s = s + " "
-               }
-               s = s + v.String()
-       }
-       return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
-       added := 0
-       f, err := os.Open(ProcMounts)
-       if err != nil {
-               log.Fatalf("opening %s: %s", ProcMounts, err)
-       }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", ProcMounts, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               vs.Set(keepdir)
-               flagReadonly = flagReadonlyWas
-               added++
-       }
-       return added
+       return fmt.Sprintf("%+v", (*vs)[:])
 }
 
 // TODO(twp): continue moving as much code as possible out of main
@@ -219,8 +147,8 @@ func main() {
                listen               string
                blobSigningKeyFile   string
                permissionTTLSec     int
-               volumes              volumeSet
                pidfile              string
+               maxRequests          int
        )
        flag.StringVar(
                &dataManagerTokenFile,
@@ -238,12 +166,18 @@ func main() {
                "listen",
                DefaultAddr,
                "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+       flag.IntVar(
+               &maxRequests,
+               "max-requests",
+               0,
+               "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
        flag.BoolVar(
                &neverDelete,
                "never-delete",
                true,
-               "If set, nothing will be deleted. HTTP 405 will be returned "+
-                       "for valid DELETE requests.")
+               "If true, nothing will be deleted. "+
+                       "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
+                       "You should leave this option alone unless you can afford to lose data.")
        flag.StringVar(
                &blobSigningKeyFile,
                "permission-key-file",
@@ -264,7 +198,7 @@ func main() {
                &permissionTTLSec,
                "blob-signature-ttl",
                int(time.Duration(2*7*24*time.Hour).Seconds()),
-               "Lifetime of blob permission signatures. "+
+               "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
                        "See services/api/config/application.default.yml.")
        flag.BoolVar(
                &flagSerializeIO,
@@ -276,14 +210,6 @@ func main() {
                "readonly",
                false,
                "Do not write, delete, or touch anything on the following volumes.")
-       flag.Var(
-               &volumes,
-               "volumes",
-               "Deprecated synonym for -volume.")
-       flag.Var(
-               &volumes,
-               "volume",
-               "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
        flag.StringVar(
                &pidfile,
                "pid",
@@ -294,6 +220,16 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Time duration after a block is trashed during which it can be recovered using an /untrash request")
+       flag.DurationVar(
+               &trashCheckInterval,
+               "trash-check-interval",
+               24*time.Hour,
+               "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
 
        flag.Parse()
 
@@ -328,7 +264,7 @@ func main() {
        }
 
        if len(volumes) == 0 {
-               if volumes.Discover() == 0 {
+               if (&unixVolumeAdder{&volumes}).Discover() == 0 {
                        log.Fatal("No volumes found.")
                }
        }
@@ -348,8 +284,9 @@ func main() {
                }
        }
 
-       if neverDelete != true && dataManagerToken != TestDataManagerToken {
-               log.Fatal("never_delete must be true, see #6221")
+       if neverDelete != true {
+               log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
+                       "been extensively tested. You should leave this option alone unless you can afford to lose data.")
        }
 
        if blobSigningKeyFile != "" {
@@ -374,13 +311,18 @@ func main() {
                }
        }
 
+       if maxRequests <= 0 {
+               maxRequests = maxBuffers * 2
+               log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+       }
+
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(volumes)
 
-       // Tell the built-in HTTP server to direct all requests to the REST router.
-       loggingRouter := MakeLoggingRESTRouter()
-       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-               loggingRouter.ServeHTTP(resp, req)
+       // Middleware stack: logger, maxRequests limiter, method handlers
+       http.Handle("/", &LoggingRESTRouter{
+               httpserver.NewRequestLimiter(maxRequests,
+                       MakeRESTRouter()),
        })
 
        // Set up a TCP listener.
@@ -391,9 +333,8 @@ func main() {
 
        // Initialize Pull queue and worker
        keepClient := &keepclient.KeepClient{
-               Arvados:       nil,
+               Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -405,12 +346,17 @@ func main() {
        trashq = NewWorkQueue()
        go RunTrashWorker(trashq)
 
+       // Start emptyTrash goroutine
+       doneEmptyingTrash := make(chan bool)
+       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
                s := <-sig
                log.Println("caught signal:", s)
+               doneEmptyingTrash <- true
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
@@ -420,3 +366,22 @@ func main() {
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
 }
+
+// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+       ticker := time.NewTicker(trashCheckInterval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       for _, v := range volumes {
+                               if v.Writable() {
+                                       v.EmptyTrash()
+                               }
+                       }
+               case <-doneEmptyingTrash:
+                       ticker.Stop()
+                       return
+               }
+       }
+}