9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
21 // ======================
22 // Configuration settings
24 // TODO(twp): make all of these configurable via command line flags
25 // and/or configuration file settings.
27 // Default TCP address on which to listen for requests.
28 // The --listen flag overrides this default.
29 const DEFAULT_ADDR = ":25107"
31 // A Keep "block" is 64 MiB (64 * 1024 * 1024 bytes).
32 const BLOCKSIZE = 64 * 1024 * 1024
34 // A Keep volume must have at least MIN_FREE_KILOBYTES (one block's
35 // worth of space) available in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
38 var PROC_MOUNTS = "/proc/mounts"
40 // enforce_permissions controls whether permission signatures
41 // should be enforced (affecting GET and DELETE requests).
42 // Initialized by the -enforce-permissions flag.
43 var enforce_permissions bool
45 // blob_signature_ttl is the time duration for which new permission
46 // signatures (returned by PUT requests) will be valid.
47 // Initialized from the -blob-signature-ttl flag (synonym: -permission-ttl).
48 var blob_signature_ttl time.Duration
50 // data_manager_token represents the API token used by the
51 // Data Manager, and is required on certain privileged operations.
52 // Read at startup from the file named by the -data-manager-token-file flag.
53 var data_manager_token string
55 // never_delete prevents the DELETE handler from actually deleting
56 // anything. It must currently remain true (see #6221).
57 var never_delete = true
65 type KeepError struct {
71 BadRequestError = &KeepError{400, "Bad Request"}
72 UnauthorizedError = &KeepError{401, "Unauthorized"}
73 CollisionError = &KeepError{500, "Collision"}
74 RequestHashError = &KeepError{422, "Hash mismatch in request"}
75 PermissionError = &KeepError{403, "Forbidden"}
76 DiskHashError = &KeepError{500, "Hash mismatch in stored data"}
77 ExpiredError = &KeepError{401, "Expired permission signature"}
78 NotFoundError = &KeepError{404, "Not Found"}
79 GenericError = &KeepError{500, "Fail"}
80 FullError = &KeepError{503, "Full"}
81 SizeRequiredError = &KeepError{411, "Missing Content-Length"}
82 TooLongError = &KeepError{413, "Block is too large"}
83 MethodDisabledError = &KeepError{405, "Method disabled"}
86 func (e *KeepError) Error() string {
90 // ========================
91 // Internal data structures
93 // These global variables are used by multiple parts of the
94 // program. They are good candidates for moving into their own
97 // The Keep VolumeManager maintains a list of available volumes.
98 // Initialized from the -volume flags (or by volumeSet.Discover if none are given).
99 var KeepVM VolumeManager
101 // The pull queue and trash queue are threadsafe queues which
102 // support atomic update operations. The PullHandler and TrashHandler
103 // store results from Data Manager /pull and /trash requests here.
105 // See the Keep and Data Manager design documents for more details:
106 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
107 // https://arvados.org/projects/arvados/wiki/Data_Manager_Design_Doc
110 var trashq *WorkQueue
117 type volumeSet []Volume
119 func (vs *volumeSet) Set(value string) error {
120 if dirs := strings.Split(value, ","); len(dirs) > 1 {
121 log.Print("DEPRECATED: using comma-separated volume list.")
122 for _, dir := range dirs {
123 if err := vs.Set(dir); err != nil {
129 if len(value) == 0 || value[0] != '/' {
130 return errors.New("Invalid volume: must begin with '/'.")
132 if _, err := os.Stat(value); err != nil {
135 *vs = append(*vs, &UnixVolume{
137 serialize: flagSerializeIO,
138 readonly: flagReadonly,
143 func (vs *volumeSet) String() string {
145 for i, v := range *vs {
154 // Discover adds a volume for every directory named "keep" that is
155 // located at the top level of a device- or tmpfs-backed mount point
156 // other than "/". It returns the number of volumes added.
157 func (vs *volumeSet) Discover() int {
159 f, err := os.Open(PROC_MOUNTS)
161 log.Fatalf("opening %s: %s", PROC_MOUNTS, err)
163 scanner := bufio.NewScanner(f)
165 args := strings.Fields(scanner.Text())
166 if err := scanner.Err(); err != nil {
167 log.Fatalf("reading %s: %s", PROC_MOUNTS, err)
169 dev, mount := args[0], args[1]
173 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
176 keepdir := mount + "/keep"
177 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
180 // Set the -readonly flag (but only for this volume)
181 // if the filesystem is mounted readonly.
182 flagReadonlyWas := flagReadonly
183 for _, fsopt := range strings.Split(args[3], ",") {
193 flagReadonly = flagReadonlyWas
199 // TODO(twp): continue moving as much code as possible out of main
200 // so it can be effectively tested, especially the handling and postprocessing
201 // of command line flags (identifying Keep volumes and initializing
202 // permission arguments).
205 log.Println("keepstore starting, pid", os.Getpid())
206 defer log.Println("keepstore exiting, pid", os.Getpid())
209 data_manager_token_file string
211 blob_signing_key_file string
212 permission_ttl_sec int
217 &data_manager_token_file,
218 "data-manager-token-file",
220 "File with the API token used by the Data Manager. All DELETE "+
221 "requests or GET /index requests must carry this token.")
223 &enforce_permissions,
224 "enforce-permissions",
226 "Enforce permission signatures on requests.")
231 "Listening address, in the form \"host:port\". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.")
236 "If set, nothing will be deleted. HTTP 405 will be returned "+
237 "for valid DELETE requests.")
239 &blob_signing_key_file,
240 "permission-key-file",
242 "Synonym for -blob-signing-key-file.")
244 &blob_signing_key_file,
245 "blob-signing-key-file",
247 "File containing the secret key for generating and verifying "+
248 "blob permission signatures.")
253 "Synonym for -blob-signature-ttl.")
256 "blob-signature-ttl",
257 int(time.Duration(2*7*24*time.Hour).Seconds()),
258 "Lifetime of blob permission signatures. "+
259 "See services/api/config/application.default.yml.")
264 "Serialize read and write operations on the following volumes.")
269 "Do not write, delete, or touch anything on the following volumes.")
273 "Deprecated synonym for -volume.")
277 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
282 "Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.")
287 fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BLOCKSIZE>>20))
291 if never_delete != true {
292 log.Fatal("never_delete must be true, see #6221")
296 log.Fatal("-max-buffers must be greater than zero.")
298 bufs = newBufferPool(maxBuffers, BLOCKSIZE)
301 f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
303 log.Fatalf("open pidfile (%s): %s", pidfile, err)
305 err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
307 log.Fatalf("flock pidfile (%s): %s", pidfile, err)
311 log.Fatalf("truncate pidfile (%s): %s", pidfile, err)
313 _, err = fmt.Fprint(f, os.Getpid())
315 log.Fatalf("write pidfile (%s): %s", pidfile, err)
319 log.Fatalf("sync pidfile (%s): %s", pidfile, err)
322 defer os.Remove(pidfile)
325 if len(volumes) == 0 {
326 if volumes.Discover() == 0 {
327 log.Fatal("No volumes found.")
331 for _, v := range volumes {
332 log.Printf("Using volume %v (writable=%v)", v, v.Writable())
335 // Initialize data manager token and permission key.
336 // If either file is specified but cannot be read,
337 // exit with a fatal error.
338 if data_manager_token_file != "" {
339 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
340 data_manager_token = strings.TrimSpace(string(buf))
342 log.Fatalf("reading data manager token: %s\n", err)
345 if blob_signing_key_file != "" {
346 if buf, err := ioutil.ReadFile(blob_signing_key_file); err == nil {
347 PermissionSecret = bytes.TrimSpace(buf)
349 log.Fatalf("reading permission key: %s\n", err)
353 blob_signature_ttl = time.Duration(permission_ttl_sec) * time.Second
355 if PermissionSecret == nil {
356 if enforce_permissions {
357 log.Fatal("-enforce-permissions requires a permission key")
359 log.Println("Running without a PermissionSecret. Block locators " +
360 "returned by this server will not be signed, and will be rejected " +
361 "by a server that enforces permissions.")
362 log.Println("To fix this, use the -blob-signing-key-file flag " +
363 "to specify the file containing the permission key.")
367 // Start a round-robin VolumeManager with the volumes we have found.
368 KeepVM = MakeRRVolumeManager(volumes)
370 // Tell the built-in HTTP server to direct all requests to the REST router.
371 loggingRouter := MakeLoggingRESTRouter()
372 http.HandleFunc("/", func(resp http.ResponseWriter, req *http.Request) {
373 loggingRouter.ServeHTTP(resp, req)
376 // Set up a TCP listener.
377 listener, err := net.Listen("tcp", listen)
382 // Create the Keep client used by the pull worker.
383 keepClient := &keepclient.KeepClient{
387 Client: &http.Client{},
390 // Initialize the pullq and worker
391 pullq = NewWorkQueue()
392 go RunPullWorker(pullq, keepClient)
394 // Initialize the trashq and worker
395 trashq = NewWorkQueue()
396 go RunTrashWorker(trashq)
398 // Shut down the server gracefully (by closing the listener)
399 // if SIGTERM or SIGINT is received.
400 term := make(chan os.Signal, 1)
401 go func(sig <-chan os.Signal) {
403 log.Println("caught signal:", s)
406 signal.Notify(term, syscall.SIGTERM)
407 signal.Notify(term, syscall.SIGINT)
409 log.Println("listening at", listen)
410 srv := &http.Server{Addr: listen}