X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/14a27997ba2a94c5a7250ddde4519d5f68b6eda0..d53271e587b7bdbfe37b8ae8eaf890dd69c2796b:/services/keepstore/keepstore.go?ds=sidebyside diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index c0b8aec797..3e360e1799 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -14,6 +14,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time" ) @@ -26,41 +27,45 @@ import ( // Default TCP address on which to listen for requests. // Initialized by the --listen flag. -const DEFAULT_ADDR = ":25107" +const DefaultAddr = ":25107" // A Keep "block" is 64MB. -const BLOCKSIZE = 64 * 1024 * 1024 +const BlockSize = 64 * 1024 * 1024 -// A Keep volume must have at least MIN_FREE_KILOBYTES available +// A Keep volume must have at least MinFreeKilobytes available // in order to permit writes. -const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024 +const MinFreeKilobytes = BlockSize / 1024 -var PROC_MOUNTS = "/proc/mounts" +// Until #6221 is resolved, never_delete must be true. +// However, allow it to be false in testing with TestDataManagerToken +const TestDataManagerToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h" -// enforce_permissions controls whether permission signatures +// ProcMounts /proc/mounts +var ProcMounts = "/proc/mounts" + +// enforcePermissions controls whether permission signatures // should be enforced (affecting GET and DELETE requests). // Initialized by the -enforce-permissions flag. -var enforce_permissions bool +var enforcePermissions bool -// blob_signature_ttl is the time duration for which new permission +// blobSignatureTTL is the time duration for which new permission // signatures (returned by PUT requests) will be valid. // Initialized by the -permission-ttl flag. -var blob_signature_ttl time.Duration +var blobSignatureTTL time.Duration -// data_manager_token represents the API token used by the +// dataManagerToken represents the API token used by the // Data Manager, and is required on certain privileged operations. // Initialized by the -data-manager-token-file flag. -var data_manager_token string +var dataManagerToken string -// never_delete can be used to prevent the DELETE handler from +// neverDelete can be used to prevent the DELETE handler from // actually deleting anything. -var never_delete = false +var neverDelete = true var maxBuffers = 128 var bufs *bufferPool -// ========== -// Error types. +// KeepError types. // type KeepError struct { HTTPCode int @@ -132,10 +137,14 @@ func (vs *volumeSet) Set(value string) error { if _, err := os.Stat(value); err != nil { return err } + var locker sync.Locker + if flagSerializeIO { + locker = &sync.Mutex{} + } *vs = append(*vs, &UnixVolume{ - root: value, - serialize: flagSerializeIO, - readonly: flagReadonly, + root: value, + locker: locker, + readonly: flagReadonly, }) return nil } @@ -156,15 +165,15 @@ func (vs *volumeSet) String() string { // other than "/". It returns the number of volumes added. func (vs *volumeSet) Discover() int { added := 0 - f, err := os.Open(PROC_MOUNTS) + f, err := os.Open(ProcMounts) if err != nil { - log.Fatalf("opening %s: %s", PROC_MOUNTS, err) + log.Fatalf("opening %s: %s", ProcMounts, err) } scanner := bufio.NewScanner(f) for scanner.Scan() { args := strings.Fields(scanner.Text()) if err := scanner.Err(); err != nil { - log.Fatalf("reading %s: %s", PROC_MOUNTS, err) + log.Fatalf("reading %s: %s", ProcMounts, err) } dev, mount := args[0], args[1] if mount == "/" { @@ -202,56 +211,57 @@ func (vs *volumeSet) Discover() int { // permission arguments). func main() { - log.Println("Keep started: pid", os.Getpid()) + log.Println("keepstore starting, pid", os.Getpid()) + defer log.Println("keepstore exiting, pid", os.Getpid()) var ( - data_manager_token_file string - listen string - blob_signing_key_file string - permission_ttl_sec int - volumes volumeSet - pidfile string + dataManagerTokenFile string + listen string + blobSigningKeyFile string + permissionTTLSec int + volumes volumeSet + pidfile string ) flag.StringVar( - &data_manager_token_file, + &dataManagerTokenFile, "data-manager-token-file", "", "File with the API token used by the Data Manager. All DELETE "+ "requests or GET /index requests must carry this token.") flag.BoolVar( - &enforce_permissions, + &enforcePermissions, "enforce-permissions", false, "Enforce permission signatures on requests.") flag.StringVar( &listen, "listen", - DEFAULT_ADDR, + DefaultAddr, "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.") flag.BoolVar( - &never_delete, + &neverDelete, "never-delete", - false, + true, "If set, nothing will be deleted. HTTP 405 will be returned "+ "for valid DELETE requests.") flag.StringVar( - &blob_signing_key_file, + &blobSigningKeyFile, "permission-key-file", "", "Synonym for -blob-signing-key-file.") flag.StringVar( - &blob_signing_key_file, + &blobSigningKeyFile, "blob-signing-key-file", "", "File containing the secret key for generating and verifying "+ "blob permission signatures.") flag.IntVar( - &permission_ttl_sec, + &permissionTTLSec, "permission-ttl", 0, "Synonym for -blob-signature-ttl.") flag.IntVar( - &permission_ttl_sec, + &permissionTTLSec, "blob-signature-ttl", int(time.Duration(2*7*24*time.Hour).Seconds()), "Lifetime of blob permission signatures. "+ @@ -278,19 +288,44 @@ func main() { &pidfile, "pid", "", - "Path to write pid file") + "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.") flag.IntVar( &maxBuffers, "max-buffers", maxBuffers, - fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20)) + fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20)) flag.Parse() if maxBuffers < 0 { log.Fatal("-max-buffers must be greater than zero.") } - bufs = newBufferPool(maxBuffers, BLOCKSIZE) + bufs = newBufferPool(maxBuffers, BlockSize) + + if pidfile != "" { + f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777) + if err != nil { + log.Fatalf("open pidfile (%s): %s", pidfile, err) + } + err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + log.Fatalf("flock pidfile (%s): %s", pidfile, err) + } + err = f.Truncate(0) + if err != nil { + log.Fatalf("truncate pidfile (%s): %s", pidfile, err) + } + _, err = fmt.Fprint(f, os.Getpid()) + if err != nil { + log.Fatalf("write pidfile (%s): %s", pidfile, err) + } + err = f.Sync() + if err != nil { + log.Fatalf("sync pidfile (%s): %s", pidfile, err) + } + defer f.Close() + defer os.Remove(pidfile) + } if len(volumes) == 0 { if volumes.Discover() == 0 { @@ -305,31 +340,36 @@ func main() { // Initialize data manager token and permission key. // If these tokens are specified but cannot be read, // raise a fatal error. - if data_manager_token_file != "" { - if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil { - data_manager_token = strings.TrimSpace(string(buf)) + if dataManagerTokenFile != "" { + if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil { + dataManagerToken = strings.TrimSpace(string(buf)) } else { log.Fatalf("reading data manager token: %s\n", err) } } - if blob_signing_key_file != "" { - if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil { + + if neverDelete != true && dataManagerToken != TestDataManagerToken { + log.Fatal("never_delete must be true, see #6221") + } + + if blobSigningKeyFile != "" { + if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil { PermissionSecret = bytes.TrimSpace(buf) } else { log.Fatalf("reading permission key: %s\n", err) } } - blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second + blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second if PermissionSecret == nil { - if enforce_permissions { + if enforcePermissions { log.Fatal("-enforce-permissions requires a permission key") } else { log.Println("Running without a PermissionSecret. Block locators " + "returned by this server will not be signed, and will be rejected " + "by a server that enforces permissions.") - log.Println("To fix this, use the -permission-key-file flag " + + log.Println("To fix this, use the -blob-signing-key-file flag " + "to specify the file containing the permission key.") } } @@ -374,24 +414,9 @@ func main() { listener.Close() }(term) signal.Notify(term, syscall.SIGTERM) + signal.Notify(term, syscall.SIGINT) - if pidfile != "" { - f, err := os.Create(pidfile) - if err == nil { - fmt.Fprint(f, os.Getpid()) - f.Close() - } else { - log.Printf("Error writing pid file (%s): %s", pidfile, err.Error()) - } - } - - // Start listening for requests. + log.Println("listening at", listen) srv := &http.Server{Addr: listen} srv.Serve(listener) - - log.Println("shutting down") - - if pidfile != "" { - os.Remove(pidfile) - } }