X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/46b11ba2ed71e2c074e9e6c8f5b9f7a003e7067f..2efdb0e205abe63c5ca777fc4dacca65192e5dd3:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 2528f6d6a6..819d52fe0a 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -4,6 +4,8 @@ import ( "bytes" "flag" "fmt" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/httpserver" "git.curoverse.com/arvados.git/sdk/go/keepclient" "io/ioutil" "log" @@ -33,10 +35,6 @@ const BlockSize = 64 * 1024 * 1024 // in order to permit writes. const MinFreeKilobytes = BlockSize / 1024 -// Until #6221 is resolved, never_delete must be true. -// However, allow it to be false in testing with TestDataManagerToken -const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h" - // ProcMounts /proc/mounts var ProcMounts = "/proc/mounts" @@ -59,6 +57,16 @@ var dataManagerToken string // actually deleting anything. var neverDelete = true +// trashLifetime is the time duration after a block is trashed +// during which it can be recovered using an /untrash request +// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. +var trashLifetime time.Duration + +// trashCheckInterval is the time duration at which the emptyTrash goroutine +// will check and delete expired trashed blocks. Default is one day. +// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. +var trashCheckInterval time.Duration + var maxBuffers = 128 var bufs *bufferPool @@ -83,6 +91,8 @@ var ( SizeRequiredError = &KeepError{411, "Missing Content-Length"} TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} + ErrNotImplemented = &KeepError{500, "Unsupported configuration"} + ErrClientDisconnect = &KeepError{503, "Client disconnected"} ) func (e *KeepError) Error() string { @@ -138,6 +148,7 @@ func main() { blobSigningKeyFile string permissionTTLSec int pidfile string + maxRequests int ) flag.StringVar( &dataManagerTokenFile, @@ -155,12 +166,18 @@ func main() { "listen", DefaultAddr, "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.") + flag.IntVar( + &maxRequests, + "max-requests", + 0, + "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)") flag.BoolVar( &neverDelete, "never-delete", true, - "If set, nothing will be deleted. HTTP 405 will be returned "+ - "for valid DELETE requests.") + "If true, nothing will be deleted. "+ + "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+ + "You should leave this option alone unless you can afford to lose data.") flag.StringVar( &blobSigningKeyFile, "permission-key-file", @@ -181,7 +198,7 @@ func main() { &permissionTTLSec, "blob-signature-ttl", int(time.Duration(2*7*24*time.Hour).Seconds()), - "Lifetime of blob permission signatures. "+ + "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+ "See services/api/config/application.default.yml.") flag.BoolVar( &flagSerializeIO, @@ -203,6 +220,16 @@ func main() { "max-buffers", maxBuffers, fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20)) + flag.DurationVar( + &trashLifetime, + "trash-lifetime", + 0*time.Second, + "Time duration after a block is trashed during which it can be recovered using an /untrash request") + flag.DurationVar( + &trashCheckInterval, + "trash-check-interval", + 24*time.Hour, + "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.") flag.Parse() @@ -257,8 +284,9 @@ func main() { } } - if neverDelete != true && dataManagerToken != TestDataManagerToken { - log.Fatal("never_delete must be true, see #6221") + if neverDelete != true { + log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " + + "been extensively tested. You should leave this option alone unless you can afford to lose data.") } if blobSigningKeyFile != "" { @@ -283,13 +311,18 @@ func main() { } } + if maxRequests <= 0 { + maxRequests = maxBuffers * 2 + log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests) + } + // Start a round-robin VolumeManager with the volumes we have found. KeepVM = MakeRRVolumeManager(volumes) - // Tell the built-in HTTP server to direct all requests to the REST router. - loggingRouter := MakeLoggingRESTRouter() - http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) { - loggingRouter.ServeHTTP(resp, req) + // Middleware stack: logger, maxRequests limiter, method handlers + http.Handle("/", &LoggingRESTRouter{ + httpserver.NewRequestLimiter(maxRequests, + MakeRESTRouter()), }) // Set up a TCP listener. @@ -300,9 +333,8 @@ func main() { // Initialize Pull queue and worker keepClient := &keepclient.KeepClient{ - Arvados: nil, + Arvados: &arvadosclient.ArvadosClient{}, Want_replicas: 1, - Using_proxy: true, Client: &http.Client{}, } @@ -314,12 +346,17 @@ func main() { trashq = NewWorkQueue() go RunTrashWorker(trashq) + // Start emptyTrash goroutine + doneEmptyingTrash := make(chan bool) + go emptyTrash(doneEmptyingTrash, trashCheckInterval) + // Shut down the server gracefully (by closing the listener) // if SIGTERM is received. term := make(chan os.Signal, 1) go func(sig <-chan os.Signal) { s := <-sig log.Println("caught signal:", s) + doneEmptyingTrash <- true listener.Close() }(term) signal.Notify(term, syscall.SIGTERM) @@ -329,3 +366,22 @@ func main() { srv := &http.Server{Addr: listen} srv.Serve(listener) } + +// At every trashCheckInterval tick, invoke EmptyTrash on all volumes. +func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) { + ticker := time.NewTicker(trashCheckInterval) + + for { + select { + case <-ticker.C: + for _, v := range volumes { + if v.Writable() { + v.EmptyTrash() + } + } + case <-doneEmptyingTrash: + ticker.Stop() + return + } + } +}