X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fcd19228447a2983c7aba8a0f09984eba06ded48..9a4fcabed1adeff0044d419977d5136c5cb1db3e:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 83974ffc2c..f2973b586a 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -1,61 +1,23 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( - "bytes" - "flag" - "fmt" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "io/ioutil" - "log" - "net" - "net/http" - "os" - "os/signal" - "strings" - "syscall" "time" ) -// ====================== -// Configuration settings -// -// TODO(twp): make all of these configurable via command line flags -// and/or configuration file settings. - -// Default TCP address on which to listen for requests. -// Initialized by the --listen flag. -const DEFAULT_ADDR = ":25107" - // A Keep "block" is 64MB. -const BLOCKSIZE = 64 * 1024 * 1024 +const BlockSize = 64 * 1024 * 1024 -// A Keep volume must have at least MIN_FREE_KILOBYTES available +// A Keep volume must have at least MinFreeKilobytes available // in order to permit writes. -const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 - -var PROC_MOUNTS = "/proc/mounts" - -// enforce_permissions controls whether permission signatures -// should be enforced (affecting GET and DELETE requests). -// Initialized by the --enforce-permissions flag. -var enforce_permissions bool +const MinFreeKilobytes = BlockSize / 1024 -// permission_ttl is the time duration for which new permission -// signatures (returned by PUT requests) will be valid. -// Initialized by the --permission-ttl flag. -var permission_ttl time.Duration +var bufs *bufferPool -// data_manager_token represents the API token used by the -// Data Manager, and is required on certain privileged operations. -// Initialized by the --data-manager-token-file flag. -var data_manager_token string - -// never_delete can be used to prevent the DELETE handler from -// actually deleting anything. -var never_delete = false - -// ========== -// Error types. +// KeepError types. // type KeepError struct { HTTPCode int @@ -71,248 +33,25 @@ var ( DiskHashError = &KeepError{500, "Hash mismatch in stored data"} ExpiredError = &KeepError{401, "Expired permission signature"} NotFoundError = &KeepError{404, "Not Found"} + VolumeBusyError = &KeepError{503, "Volume backend busy"} GenericError = &KeepError{500, "Fail"} FullError = &KeepError{503, "Full"} - TooLongError = &KeepError{504, "Timeout"} + SizeRequiredError = &KeepError{411, "Missing Content-Length"} + TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} + ErrNotImplemented = &KeepError{500, "Unsupported configuration"} + ErrClientDisconnect = &KeepError{503, "Client disconnected"} ) func (e *KeepError) Error() string { return e.ErrMsg } -// ======================== -// Internal data structures -// -// These global variables are used by multiple parts of the -// program. They are good candidates for moving into their own -// packages. - -// The Keep VolumeManager maintains a list of available volumes. -// Initialized by the --volumes flag (or by FindKeepVolumes). -var KeepVM VolumeManager - -// The pull list manager and trash queue are threadsafe queues which -// support atomic update operations. The PullHandler and TrashHandler -// store results from Data Manager /pull and /trash requests here. -// -// See the Keep and Data Manager design documents for more details: -// https://arvados.org/projects/arvados/wiki/Keep_Design_Doc -// https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc -// -var pullq *WorkQueue -var trashq *WorkQueue - -// TODO(twp): continue moving as much code as possible out of main -// so it can be effectively tested. Esp. handling and postprocessing -// of command line flags (identifying Keep volumes and initializing -// permission arguments). - -func main() { - log.Println("Keep started: pid", os.Getpid()) - - // Parse command-line flags: - // - // -listen=ipaddr:port - // Interface on which to listen for requests. Use :port without - // an ipaddr to listen on all network interfaces. - // Examples: - // -listen=127.0.0.1:4949 - // -listen=10.0.1.24:8000 - // -listen=:25107 (to listen to port 25107 on all interfaces) - // - // -volumes - // A comma-separated list of directories to use as Keep volumes. - // Example: - // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir - // - // If -volumes is empty or is not present, Keep will select volumes - // by looking at currently mounted filesystems for /keep top-level - // directories. - - var ( - data_manager_token_file string - listen string - permission_key_file string - permission_ttl_sec int - serialize_io bool - volumearg string - pidfile string - ) - flag.StringVar( - &data_manager_token_file, - "data-manager-token-file", - "", - "File with the API token used by the Data Manager. All DELETE "+ - "requests or GET /index requests must carry this token.") - flag.BoolVar( - &enforce_permissions, - "enforce-permissions", - false, - "Enforce permission signatures on requests.") - flag.StringVar( - &listen, - "listen", - DEFAULT_ADDR, - "Interface on which to listen for requests, in the format "+ - "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+ - "to listen on all network interfaces.") - flag.BoolVar( - &never_delete, - "never-delete", - false, - "If set, nothing will be deleted. HTTP 405 will be returned "+ - "for valid DELETE requests.") - flag.StringVar( - &permission_key_file, - "permission-key-file", - "", - "File containing the secret key for generating and verifying "+ - "permission signatures.") - flag.IntVar( - &permission_ttl_sec, - "permission-ttl", - 1209600, - "Expiration time (in seconds) for newly generated permission "+ - "signatures.") - flag.BoolVar( - &serialize_io, - "serialize", - false, - "If set, all read and write operations on local Keep volumes will "+ - "be serialized.") - flag.StringVar( - &volumearg, - "volumes", - "", - "Comma-separated list of directories to use for Keep volumes, "+ - "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+ - "supplied, Keep will scan mounted filesystems for volumes "+ - "with a /keep top-level directory.") - - flag.StringVar( - &pidfile, - "pid", - "", - "Path to write pid file") - - flag.Parse() - - // Look for local keep volumes. - var keepvols []string - if volumearg == "" { - // TODO(twp): decide whether this is desirable default behavior. - // In production we may want to require the admin to specify - // Keep volumes explicitly. - keepvols = FindKeepVolumes() - } else { - keepvols = strings.Split(volumearg, ",") - } - - // Check that the specified volumes actually exist. - var goodvols []Volume = nil - for _, v := range keepvols { - if _, err := os.Stat(v); err == nil { - log.Println("adding Keep volume:", v) - newvol := MakeUnixVolume(v, serialize_io) - goodvols = append(goodvols, &newvol) - } else { - log.Printf("bad Keep volume: %s\n", err) +// Periodically (once per interval) invoke EmptyTrash on all volumes. +func emptyTrash(mounts []*VolumeMount, interval time.Duration) { + for range time.NewTicker(interval).C { + for _, v := range mounts { + v.EmptyTrash() } } - - if len(goodvols) == 0 { - log.Fatal("could not find any keep volumes") - } - - // Initialize data manager token and permission key. - // If these tokens are specified but cannot be read, - // raise a fatal error. - if data_manager_token_file != "" { - if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil { - data_manager_token = strings.TrimSpace(string(buf)) - } else { - log.Fatalf("reading data manager token: %s\n", err) - } - } - if permission_key_file != "" { - if buf, err := ioutil.ReadFile(permission_key_file); err == nil { - PermissionSecret = bytes.TrimSpace(buf) - } else { - log.Fatalf("reading permission key: %s\n", err) - } - } - - // Initialize permission TTL - permission_ttl = time.Duration(permission_ttl_sec) * time.Second - - // If --enforce-permissions is true, we must have a permission key - // to continue. - if PermissionSecret == nil { - if enforce_permissions { - log.Fatal("--enforce-permissions requires a permission key") - } else { - log.Println("Running without a PermissionSecret. Block locators " + - "returned by this server will not be signed, and will be rejected " + - "by a server that enforces permissions.") - log.Println("To fix this, run Keep with --permission-key-file= " + - "to define the location of a file containing the permission key.") - } - } - - // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(goodvols) - - // Tell the built-in HTTP server to direct all requests to the REST router. - loggingRouter := MakeLoggingRESTRouter() - http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) { - loggingRouter.ServeHTTP(resp, req) - }) - - // Set up a TCP listener. - listener, err := net.Listen("tcp", listen) - if err != nil { - log.Fatal(err) - } - - // Initialize Pull queue and worker - keepClient := keepclient.KeepClient{ - Arvados: nil, - Want_replicas: 1, - Using_proxy: true, - Client: &http.Client{}, - } - - pullq = NewWorkQueue() - go RunPullWorker(pullq, keepClient) - - // Shut down the server gracefully (by closing the listener) - // if SIGTERM is received. - term := make(chan os.Signal, 1) - go func(sig <-chan os.Signal) { - s := <-sig - log.Println("caught signal:", s) - listener.Close() - }(term) - signal.Notify(term, syscall.SIGTERM) - - if pidfile != "" { - f, err := os.Create(pidfile) - if err == nil { - fmt.Fprint(f, os.Getpid()) - f.Close() - } else { - log.Printf("Error writing pid file (%s): %s", pidfile, err.Error()) - } - } - - // Start listening for requests. - srv := &http.Server{Addr: listen} - srv.Serve(listener) - - log.Println("shutting down") - - if pidfile != "" { - os.Remove(pidfile) - } }