// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// DeleteHandler (DELETE /locator)
// IndexHandler (GET /index, GET /index/prefix)
// StatusHandler (GET /status.json)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
17 "github.com/gorilla/mux"
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
33 func MakeRESTRouter() *mux.Router {
34 rest := mux.NewRouter()
37 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
39 `/{hash:[0-9a-f]{32}}+{hints}`,
40 GetBlockHandler).Methods("GET", "HEAD")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
45 // For IndexHandler we support:
46 // /index - returns all locators
47 // /index/{prefix} - returns all locators that begin with {prefix}
48 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49 // If {prefix} is the empty string, return an index of all locators
50 // (so /index and /index/ behave identically)
51 // A client may supply a full 32-digit locator string, in which
52 // case the server will return an index with either zero or one
53 // entries. This usage allows a client to check whether a block is
54 // present, and its size and upload time, without retrieving the
57 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
59 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
62 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63 // /trash" requests from Data Manager. These requests instruct
64 // Keep to replicate or delete blocks; see
65 // https://arvados.org/projects/orvos-private/wiki/Keep_Design_Doc
68 // Each handler parses the JSON list of block management requests
69 // in the message body, and delivers them to the pull queue or
70 // trash queue, respectively.
71 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74 // Any request which does not match any of these routes gets
76 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
85 // FindKeepVolumes scans all mounted volumes on the system for Keep
86 // volumes, and returns a list of matching paths.
88 // A device is assumed to be a Keep volume if it is a normal or tmpfs
89 // volume and has a "/keep" directory directly underneath the mount
92 func FindKeepVolumes() []string {
93 vols := make([]string, 0)
95 if f, err := os.Open(PROC_MOUNTS); err != nil {
96 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
98 scanner := bufio.NewScanner(f)
100 args := strings.Fields(scanner.Text())
101 dev, mount := args[0], args[1]
103 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
104 keep := mount + "/keep"
105 if st, err := os.Stat(keep); err == nil && st.IsDir() {
106 vols = append(vols, keep)
110 if err := scanner.Err(); err != nil {
117 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
118 hash := mux.Vars(req)["hash"]
120 log.Printf("%s %s", req.Method, hash)
122 hints := mux.Vars(req)["hints"]
124 // Parse the locator string and hints from the request.
125 // TODO(twp): implement a Locator type.
126 var signature, timestamp string
128 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
129 for _, hint := range strings.Split(hints, "+") {
130 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
131 // Server ignores size hints
132 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
135 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
136 // Any unknown hint that starts with an uppercase letter is
137 // presumed to be valid and ignored, to permit forward compatibility.
139 // Unknown format; not a valid locator.
140 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
146 // If permission checking is in effect, verify this
147 // request's permission signature.
148 if enforce_permissions {
149 if signature == "" || timestamp == "" {
150 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
152 } else if IsExpired(timestamp) {
153 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
156 req_locator := req.URL.Path[1:] // strip leading slash
157 if !VerifySignature(req_locator, GetApiToken(req)) {
158 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
164 block, err := GetBlock(hash, false)
166 // Garbage collect after each GET. Fixes #2865.
167 // TODO(twp): review Keep memory usage and see if there's
168 // a better way to do this than blindly garbage collecting
169 // after every block.
173 // This type assertion is safe because the only errors
174 // GetBlock can return are DiskHashError or NotFoundError.
175 if err == NotFoundError {
176 log.Printf("%s: not found, giving up\n", hash)
178 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
182 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
184 _, err = resp.Write(block)
186 log.Printf("GetBlockHandler: writing response: %s", err)
192 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
193 // Garbage collect after each PUT. Fixes #2865.
194 // See also GetBlockHandler.
197 hash := mux.Vars(req)["hash"]
199 log.Printf("%s %s", req.Method, hash)
201 // Read the block data to be stored.
202 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
204 if req.ContentLength > BLOCKSIZE {
205 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
209 buf := make([]byte, req.ContentLength)
210 nread, err := io.ReadFull(req.Body, buf)
212 http.Error(resp, err.Error(), 500)
213 } else if int64(nread) < req.ContentLength {
214 http.Error(resp, "request truncated", 500)
216 if err := PutBlock(buf, hash); err == nil {
217 // Success; add a size hint, sign the locator if
218 // possible, and return it to the client.
219 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
220 api_token := GetApiToken(req)
221 if PermissionSecret != nil && api_token != "" {
222 expiry := time.Now().Add(permission_ttl)
223 return_hash = SignLocator(return_hash, api_token, expiry)
225 resp.Write([]byte(return_hash + "\n"))
227 ke := err.(*KeepError)
228 http.Error(resp, ke.Error(), ke.HTTPCode)
235 // A HandleFunc to address /index and /index/{prefix} requests.
237 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
238 prefix := mux.Vars(req)["prefix"]
240 // Only the data manager may issue /index requests,
241 // and only if enforce_permissions is enabled.
242 // All other requests return 403 Forbidden.
243 api_token := GetApiToken(req)
244 if !enforce_permissions ||
246 data_manager_token != api_token {
247 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
251 for _, vol := range KeepVM.Volumes() {
252 index = index + vol.Index(prefix)
254 resp.Write([]byte(index))
// VolumeStatus and NodeStatus describe the JSON document served at
// /status.json (see StatusHandler). The document has one key, "volumes",
// listing the Keep volumes currently in use by this server. Each volume
// is an object with the following fields:
//
//   - mount_point
//   - device_num (an integer identifying the underlying filesystem)
//   - bytes_free
//   - bytes_used
type (
	// VolumeStatus reports the state of a single Keep volume. Field
	// order is significant: it fixes the key order of the JSON output.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level /status.json document.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
280 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
281 st := GetNodeStatus()
282 if jstat, err := json.Marshal(st); err == nil {
285 log.Printf("json.Marshal: %s\n", err)
286 log.Printf("NodeStatus = %v\n", st)
287 http.Error(resp, err.Error(), 500)
292 // Returns a NodeStatus struct describing this Keep
293 // node's current status.
295 func GetNodeStatus() *NodeStatus {
296 st := new(NodeStatus)
298 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
299 for i, vol := range KeepVM.Volumes() {
300 st.Volumes[i] = vol.Status()
306 // Returns a VolumeStatus describing the requested volume.
308 func GetVolumeStatus(volume string) *VolumeStatus {
309 var fs syscall.Statfs_t
312 if fi, err := os.Stat(volume); err == nil {
313 devnum = fi.Sys().(*syscall.Stat_t).Dev
315 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
319 err := syscall.Statfs(volume, &fs)
321 log.Printf("GetVolumeStatus: statfs: %s\n", err)
324 // These calculations match the way df calculates disk usage:
325 // "free" space is measured by fs.Bavail, but "used" space
326 // uses fs.Blocks - fs.Bfree.
327 free := fs.Bavail * uint64(fs.Bsize)
328 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
329 return &VolumeStatus{volume, devnum, free, used}
332 // DeleteHandler processes DELETE requests.
334 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
335 // from all connected volumes.
337 // Only the Data Manager, or an Arvados admin with scope "all", are
338 // allowed to issue DELETE requests. If a DELETE request is not
339 // authenticated or is issued by a non-admin user, the server returns
340 // a PermissionError.
342 // Upon receiving a valid request from an authorized user,
343 // DeleteHandler deletes all copies of the specified block on local
348 // If the requested blocks was not found on any volume, the response
349 // code is HTTP 404 Not Found.
351 // Otherwise, the response code is 200 OK, with a response body
352 // consisting of the JSON message
354 // {"copies_deleted":d,"copies_failed":f}
356 // where d and f are integers representing the number of blocks that
357 // were successfully and unsuccessfully deleted.
359 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
360 hash := mux.Vars(req)["hash"]
361 log.Printf("%s %s", req.Method, hash)
363 // Confirm that this user is an admin and has a token with unlimited scope.
364 var tok = GetApiToken(req)
365 if tok == "" || !CanDelete(tok) {
366 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
371 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
375 // Delete copies of this block from all available volumes. Report
376 // how many blocks were successfully and unsuccessfully
379 Deleted int `json:"copies_deleted"`
380 Failed int `json:"copies_failed"`
382 for _, vol := range KeepVM.Volumes() {
383 if err := vol.Delete(hash); err == nil {
385 } else if os.IsNotExist(err) {
389 log.Println("DeleteHandler:", err)
395 if result.Deleted == 0 && result.Failed == 0 {
396 st = http.StatusNotFound
403 if st == http.StatusOK {
404 if body, err := json.Marshal(result); err == nil {
407 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
408 http.Error(resp, err.Error(), 500)
413 /* PullHandler processes "PUT /pull" requests for the data manager.
414 The request body is a JSON message containing a list of pull
415 requests in the following format:
419 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
421 "keep0.qr1hi.arvadosapi.com:25107",
422 "keep1.qr1hi.arvadosapi.com:25108"
426 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
436 Each pull request in the list consists of a block locator string
437 and an ordered list of servers. Keepstore should try to fetch the
438 block from each server in turn.
440 If the request has not been sent by the Data Manager, return 401
443 If the JSON unmarshalling fails, return 400 Bad Request.
446 type PullRequest struct {
447 Locator string `json:"locator"`
448 Servers []string `json:"servers"`
451 func PullHandler(resp http.ResponseWriter, req *http.Request) {
452 // Reject unauthorized requests.
453 api_token := GetApiToken(req)
454 if !IsDataManagerToken(api_token) {
455 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
456 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
460 // Parse the request body.
462 r := json.NewDecoder(req.Body)
463 if err := r.Decode(&pr); err != nil {
464 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
465 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
469 // We have a properly formatted pull list sent from the data
470 // manager. Report success and send the list to the pull list
471 // manager for further handling.
472 log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
473 resp.WriteHeader(http.StatusOK)
475 fmt.Sprintf("Received %d pull requests\n", len(pr))))
478 for _, p := range pr {
483 pullq = NewWorkQueue()
485 pullq.ReplaceQueue(plist)
488 type TrashRequest struct {
489 Locator string `json:"locator"`
490 BlockMtime int64 `json:"block_mtime"`
493 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
494 // Reject unauthorized requests.
495 api_token := GetApiToken(req)
496 if !IsDataManagerToken(api_token) {
497 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
498 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
502 // Parse the request body.
503 var trash []TrashRequest
504 r := json.NewDecoder(req.Body)
505 if err := r.Decode(&trash); err != nil {
506 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
507 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
511 // We have a properly formatted trash list sent from the data
512 // manager. Report success and send the list to the trash work
513 // queue for further handling.
514 log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
515 resp.WriteHeader(http.StatusOK)
517 fmt.Sprintf("Received %d trash requests\n", len(trash))))
520 for _, t := range trash {
525 trashq = NewWorkQueue()
527 trashq.ReplaceQueue(tlist)
530 // ==============================
531 // GetBlock and PutBlock implement lower-level code for handling
532 // blocks by rooting through volumes connected to the local machine.
533 // Once the handler has determined that system policy permits the
534 // request, it calls these methods to perform the actual operation.
536 // TODO(twp): this code would probably be better located in the
537 // VolumeManager interface. As an abstraction, the VolumeManager
538 // should be the only part of the code that cares about which volume a
539 // block is stored on, so it should be responsible for figuring out
540 // which volume to check for fetching blocks, storing blocks, etc.
542 // ==============================
543 // GetBlock fetches and returns the block identified by "hash". If
544 // the update_timestamp argument is true, GetBlock also updates the
545 // block's file modification time (for the sake of PutBlock, which
546 // must update the file's timestamp when the block already exists).
548 // On success, GetBlock returns a byte slice with the block data, and
551 // If the block cannot be found on any volume, returns NotFoundError.
553 // If the block found does not have the correct MD5 hash, returns
557 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
558 // Attempt to read the requested hash from a keep volume.
559 error_to_caller := NotFoundError
561 for _, vol := range KeepVM.Volumes() {
562 if buf, err := vol.Get(hash); err != nil {
563 // IsNotExist is an expected error and may be ignored.
564 // (If all volumes report IsNotExist, we return a NotFoundError)
565 // All other errors should be logged but we continue trying to
568 case os.IsNotExist(err):
571 log.Printf("GetBlock: reading %s: %s\n", hash, err)
574 // Double check the file checksum.
576 filehash := fmt.Sprintf("%x", md5.Sum(buf))
577 if filehash != hash {
578 // TODO(twp): this condition probably represents a bad disk and
579 // should raise major alarm bells for an administrator: e.g.
580 // they should be sent directly to an event manager at high
581 // priority or logged as urgent problems.
583 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
585 error_to_caller = DiskHashError
588 if error_to_caller != NotFoundError {
589 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
592 // Update the timestamp if the caller requested.
593 // If we could not update the timestamp, continue looking on
595 if update_timestamp {
596 if vol.Touch(hash) != nil {
605 if error_to_caller != NotFoundError {
606 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
608 return nil, error_to_caller
611 /* PutBlock(block, hash)
612 Stores the BLOCK (identified by the content id HASH) in Keep.
614 The MD5 checksum of the block must be identical to the content id HASH.
615 If not, an error is returned.
617 PutBlock stores the BLOCK on the first Keep volume with free space.
618 A failure code is returned to the user only if all volumes fail.
620 On success, PutBlock returns nil.
621 On failure, it returns a KeepError with one of the following codes:
624 A different block with the same hash already exists on this
627 The MD5 hash of the BLOCK does not match the argument HASH.
629 There was not enough space left in any Keep volume to store
632 The object could not be stored for some other reason (e.g.
633 all writes failed). The text of the error message should
634 provide as much detail as possible.
637 func PutBlock(block []byte, hash string) error {
638 // Check that BLOCK's checksum matches HASH.
639 blockhash := fmt.Sprintf("%x", md5.Sum(block))
640 if blockhash != hash {
641 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
642 return RequestHashError
645 // If we already have a block on disk under this identifier, return
646 // success (but check for MD5 collisions). While fetching the block,
647 // update its timestamp.
648 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
649 // In either case, we want to write our new (good) block to disk,
650 // so there is nothing special to do if err != nil.
652 if oldblock, err := GetBlock(hash, true); err == nil {
653 if bytes.Compare(block, oldblock) == 0 {
654 // The block already exists; return success.
657 return CollisionError
661 // Choose a Keep volume to write to.
662 // If this volume fails, try all of the volumes in order.
663 vol := KeepVM.Choose()
664 if err := vol.Put(hash, block); err == nil {
665 return nil // success!
668 for _, vol := range KeepVM.Volumes() {
669 err := vol.Put(hash, block)
671 return nil // success!
673 if err != FullError {
674 // The volume is not full but the write did not succeed.
675 // Report the error and continue trying.
677 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
682 log.Printf("all Keep volumes full")
685 log.Printf("all Keep volumes failed")
// validLocatorPattern matches a bare MD5 locator: exactly 32 lowercase
// hexadecimal digits and nothing else. It is compiled once instead of on
// every IsValidLocator call.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator (32 lowercase hex digits, no hints).
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorPattern.MatchString(loc)
}
// oauth2TokenPattern extracts the token from an "OAuth2 <token>"
// Authorization header value. It is compiled once instead of per request.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found. Only the first Authorization value is examined.
func GetApiToken(req *http.Request) string {
	// Header.Get returns "" for a missing header or an empty value slice.
	// The original indexed auth[0] directly, which panics on an empty
	// slice.
	if m := oauth2TokenPattern.FindStringSubmatch(req.Header.Get("Authorization")); m != nil {
		return m[1]
	}
	return ""
}
// IsExpired reports whether the Unix timestamp encoded in timestamp_hex
// (seconds since the epoch, written in hexadecimal) lies in the past.
// A value that does not parse as hexadecimal is logged and treated as
// expired.
func IsExpired(timestamp_hex string) bool {
	ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
	switch {
	case err != nil:
		log.Printf("IsExpired: %s\n", err)
		return true
	default:
		return time.Now().After(time.Unix(ts, 0))
	}
}
731 // CanDelete returns true if the user identified by api_token is
732 // allowed to delete blocks.
733 func CanDelete(api_token string) bool {
737 // Blocks may be deleted only when Keep has been configured with a
739 if IsDataManagerToken(api_token) {
742 // TODO(twp): look up api_token with the API server
743 // return true if is_admin is true and if the token
744 // has unlimited scope
748 // IsDataManagerToken returns true if api_token represents the data
750 func IsDataManagerToken(api_token string) bool {
751 return data_manager_token != "" && api_token == data_manager_token