9343: Do not check env vars when setting up Keep client for pull requests.
[arvados.git] / services / keepstore / keepstore.go
index 7525441aaec995d4a524a7ff20f954ee06ae50c9..819d52fe0adecd71670ab89d57f1967b64368b4a 100644 (file)
@@ -4,6 +4,8 @@ import (
        "bytes"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io/ioutil"
        "log"
@@ -55,6 +57,16 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashLifetime time.Duration
+
+// trashCheckInterval is the time duration at which the emptyTrash goroutine
+// will check and delete expired trashed blocks. Default is one day.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashCheckInterval time.Duration
+
 var maxBuffers = 128
 var bufs *bufferPool
 
@@ -79,6 +91,8 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
+       ErrClientDisconnect = &KeepError{503, "Client disconnected"}
 )
 
 func (e *KeepError) Error() string {
@@ -134,6 +148,7 @@ func main() {
                blobSigningKeyFile   string
                permissionTTLSec     int
                pidfile              string
+               maxRequests          int
        )
        flag.StringVar(
                &dataManagerTokenFile,
@@ -151,6 +166,11 @@ func main() {
                "listen",
                DefaultAddr,
                "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+       flag.IntVar(
+               &maxRequests,
+               "max-requests",
+               0,
+               "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
        flag.BoolVar(
                &neverDelete,
                "never-delete",
@@ -178,7 +198,7 @@ func main() {
                &permissionTTLSec,
                "blob-signature-ttl",
                int(time.Duration(2*7*24*time.Hour).Seconds()),
-               "Lifetime of blob permission signatures. "+
+               "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
                        "See services/api/config/application.default.yml.")
        flag.BoolVar(
                &flagSerializeIO,
@@ -200,6 +220,16 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Time duration after a block is trashed during which it can be recovered using an /untrash request")
+       flag.DurationVar(
+               &trashCheckInterval,
+               "trash-check-interval",
+               24*time.Hour,
+               "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
 
        flag.Parse()
 
@@ -281,13 +311,18 @@ func main() {
                }
        }
 
+       if maxRequests <= 0 {
+               maxRequests = maxBuffers * 2
+               log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+       }
+
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(volumes)
 
-       // Tell the built-in HTTP server to direct all requests to the REST router.
-       loggingRouter := MakeLoggingRESTRouter()
-       http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
-               loggingRouter.ServeHTTP(resp, req)
+       // Middleware stack: logger, maxRequests limiter, method handlers
+       http.Handle("/", &LoggingRESTRouter{
+               httpserver.NewRequestLimiter(maxRequests,
+                       MakeRESTRouter()),
        })
 
        // Set up a TCP listener.
@@ -298,9 +333,8 @@ func main() {
 
        // Initialize Pull queue and worker
        keepClient := &keepclient.KeepClient{
-               Arvados:       nil,
+               Arvados:       &arvadosclient.ArvadosClient{},
                Want_replicas: 1,
-               Using_proxy:   true,
                Client:        &http.Client{},
        }
 
@@ -312,12 +346,17 @@ func main() {
        trashq = NewWorkQueue()
        go RunTrashWorker(trashq)
 
+       // Start emptyTrash goroutine
+       doneEmptyingTrash := make(chan bool)
+       go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+
        // Shut down the server gracefully (by closing the listener)
        // if SIGTERM is received.
        term := make(chan os.Signal, 1)
        go func(sig <-chan os.Signal) {
                s := <-sig
                log.Println("caught signal:", s)
+               doneEmptyingTrash <- true
                listener.Close()
        }(term)
        signal.Notify(term, syscall.SIGTERM)
@@ -327,3 +366,22 @@ func main() {
        srv := &http.Server{Addr: listen}
        srv.Serve(listener)
 }
+
+// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+       ticker := time.NewTicker(trashCheckInterval)
+
+       for {
+               select {
+               case <-ticker.C:
+                       for _, v := range volumes {
+                               if v.Writable() {
+                                       v.EmptyTrash()
+                               }
+                       }
+               case <-doneEmptyingTrash:
+                       ticker.Stop()
+                       return
+               }
+       }
+}