X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a9b4a906124081439326fedc1a1e73fae9c2f40b..e2197875b3fa58b235268a86170fec582c1a7f59:/services/keepstore/keepstore.go diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 83974ffc2c..96a887fecb 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -24,38 +24,41 @@ import ( // Default TCP address on which to listen for requests. // Initialized by the --listen flag. -const DEFAULT_ADDR = ":25107" +const DefaultAddr = ":25107" // A Keep "block" is 64MB. -const BLOCKSIZE = 64 * 1024 * 1024 +const BlockSize = 64 * 1024 * 1024 -// A Keep volume must have at least MIN_FREE_KILOBYTES available +// A Keep volume must have at least MinFreeKilobytes available // in order to permit writes. -const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 +const MinFreeKilobytes = BlockSize / 1024 -var PROC_MOUNTS = "/proc/mounts" +// ProcMounts /proc/mounts +var ProcMounts = "/proc/mounts" -// enforce_permissions controls whether permission signatures +// enforcePermissions controls whether permission signatures // should be enforced (affecting GET and DELETE requests). -// Initialized by the --enforce-permissions flag. -var enforce_permissions bool +// Initialized by the -enforce-permissions flag. +var enforcePermissions bool -// permission_ttl is the time duration for which new permission +// blobSignatureTTL is the time duration for which new permission // signatures (returned by PUT requests) will be valid. -// Initialized by the --permission-ttl flag. -var permission_ttl time.Duration +// Initialized by the -permission-ttl flag. +var blobSignatureTTL time.Duration -// data_manager_token represents the API token used by the +// dataManagerToken represents the API token used by the // Data Manager, and is required on certain privileged operations. -// Initialized by the --data-manager-token-file flag. -var data_manager_token string +// Initialized by the -data-manager-token-file flag. +var dataManagerToken string -// never_delete can be used to prevent the DELETE handler from +// neverDelete can be used to prevent the DELETE handler from // actually deleting anything. -var never_delete = false +var neverDelete = true -// ========== -// Error types. +var maxBuffers = 128 +var bufs *bufferPool + +// KeepError types. // type KeepError struct { HTTPCode int @@ -73,7 +76,8 @@ var ( NotFoundError = &KeepError{404, "Not Found"} GenericError = &KeepError{500, "Fail"} FullError = &KeepError{503, "Full"} - TooLongError = &KeepError{504, "Timeout"} + SizeRequiredError = &KeepError{411, "Missing Content-Length"} + TooLongError = &KeepError{413, "Block is too large"} MethodDisabledError = &KeepError{405, "Method disabled"} ) @@ -103,165 +107,182 @@ var KeepVM VolumeManager var pullq *WorkQueue var trashq *WorkQueue +type volumeSet []Volume + +var ( + flagSerializeIO bool + flagReadonly bool + volumes volumeSet +) + +func (vs *volumeSet) String() string { + return fmt.Sprintf("%+v", (*vs)[:]) +} + // TODO(twp): continue moving as much code as possible out of main // so it can be effectively tested. Esp. handling and postprocessing // of command line flags (identifying Keep volumes and initializing // permission arguments). func main() { - log.Println("Keep started: pid", os.Getpid()) - - // Parse command-line flags: - // - // -listen=ipaddr:port - // Interface on which to listen for requests. Use :port without - // an ipaddr to listen on all network interfaces. - // Examples: - // -listen=127.0.0.1:4949 - // -listen=10.0.1.24:8000 - // -listen=:25107 (to listen to port 25107 on all interfaces) - // - // -volumes - // A comma-separated list of directories to use as Keep volumes. - // Example: - // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir - // - // If -volumes is empty or is not present, Keep will select volumes - // by looking at currently mounted filesystems for /keep top-level - // directories. + log.Println("keepstore starting, pid", os.Getpid()) + defer log.Println("keepstore exiting, pid", os.Getpid()) var ( - data_manager_token_file string - listen string - permission_key_file string - permission_ttl_sec int - serialize_io bool - volumearg string - pidfile string + dataManagerTokenFile string + listen string + blobSigningKeyFile string + permissionTTLSec int + pidfile string ) flag.StringVar( - &data_manager_token_file, + &dataManagerTokenFile, "data-manager-token-file", "", "File with the API token used by the Data Manager. All DELETE "+ "requests or GET /index requests must carry this token.") flag.BoolVar( - &enforce_permissions, + &enforcePermissions, "enforce-permissions", false, "Enforce permission signatures on requests.") flag.StringVar( &listen, "listen", - DEFAULT_ADDR, - "Interface on which to listen for requests, in the format "+ - "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+ - "to listen on all network interfaces.") + DefaultAddr, + "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.") flag.BoolVar( - &never_delete, + &neverDelete, "never-delete", - false, - "If set, nothing will be deleted. HTTP 405 will be returned "+ - "for valid DELETE requests.") + true, + "If true, nothing will be deleted. "+ + "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+ + "You should leave this option alone unless you can afford to lose data.") flag.StringVar( - &permission_key_file, + &blobSigningKeyFile, "permission-key-file", "", + "Synonym for -blob-signing-key-file.") + flag.StringVar( + &blobSigningKeyFile, + "blob-signing-key-file", + "", "File containing the secret key for generating and verifying "+ - "permission signatures.") + "blob permission signatures.") flag.IntVar( - &permission_ttl_sec, + &permissionTTLSec, "permission-ttl", - 1209600, - "Expiration time (in seconds) for newly generated permission "+ - "signatures.") + 0, + "Synonym for -blob-signature-ttl.") + flag.IntVar( + &permissionTTLSec, + "blob-signature-ttl", + int(time.Duration(2*7*24*time.Hour).Seconds()), + "Lifetime of blob permission signatures. "+ + "See services/api/config/application.default.yml.") flag.BoolVar( - &serialize_io, + &flagSerializeIO, "serialize", false, - "If set, all read and write operations on local Keep volumes will "+ - "be serialized.") - flag.StringVar( - &volumearg, - "volumes", - "", - "Comma-separated list of directories to use for Keep volumes, "+ - "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+ - "supplied, Keep will scan mounted filesystems for volumes "+ - "with a /keep top-level directory.") - + "Serialize read and write operations on the following volumes.") + flag.BoolVar( + &flagReadonly, + "readonly", + false, + "Do not write, delete, or touch anything on the following volumes.") flag.StringVar( &pidfile, "pid", "", - "Path to write pid file") + "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.") + flag.IntVar( + &maxBuffers, + "max-buffers", + maxBuffers, + fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20)) flag.Parse() - // Look for local keep volumes. - var keepvols []string - if volumearg == "" { - // TODO(twp): decide whether this is desirable default behavior. - // In production we may want to require the admin to specify - // Keep volumes explicitly. - keepvols = FindKeepVolumes() - } else { - keepvols = strings.Split(volumearg, ",") + if maxBuffers < 0 { + log.Fatal("-max-buffers must be greater than zero.") } + bufs = newBufferPool(maxBuffers, BlockSize) - // Check that the specified volumes actually exist. - var goodvols []Volume = nil - for _, v := range keepvols { - if _, err := os.Stat(v); err == nil { - log.Println("adding Keep volume:", v) - newvol := MakeUnixVolume(v, serialize_io) - goodvols = append(goodvols, &newvol) - } else { - log.Printf("bad Keep volume: %s\n", err) + if pidfile != "" { + f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) + if err != nil { + log.Fatalf("open pidfile (%s): %s", pidfile, err) + } + err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + log.Fatalf("flock pidfile (%s): %s", pidfile, err) + } + err = f.Truncate(0) + if err != nil { + log.Fatalf("truncate pidfile (%s): %s", pidfile, err) + } + _, err = fmt.Fprint(f, os.Getpid()) + if err != nil { + log.Fatalf("write pidfile (%s): %s", pidfile, err) + } + err = f.Sync() + if err != nil { + log.Fatalf("sync pidfile (%s): %s", pidfile, err) + } + defer f.Close() + defer os.Remove(pidfile) + } + + if len(volumes) == 0 { + if (&unixVolumeAdder{&volumes}).Discover() == 0 { + log.Fatal("No volumes found.") } } - if len(goodvols) == 0 { - log.Fatal("could not find any keep volumes") + for _, v := range volumes { + log.Printf("Using volume %v (writable=%v)", v, v.Writable()) } // Initialize data manager token and permission key. // If these tokens are specified but cannot be read, // raise a fatal error. - if data_manager_token_file != "" { - if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil { - data_manager_token = strings.TrimSpace(string(buf)) + if dataManagerTokenFile != "" { + if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil { + dataManagerToken = strings.TrimSpace(string(buf)) } else { log.Fatalf("reading data manager token: %s\n", err) } } - if permission_key_file != "" { - if buf, err := ioutil.ReadFile(permission_key_file); err == nil { + + if neverDelete != true { + log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " + + "been extensively tested. You should leave this option alone unless you can afford to lose data.") + } + + if blobSigningKeyFile != "" { + if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil { PermissionSecret = bytes.TrimSpace(buf) } else { log.Fatalf("reading permission key: %s\n", err) } } - // Initialize permission TTL - permission_ttl = time.Duration(permission_ttl_sec) * time.Second + blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second - // If --enforce-permissions is true, we must have a permission key - // to continue. if PermissionSecret == nil { - if enforce_permissions { - log.Fatal("--enforce-permissions requires a permission key") + if enforcePermissions { + log.Fatal("-enforce-permissions requires a permission key") } else { log.Println("Running without a PermissionSecret. Block locators " + "returned by this server will not be signed, and will be rejected " + "by a server that enforces permissions.") - log.Println("To fix this, run Keep with --permission-key-file= " + - "to define the location of a file containing the permission key.") + log.Println("To fix this, use the -blob-signing-key-file flag " + + "to specify the file containing the permission key.") } } // Start a round-robin VolumeManager with the volumes we have found. - KeepVM = MakeRRVolumeManager(goodvols) + KeepVM = MakeRRVolumeManager(volumes) // Tell the built-in HTTP server to direct all requests to the REST router. loggingRouter := MakeLoggingRESTRouter() @@ -276,16 +297,20 @@ func main() { } // Initialize Pull queue and worker - keepClient := keepclient.KeepClient{ + keepClient := &keepclient.KeepClient{ Arvados: nil, Want_replicas: 1, - Using_proxy: true, Client: &http.Client{}, } + // Initialize the pullq and worker pullq = NewWorkQueue() go RunPullWorker(pullq, keepClient) + // Initialize the trashq and worker + trashq = NewWorkQueue() + go RunTrashWorker(trashq) + // Shut down the server gracefully (by closing the listener) // if SIGTERM is received. term := make(chan os.Signal, 1) @@ -295,24 +320,9 @@ func main() { listener.Close() }(term) signal.Notify(term, syscall.SIGTERM) + signal.Notify(term, syscall.SIGINT) - if pidfile != "" { - f, err := os.Create(pidfile) - if err == nil { - fmt.Fprint(f, os.Getpid()) - f.Close() - } else { - log.Printf("Error writing pid file (%s): %s", pidfile, err.Error()) - } - } - - // Start listening for requests. + log.Println("listening at", listen) srv := &http.Server{Addr: listen} srv.Serve(listener) - - log.Println("shutting down") - - if pidfile != "" { - os.Remove(pidfile) - } }