3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
17 "github.com/gorilla/mux"
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
33 func MakeRESTRouter() *mux.Router {
34 rest := mux.NewRouter()
37 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
39 `/{hash:[0-9a-f]{32}}+{hints}`,
40 GetBlockHandler).Methods("GET", "HEAD")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
45 // For IndexHandler we support:
46 // /index - returns all locators
47 // /index/{prefix} - returns all locators that begin with {prefix}
48 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49 // If {prefix} is the empty string, return an index of all locators
50 // (so /index and /index/ behave identically)
51 // A client may supply a full 32-digit locator string, in which
52 // case the server will return an index with either zero or one
53 // entries. This usage allows a client to check whether a block is
54 // present, and its size and upload time, without retrieving the
57 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
59 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
62 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63 // /trash" requests from Data Manager. These requests instruct
64 // Keep to replicate or delete blocks; see
65 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
68 // Each handler parses the JSON list of block management requests
69 // in the message body, and replaces any existing pull queue or
70 // trash queue with their contentes.
72 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
75 // Any request which does not match any of these routes gets
77 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
82 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
83 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
86 // FindKeepVolumes scans all mounted volumes on the system for Keep
87 // volumes, and returns a list of matching paths.
89 // A device is assumed to be a Keep volume if it is a normal or tmpfs
90 // volume and has a "/keep" directory directly underneath the mount
93 func FindKeepVolumes() []string {
94 vols := make([]string, 0)
96 if f, err := os.Open(PROC_MOUNTS); err != nil {
97 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
99 scanner := bufio.NewScanner(f)
101 args := strings.Fields(scanner.Text())
102 dev, mount := args[0], args[1]
104 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
105 keep := mount + "/keep"
106 if st, err := os.Stat(keep); err == nil && st.IsDir() {
107 vols = append(vols, keep)
111 if err := scanner.Err(); err != nil {
118 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
119 hash := mux.Vars(req)["hash"]
121 hints := mux.Vars(req)["hints"]
123 // Parse the locator string and hints from the request.
124 // TODO(twp): implement a Locator type.
125 var signature, timestamp string
127 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
128 for _, hint := range strings.Split(hints, "+") {
129 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
130 // Server ignores size hints
131 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
134 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
135 // Any unknown hint that starts with an uppercase letter is
136 // presumed to be valid and ignored, to permit forward compatibility.
138 // Unknown format; not a valid locator.
139 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
145 // If permission checking is in effect, verify this
146 // request's permission signature.
147 if enforce_permissions {
148 if signature == "" || timestamp == "" {
149 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
151 } else if IsExpired(timestamp) {
152 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
155 req_locator := req.URL.Path[1:] // strip leading slash
156 if !VerifySignature(req_locator, GetApiToken(req)) {
157 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
163 block, err := GetBlock(hash, false)
165 // Garbage collect after each GET. Fixes #2865.
166 // TODO(twp): review Keep memory usage and see if there's
167 // a better way to do this than blindly garbage collecting
168 // after every block.
172 // This type assertion is safe because the only errors
173 // GetBlock can return are DiskHashError or NotFoundError.
174 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
178 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
180 // If/when we support HTTP Range header (#3734), then Content-Length
181 // could be smaller than Block size
182 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
184 _, err = resp.Write(block)
189 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
190 // Garbage collect after each PUT. Fixes #2865.
191 // See also GetBlockHandler.
194 hash := mux.Vars(req)["hash"]
196 // Read the block data to be stored.
197 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
199 if req.ContentLength > BLOCKSIZE {
200 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
204 buf := make([]byte, req.ContentLength)
205 nread, err := io.ReadFull(req.Body, buf)
207 http.Error(resp, err.Error(), 500)
208 } else if int64(nread) < req.ContentLength {
209 http.Error(resp, "request truncated", 500)
211 if err := PutBlock(buf, hash); err == nil {
212 // Success; add a size hint, sign the locator if
213 // possible, and return it to the client.
214 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
215 api_token := GetApiToken(req)
216 if PermissionSecret != nil && api_token != "" {
217 expiry := time.Now().Add(permission_ttl)
218 return_hash = SignLocator(return_hash, api_token, expiry)
220 resp.Write([]byte(return_hash + "\n"))
222 ke := err.(*KeepError)
223 http.Error(resp, ke.Error(), ke.HTTPCode)
230 // A HandleFunc to address /index and /index/{prefix} requests.
232 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
233 // Reject unauthorized requests.
234 if !IsDataManagerToken(GetApiToken(req)) {
235 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
239 prefix := mux.Vars(req)["prefix"]
242 for _, vol := range KeepVM.Volumes() {
243 index = index + vol.Index(prefix)
245 resp.Write([]byte(index))
// Responds to /status.json requests with the current node status,
// described in a JSON structure.
//
// The data given in a status.json response includes:
//    volumes - a list of Keep volumes currently in use by this server
//      each volume is an object with the following fields:
//        * mount_point
//        * device_num (an integer identifying the underlying filesystem)
//        * bytes_free
//        * bytes_used
//

// VolumeStatus is the JSON-serializable description of one Keep
// volume, as listed under "volumes" in a status.json response.
type VolumeStatus struct {
	// MountPoint is encoded as "mount_point".
	MountPoint string `json:"mount_point"`
	// DeviceNum identifies the underlying filesystem; encoded as "device_num".
	DeviceNum uint64 `json:"device_num"`
	// BytesFree is encoded as "bytes_free".
	BytesFree uint64 `json:"bytes_free"`
	// BytesUsed is encoded as "bytes_used".
	BytesUsed uint64 `json:"bytes_used"`
}
267 type NodeStatus struct {
268 Volumes []*VolumeStatus `json:"volumes"`
271 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
272 st := GetNodeStatus()
273 if jstat, err := json.Marshal(st); err == nil {
276 log.Printf("json.Marshal: %s\n", err)
277 log.Printf("NodeStatus = %v\n", st)
278 http.Error(resp, err.Error(), 500)
283 // Returns a NodeStatus struct describing this Keep
284 // node's current status.
286 func GetNodeStatus() *NodeStatus {
287 st := new(NodeStatus)
289 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
290 for i, vol := range KeepVM.Volumes() {
291 st.Volumes[i] = vol.Status()
297 // Returns a VolumeStatus describing the requested volume.
299 func GetVolumeStatus(volume string) *VolumeStatus {
300 var fs syscall.Statfs_t
303 if fi, err := os.Stat(volume); err == nil {
304 devnum = fi.Sys().(*syscall.Stat_t).Dev
306 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
310 err := syscall.Statfs(volume, &fs)
312 log.Printf("GetVolumeStatus: statfs: %s\n", err)
315 // These calculations match the way df calculates disk usage:
316 // "free" space is measured by fs.Bavail, but "used" space
317 // uses fs.Blocks - fs.Bfree.
318 free := fs.Bavail * uint64(fs.Bsize)
319 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
320 return &VolumeStatus{volume, devnum, free, used}
323 // DeleteHandler processes DELETE requests.
325 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
326 // from all connected volumes.
328 // Only the Data Manager, or an Arvados admin with scope "all", are
329 // allowed to issue DELETE requests. If a DELETE request is not
330 // authenticated or is issued by a non-admin user, the server returns
331 // a PermissionError.
333 // Upon receiving a valid request from an authorized user,
334 // DeleteHandler deletes all copies of the specified block on local
339 // If the requested blocks was not found on any volume, the response
340 // code is HTTP 404 Not Found.
342 // Otherwise, the response code is 200 OK, with a response body
343 // consisting of the JSON message
345 // {"copies_deleted":d,"copies_failed":f}
347 // where d and f are integers representing the number of blocks that
348 // were successfully and unsuccessfully deleted.
350 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
351 hash := mux.Vars(req)["hash"]
353 // Confirm that this user is an admin and has a token with unlimited scope.
354 var tok = GetApiToken(req)
355 if tok == "" || !CanDelete(tok) {
356 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
361 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
365 // Delete copies of this block from all available volumes. Report
366 // how many blocks were successfully and unsuccessfully
369 Deleted int `json:"copies_deleted"`
370 Failed int `json:"copies_failed"`
372 for _, vol := range KeepVM.Volumes() {
373 if err := vol.Delete(hash); err == nil {
375 } else if os.IsNotExist(err) {
379 log.Println("DeleteHandler:", err)
385 if result.Deleted == 0 && result.Failed == 0 {
386 st = http.StatusNotFound
393 if st == http.StatusOK {
394 if body, err := json.Marshal(result); err == nil {
397 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
398 http.Error(resp, err.Error(), 500)
// PullHandler processes "PUT /pull" requests for the data manager.
// The request body is a JSON message containing a list of pull
// requests in the following format:
//
//    [
//       {
//          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
//          "servers":[
//             "keep0.qr1hi.arvadosapi.com:25107",
//             "keep1.qr1hi.arvadosapi.com:25108"
//          ]
//       },
//       {
//          "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
//          "servers":[
//             ...
//          ]
//       },
//       ...
//    ]
//
// Each pull request in the list consists of a block locator string
// and an ordered list of servers.  Keepstore should try to fetch the
// block from each server in turn.
//
// If the request has not been sent by the Data Manager, return 401
// Unauthorized.
//
// If the JSON unmarshalling fails, return 400 Bad Request.

// PullRequest is one entry of a pull list: a block locator and the
// ordered list of servers to try fetching that block from.
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`
}
441 func PullHandler(resp http.ResponseWriter, req *http.Request) {
442 // Reject unauthorized requests.
443 if !IsDataManagerToken(GetApiToken(req)) {
444 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
448 // Parse the request body.
450 r := json.NewDecoder(req.Body)
451 if err := r.Decode(&pr); err != nil {
452 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
456 // We have a properly formatted pull list sent from the data
457 // manager. Report success and send the list to the pull list
458 // manager for further handling.
459 resp.WriteHeader(http.StatusOK)
461 fmt.Sprintf("Received %d pull requests\n", len(pr))))
464 for _, p := range pr {
469 pullq = NewWorkQueue()
471 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of a trash list sent in the body of a
// "PUT /trash" request: a block locator ("locator") and a block
// modification time ("block_mtime").
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
}
479 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
480 // Reject unauthorized requests.
481 if !IsDataManagerToken(GetApiToken(req)) {
482 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
486 // Parse the request body.
487 var trash []TrashRequest
488 r := json.NewDecoder(req.Body)
489 if err := r.Decode(&trash); err != nil {
490 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
494 // We have a properly formatted trash list sent from the data
495 // manager. Report success and send the list to the trash work
496 // queue for further handling.
497 resp.WriteHeader(http.StatusOK)
499 fmt.Sprintf("Received %d trash requests\n", len(trash))))
502 for _, t := range trash {
507 trashq = NewWorkQueue()
509 trashq.ReplaceQueue(tlist)
512 // ==============================
513 // GetBlock and PutBlock implement lower-level code for handling
514 // blocks by rooting through volumes connected to the local machine.
515 // Once the handler has determined that system policy permits the
516 // request, it calls these methods to perform the actual operation.
518 // TODO(twp): this code would probably be better located in the
519 // VolumeManager interface. As an abstraction, the VolumeManager
520 // should be the only part of the code that cares about which volume a
521 // block is stored on, so it should be responsible for figuring out
522 // which volume to check for fetching blocks, storing blocks, etc.
524 // ==============================
525 // GetBlock fetches and returns the block identified by "hash". If
526 // the update_timestamp argument is true, GetBlock also updates the
527 // block's file modification time (for the sake of PutBlock, which
528 // must update the file's timestamp when the block already exists).
530 // On success, GetBlock returns a byte slice with the block data, and
533 // If the block cannot be found on any volume, returns NotFoundError.
535 // If the block found does not have the correct MD5 hash, returns
539 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
540 // Attempt to read the requested hash from a keep volume.
541 error_to_caller := NotFoundError
543 for _, vol := range KeepVM.Volumes() {
544 if buf, err := vol.Get(hash); err != nil {
545 // IsNotExist is an expected error and may be ignored.
546 // (If all volumes report IsNotExist, we return a NotFoundError)
547 // All other errors should be logged but we continue trying to
550 case os.IsNotExist(err):
553 log.Printf("GetBlock: reading %s: %s\n", hash, err)
556 // Double check the file checksum.
558 filehash := fmt.Sprintf("%x", md5.Sum(buf))
559 if filehash != hash {
560 // TODO(twp): this condition probably represents a bad disk and
561 // should raise major alarm bells for an administrator: e.g.
562 // they should be sent directly to an event manager at high
563 // priority or logged as urgent problems.
565 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
567 error_to_caller = DiskHashError
570 if error_to_caller != NotFoundError {
571 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
574 // Update the timestamp if the caller requested.
575 // If we could not update the timestamp, continue looking on
577 if update_timestamp {
578 if vol.Touch(hash) != nil {
587 if error_to_caller != NotFoundError {
588 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
590 return nil, error_to_caller
593 /* PutBlock(block, hash)
594 Stores the BLOCK (identified by the content id HASH) in Keep.
596 The MD5 checksum of the block must be identical to the content id HASH.
597 If not, an error is returned.
599 PutBlock stores the BLOCK on the first Keep volume with free space.
600 A failure code is returned to the user only if all volumes fail.
602 On success, PutBlock returns nil.
603 On failure, it returns a KeepError with one of the following codes:
606 A different block with the same hash already exists on this
609 The MD5 hash of the BLOCK does not match the argument HASH.
611 There was not enough space left in any Keep volume to store
614 The object could not be stored for some other reason (e.g.
615 all writes failed). The text of the error message should
616 provide as much detail as possible.
619 func PutBlock(block []byte, hash string) error {
620 // Check that BLOCK's checksum matches HASH.
621 blockhash := fmt.Sprintf("%x", md5.Sum(block))
622 if blockhash != hash {
623 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
624 return RequestHashError
627 // If we already have a block on disk under this identifier, return
628 // success (but check for MD5 collisions). While fetching the block,
629 // update its timestamp.
630 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
631 // In either case, we want to write our new (good) block to disk,
632 // so there is nothing special to do if err != nil.
634 if oldblock, err := GetBlock(hash, true); err == nil {
635 if bytes.Compare(block, oldblock) == 0 {
636 // The block already exists; return success.
639 return CollisionError
643 // Choose a Keep volume to write to.
644 // If this volume fails, try all of the volumes in order.
645 vol := KeepVM.Choose()
646 if err := vol.Put(hash, block); err == nil {
647 return nil // success!
650 for _, vol := range KeepVM.Volumes() {
651 err := vol.Put(hash, block)
653 return nil // success!
655 if err != FullError {
656 // The volume is not full but the write did not succeed.
657 // Report the error and continue trying.
659 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
664 log.Printf("all Keep volumes full")
667 log.Printf("all Keep volumes failed")
// validLocatorPattern matches a bare MD5 locator: exactly 32
// lowercase hexadecimal digits. It is compiled once at package
// initialization rather than on every call.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator, i.e. exactly 32 lowercase hexadecimal digits with no hints.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
//
func IsValidLocator(loc string) bool {
	return validLocatorPattern.MatchString(loc)
}
// oauth2TokenPattern extracts the token from an
// "OAuth2 <token>" Authorization header value. It is compiled once at
// package initialization rather than on every request.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found. Only the first Authorization value is examined.
func GetApiToken(req *http.Request) string {
	auth := req.Header["Authorization"]
	if len(auth) == 0 {
		// Header absent, or present with no values.
		return ""
	}
	if match := oauth2TokenPattern.FindStringSubmatch(auth[0]); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string.
func IsExpired(timestamp_hex string) bool {
	// Parse as 64-bit regardless of platform: with the native int
	// size, timestamps above 0x7fffffff would fail to parse on 32-bit
	// builds and be reported as expired.
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s\n", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
713 // CanDelete returns true if the user identified by api_token is
714 // allowed to delete blocks.
715 func CanDelete(api_token string) bool {
719 // Blocks may be deleted only when Keep has been configured with a
721 if IsDataManagerToken(api_token) {
724 // TODO(twp): look up api_token with the API server
725 // return true if is_admin is true and if the token
726 // has unlimited scope
730 // IsDataManagerToken returns true if api_token represents the data
732 func IsDataManagerToken(api_token string) bool {
733 return data_manager_token != "" && api_token == data_manager_token