X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0662b235357dd40b5d27efd06b60044ddcec06f6..d4ac44b6224001cb9dc1d86e180c2ac48ef1bce6:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 96a887fecb..40e62c5c50 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -55,6 +55,16 @@ var dataManagerToken string // actually deleting anything. var neverDelete = true +// trashLifetime is the time duration after a block is trashed +// during which it can be recovered using an /untrash request +// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. +var trashLifetime time.Duration + +// trashCheckInterval is the time duration at which the emptyTrash goroutine +// will check and delete expired trashed blocks. Default is one day. +// Use 10s or 10m or 10h to set as 10 seconds or minutes or hours respectively. +var trashCheckInterval time.Duration + var maxBuffers = 128 var bufs *bufferPool @@ -79,6 +89,7 @@ var ( SizeRequiredError = &KeepError{411, "Missing Content-Length"} TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} + ErrNotImplemented = &KeepError{500, "Unsupported configuration"} ) func (e *KeepError) Error() string { @@ -200,6 +211,16 @@ func main() { "max-buffers", maxBuffers, fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20)) + flag.DurationVar( + &trashLifetime, + "trash-lifetime", + 0*time.Second, + "Time duration after a block is trashed during which it can be recovered using an /untrash request") + flag.DurationVar( + &trashCheckInterval, + "trash-check-interval", + 24*time.Hour, + "Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day.") flag.Parse() @@ -311,12 +332,17 @@ func main() { trashq = NewWorkQueue() go RunTrashWorker(trashq) + // Start emptyTrash goroutine + doneEmptyingTrash := make(chan bool) + go emptyTrash(doneEmptyingTrash, trashCheckInterval) + // Shut down the server gracefully (by closing the listener) // if SIGTERM is received. term := make(chan os.Signal, 1) go func(sig <-chan os.Signal) { s := <-sig log.Println("caught signal:", s) + doneEmptyingTrash <- true listener.Close() }(term) signal.Notify(term, syscall.SIGTERM) @@ -326,3 +352,22 @@ func main() { srv := &http.Server{Addr: listen} srv.Serve(listener) } + +// At every trashCheckInterval tick, invoke EmptyTrash on all volumes. +func emptyTrash(doneEmptyingTrash chan bool, trashCheckInterval time.Duration) { + ticker := time.NewTicker(trashCheckInterval) + + for { + select { + case <-ticker.C: + for _, v := range volumes { + if v.Writable() { + v.EmptyTrash() + } + } + case <-doneEmptyingTrash: + ticker.Stop() + return + } + } +}