// Default TCP address on which to listen for requests.
// Initialized by the --listen flag.
-const DEFAULT_ADDR = ":25107"
+const DefaultAddr = ":25107"
// A Keep "block" is 64MB.
-const BLOCKSIZE = 64 * 1024 * 1024
+const BlockSize = 64 * 1024 * 1024
-// A Keep volume must have at least MIN_FREE_KILOBYTES available
+// A Keep volume must have at least MinFreeKilobytes available
// in order to permit writes.
-const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
+const MinFreeKilobytes = BlockSize / 1024
-var PROC_MOUNTS = "/proc/mounts"
+// ProcMounts is the path of the file that lists mounted filesystems.
+var ProcMounts = "/proc/mounts"
-// enforce_permissions controls whether permission signatures
+// enforcePermissions controls whether permission signatures
// should be enforced (affecting GET and DELETE requests).
-// Initialized by the --enforce-permissions flag.
-var enforce_permissions bool
+// Initialized by the -enforce-permissions flag.
+var enforcePermissions bool
-// permission_ttl is the time duration for which new permission
+// blobSignatureTTL is the time duration for which new permission
// signatures (returned by PUT requests) will be valid.
-// Initialized by the --permission-ttl flag.
-var permission_ttl time.Duration
+// Initialized by the -blob-signature-ttl flag (or its synonym, -permission-ttl).
+var blobSignatureTTL time.Duration
-// data_manager_token represents the API token used by the
+// dataManagerToken represents the API token used by the
// Data Manager, and is required on certain privileged operations.
-// Initialized by the --data-manager-token-file flag.
-var data_manager_token string
+// Initialized by the -data-manager-token-file flag.
+var dataManagerToken string
-// never_delete can be used to prevent the DELETE handler from
+// neverDelete can be used to prevent the DELETE handler from
// actually deleting anything.
-var never_delete = false
+var neverDelete = true
-// ==========
-// Error types.
+// maxBuffers caps the RAM used for data buffers, expressed as a multiple of
+// the block size. It defaults to 128 and is set by the -max-buffers flag.
+var maxBuffers = 128
+// bufs is the buffer pool that main creates with
+// newBufferPool(maxBuffers, BlockSize) once the flags have been parsed.
+var bufs *bufferPool
+
+// KeepError is an error value that carries an HTTP status code.
//
type KeepError struct {
HTTPCode int
NotFoundError = &KeepError{404, "Not Found"}
GenericError = &KeepError{500, "Fail"}
FullError = &KeepError{503, "Full"}
- TooLongError = &KeepError{504, "Timeout"}
+ SizeRequiredError = &KeepError{411, "Missing Content-Length"}
+ TooLongError = &KeepError{413, "Block is too large"}
MethodDisabledError = &KeepError{405, "Method disabled"}
)
var pullq *WorkQueue
var trashq *WorkQueue
+// volumeSet is a list of Volumes.
+type volumeSet []Volume
+
+var (
+ // flagSerializeIO is bound to the -serialize flag.
+ flagSerializeIO bool
+ // flagReadonly is bound to the -readonly flag.
+ flagReadonly bool
+ // volumes holds the Keep volumes to serve. If it is still empty after
+ // flag parsing, main falls back to unixVolumeAdder discovery, and the
+ // result is handed to MakeRRVolumeManager.
+ volumes volumeSet
+)
+
+// String returns the volumes in the set, formatted with fmt's %+v verb.
+func (vs *volumeSet) String() string {
+	current := (*vs)[:]
+	return fmt.Sprintf("%+v", current)
+}
+
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
// of command line flags (identifying Keep volumes and initializing
// permission arguments).
func main() {
- log.Println("Keep started: pid", os.Getpid())
-
- // Parse command-line flags:
- //
- // -listen=ipaddr:port
- // Interface on which to listen for requests. Use :port without
- // an ipaddr to listen on all network interfaces.
- // Examples:
- // -listen=127.0.0.1:4949
- // -listen=10.0.1.24:8000
- // -listen=:25107 (to listen to port 25107 on all interfaces)
- //
- // -volumes
- // A comma-separated list of directories to use as Keep volumes.
- // Example:
- // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
- //
- // If -volumes is empty or is not present, Keep will select volumes
- // by looking at currently mounted filesystems for /keep top-level
- // directories.
+ log.Println("keepstore starting, pid", os.Getpid())
+ defer log.Println("keepstore exiting, pid", os.Getpid())
var (
- data_manager_token_file string
- listen string
- permission_key_file string
- permission_ttl_sec int
- serialize_io bool
- volumearg string
- pidfile string
+ dataManagerTokenFile string
+ listen string
+ blobSigningKeyFile string
+ permissionTTLSec int
+ pidfile string
)
flag.StringVar(
- &data_manager_token_file,
+ &dataManagerTokenFile,
"data-manager-token-file",
"",
"File with the API token used by the Data Manager. All DELETE "+
"requests or GET /index requests must carry this token.")
flag.BoolVar(
- &enforce_permissions,
+ &enforcePermissions,
"enforce-permissions",
false,
"Enforce permission signatures on requests.")
flag.StringVar(
&listen,
"listen",
- DEFAULT_ADDR,
- "Interface on which to listen for requests, in the format "+
- "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
- "to listen on all network interfaces.")
+ DefaultAddr,
+ "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
flag.BoolVar(
- &never_delete,
+ &neverDelete,
"never-delete",
- false,
- "If set, nothing will be deleted. HTTP 405 will be returned "+
- "for valid DELETE requests.")
+ true,
+ "If true, nothing will be deleted. "+
+ "Warning: the relevant features in keepstore and data manager have not been extensively tested. "+
+ "You should leave this option alone unless you can afford to lose data.")
flag.StringVar(
- &permission_key_file,
+ &blobSigningKeyFile,
"permission-key-file",
"",
+ "Synonym for -blob-signing-key-file.")
+ flag.StringVar(
+ &blobSigningKeyFile,
+ "blob-signing-key-file",
+ "",
"File containing the secret key for generating and verifying "+
- "permission signatures.")
+ "blob permission signatures.")
flag.IntVar(
- &permission_ttl_sec,
+ &permissionTTLSec,
"permission-ttl",
- 1209600,
- "Expiration time (in seconds) for newly generated permission "+
- "signatures.")
+ 0,
+ "Synonym for -blob-signature-ttl.")
+ flag.IntVar(
+ &permissionTTLSec,
+ "blob-signature-ttl",
+ int(time.Duration(2*7*24*time.Hour).Seconds()),
+ "Lifetime of blob permission signatures. "+
+ "See services/api/config/application.default.yml.")
flag.BoolVar(
- &serialize_io,
+ &flagSerializeIO,
"serialize",
false,
- "If set, all read and write operations on local Keep volumes will "+
- "be serialized.")
- flag.StringVar(
- &volumearg,
- "volumes",
- "",
- "Comma-separated list of directories to use for Keep volumes, "+
- "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
- "supplied, Keep will scan mounted filesystems for volumes "+
- "with a /keep top-level directory.")
-
+ "Serialize read and write operations on the following volumes.")
+ flag.BoolVar(
+ &flagReadonly,
+ "readonly",
+ false,
+ "Do not write, delete, or touch anything on the following volumes.")
flag.StringVar(
&pidfile,
"pid",
"",
- "Path to write pid file")
+ "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
+ flag.IntVar(
+ &maxBuffers,
+ "max-buffers",
+ maxBuffers,
+ fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
flag.Parse()
- // Look for local keep volumes.
- var keepvols []string
- if volumearg == "" {
- // TODO(twp): decide whether this is desirable default behavior.
- // In production we may want to require the admin to specify
- // Keep volumes explicitly.
- keepvols = FindKeepVolumes()
- } else {
- keepvols = strings.Split(volumearg, ",")
+	// Reject zero as well as negative values: the error message requires a
+	// value greater than zero, and with a limit of zero every request that
+	// needs a buffer (GET, PUT) would wait for buffer space indefinitely.
+	if maxBuffers <= 0 {
+		log.Fatal("-max-buffers must be greater than zero.")
}
+ bufs = newBufferPool(maxBuffers, BlockSize)
- // Check that the specified volumes actually exist.
- var goodvols []Volume = nil
- for _, v := range keepvols {
- if _, err := os.Stat(v); err == nil {
- log.Println("adding Keep volume:", v)
- newvol := MakeUnixVolume(v, serialize_io)
- goodvols = append(goodvols, &newvol)
- } else {
- log.Printf("bad Keep volume: %s\n", err)
+	// Write the pid file (only when -pid was given). Any failure to open,
+	// lock, truncate, write, or sync the file is fatal.
+	if pidfile != "" {
+		// NOTE(review): mode 0777 (before umask) is unusually permissive for
+		// a pid file; confirm this is intended.
+		f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
+		if err != nil {
+			log.Fatalf("open pidfile (%s): %s", pidfile, err)
+		}
+		// Exclusive, non-blocking lock: the lock is taken before the file is
+		// truncated, so a lock failure leaves existing content untouched.
+		if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
+			log.Fatalf("flock pidfile (%s): %s", pidfile, err)
+		}
+		if err := f.Truncate(0); err != nil {
+			log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
+		}
+		if _, err := fmt.Fprint(f, os.Getpid()); err != nil {
+			log.Fatalf("write pidfile (%s): %s", pidfile, err)
+		}
+		if err := f.Sync(); err != nil {
+			log.Fatalf("sync pidfile (%s): %s", pidfile, err)
+		}
+		// The file stays open for the rest of main. Deferred calls run in
+		// reverse order, so the pid file is removed before it is closed.
+		defer f.Close()
+		defer os.Remove(pidfile)
+	}
+
+ if len(volumes) == 0 {
+ if (&unixVolumeAdder{&volumes}).Discover() == 0 {
+ log.Fatal("No volumes found.")
}
}
- if len(goodvols) == 0 {
- log.Fatal("could not find any keep volumes")
+ for _, v := range volumes {
+ log.Printf("Using volume %v (writable=%v)", v, v.Writable())
}
// Initialize data manager token and permission key.
// If these tokens are specified but cannot be read,
// raise a fatal error.
- if data_manager_token_file != "" {
- if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
- data_manager_token = strings.TrimSpace(string(buf))
+ if dataManagerTokenFile != "" {
+ if buf, err := ioutil.ReadFile(dataManagerTokenFile); err == nil {
+ dataManagerToken = strings.TrimSpace(string(buf))
} else {
log.Fatalf("reading data manager token: %s\n", err)
}
}
- if permission_key_file != "" {
- if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
+
+	// -never-delete defaults to true, so deletion is only enabled when the
+	// operator passes -never-delete=false explicitly; warn at startup.
+	if !neverDelete {
+		log.Print("never-delete is not set. Warning: the relevant features in keepstore and data manager have not " +
+			"been extensively tested. You should leave this option alone unless you can afford to lose data.")
+	}
+
+ if blobSigningKeyFile != "" {
+ if buf, err := ioutil.ReadFile(blobSigningKeyFile); err == nil {
PermissionSecret = bytes.TrimSpace(buf)
} else {
log.Fatalf("reading permission key: %s\n", err)
}
}
- // Initialize permission TTL
- permission_ttl = time.Duration(permission_ttl_sec) * time.Second
+ blobSignatureTTL = time.Duration(permissionTTLSec) * time.Second
- // If --enforce-permissions is true, we must have a permission key
- // to continue.
if PermissionSecret == nil {
- if enforce_permissions {
- log.Fatal("--enforce-permissions requires a permission key")
+ if enforcePermissions {
+ log.Fatal("-enforce-permissions requires a permission key")
} else {
log.Println("Running without a PermissionSecret. Block locators " +
"returned by this server will not be signed, and will be rejected " +
"by a server that enforces permissions.")
- log.Println("To fix this, run Keep with --permission-key-file=<path> " +
- "to define the location of a file containing the permission key.")
+ log.Println("To fix this, use the -blob-signing-key-file flag " +
+ "to specify the file containing the permission key.")
}
}
// Start a round-robin VolumeManager with the volumes we have found.
- KeepVM = MakeRRVolumeManager(goodvols)
+ KeepVM = MakeRRVolumeManager(volumes)
// Tell the built-in HTTP server to direct all requests to the REST router.
loggingRouter := MakeLoggingRESTRouter()
}
// Initialize Pull queue and worker
- keepClient := keepclient.KeepClient{
+ keepClient := &keepclient.KeepClient{
Arvados: nil,
Want_replicas: 1,
Using_proxy: true,
listener.Close()
}(term)
signal.Notify(term, syscall.SIGTERM)
+ signal.Notify(term, syscall.SIGINT)
- if pidfile != "" {
- f, err := os.Create(pidfile)
- if err == nil {
- fmt.Fprint(f, os.Getpid())
- f.Close()
- } else {
- log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
- }
- }
-
- // Start listening for requests.
+ log.Println("listening at", listen)
srv := &http.Server{Addr: listen}
srv.Serve(listener)
-
- log.Println("shutting down")
-
- if pidfile != "" {
- os.Remove(pidfile)
- }
}