"bytes"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
"log"
// actually deleting anything.
var neverDelete = true
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashLifetime time.Duration
+
+// trashCheckInterval is the interval at which the emptyTrash goroutine
+// checks for and deletes expired trashed blocks. Default is one day.
+// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively.
+var trashCheckInterval time.Duration
+
var maxBuffers = 128
var bufs *bufferPool
SizeRequiredError = &KeepError{411, "Missing Content-Length"}
TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
+ ErrNotImplemented = &KeepError{500, "Unsupported configuration"}
+ ErrClientDisconnect = &KeepError{503, "Client disconnected"}
)
func (e *KeepError) Error() string {
blobSigningKeyFile string
permissionTTLSec int
pidfile string
+ maxRequests int
)
flag.StringVar(
&dataManagerTokenFile,
"listen",
DefaultAddr,
"Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
+ flag.IntVar(
+ &maxRequests,
+ "max-requests",
+ 0,
+ "Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)")
flag.BoolVar(
&neverDelete,
"never-delete",
&permissionTTLSec,
"blob-signature-ttl",
int(time.Duration(2*7*24*time.Hour).Seconds()),
- "Lifetime of blob permission signatures. "+
+ "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
"See services/api/config/application.default.yml.")
flag.BoolVar(
&flagSerializeIO,
"max-buffers",
maxBuffers,
fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+ flag.DurationVar(
+ &trashLifetime,
+ "trash-lifetime",
+ 0*time.Second,
+ "Time duration after a block is trashed during which it can be recovered using an /untrash request")
+ flag.DurationVar(
+ &trashCheckInterval,
+ "trash-check-interval",
+ 24*time.Hour,
+ "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.")
flag.Parse()
}
}
+ if maxRequests <= 0 {
+ maxRequests = maxBuffers * 2
+ log.Printf("-max-requests <1 or not specified; defaulting to maxBuffers * 2 == %d", maxRequests)
+ }
+
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(volumes)
- // Tell the built-in HTTP server to direct all requests to the REST router.
- loggingRouter := MakeLoggingRESTRouter()
- http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
- loggingRouter.ServeHTTP(resp, req)
+ // Middleware stack: logger, maxRequests limiter, method handlers
+ http.Handle("/", &LoggingRESTRouter{
+ httpserver.NewRequestLimiter(maxRequests,
+ MakeRESTRouter()),
})
// Set up a TCP listener.
trashq = NewWorkQueue()
go RunTrashWorker(trashq)
+ // Start emptyTrash goroutine
+ doneEmptyingTrash := make(chan bool)
+ go emptyTrash(doneEmptyingTrash, trashCheckInterval)
+
// Shut down the server gracefully (by closing the listener)
// if SIGTERM is received.
term := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
s := <-sig
log.Println("caught signal:", s)
+ doneEmptyingTrash <- true
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
srv := &http.Server{Addr: listen}
srv.Serve(listener)
}
+
+// At every trashCheckInterval tick, invoke EmptyTrash on all volumes.
+func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) {
+ ticker := time.NewTicker(trashCheckInterval)
+
+ for {
+ select {
+ case <-ticker.C:
+ for _, v := range volumes {
+ if v.Writable() {
+ v.EmptyTrash()
+ }
+ }
+ case <-doneEmptyingTrash:
+ ticker.Stop()
+ return
+ }
+ }
+}