// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// DeleteHandler   (DELETE /locator)
// IndexHandler    (GET /index, GET /index/prefix)
// StatusHandler   (GET /status.json)
// PullHandler     (PUT /pull)
// TrashHandler    (PUT /trash)
17 "github.com/gorilla/mux"
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
33 func MakeRESTRouter() *mux.Router {
34 rest := mux.NewRouter()
37 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
39 `/{hash:[0-9a-f]{32}}+{hints}`,
40 GetBlockHandler).Methods("GET", "HEAD")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
45 // For IndexHandler we support:
46 // /index - returns all locators
47 // /index/{prefix} - returns all locators that begin with {prefix}
48 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49 // If {prefix} is the empty string, return an index of all locators
50 // (so /index and /index/ behave identically)
51 // A client may supply a full 32-digit locator string, in which
52 // case the server will return an index with either zero or one
53 // entries. This usage allows a client to check whether a block is
54 // present, and its size and upload time, without retrieving the
57 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
59 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
62 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63 // /trash" requests from Data Manager. These requests instruct
64 // Keep to replicate or delete blocks; see
65 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
68 // Each handler parses the JSON list of block management requests
69 // in the message body, and replaces any existing pull queue or
70 // trash queue with their contentes.
72 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
75 // Any request which does not match any of these routes gets
77 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler answers a request with BadRequestError. The router
// uses it as its NotFoundHandler, so it sees every request that matches
// no registered route.
func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
	e := BadRequestError
	http.Error(w, e.Error(), e.HTTPCode)
}
// FindKeepVolumes scans all mounted volumes on the system for Keep
// volumes, and returns a list of matching paths.
//
// A device is assumed to be a Keep volume if it is a normal ("/dev/...")
// or tmpfs volume, is not mounted at "/", and has a "/keep" directory
// directly underneath the mount point. The mount table is read from
// PROC_MOUNTS. Failing to open or read it is fatal.
func FindKeepVolumes() []string {
	f, err := os.Open(PROC_MOUNTS)
	if err != nil {
		log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
	}
	// The original never closed this file.
	defer f.Close()

	vols, err := keepVolumesInMounts(f)
	if err != nil {
		log.Fatal(err)
	}
	return vols
}

// keepVolumesInMounts parses a mount table in /proc/mounts format and
// returns "<mount point>/keep" for every qualifying mount (see
// FindKeepVolumes) whose keep directory exists. The result is never nil.
func keepVolumesInMounts(mounts io.Reader) ([]string, error) {
	vols := make([]string, 0)
	scanner := bufio.NewScanner(mounts)
	for scanner.Scan() {
		fields := strings.Fields(scanner.Text())
		if len(fields) < 2 {
			// Blank or malformed line: nothing to inspect, and indexing
			// fields[1] would panic.
			continue
		}
		dev, mount := fields[0], fields[1]
		if mount != "/" &&
			(dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
			keep := mount + "/keep"
			if st, err := os.Stat(keep); err == nil && st.IsDir() {
				vols = append(vols, keep)
			}
		}
	}
	return vols, scanner.Err()
}
118 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
119 hash := mux.Vars(req)["hash"]
121 hints := mux.Vars(req)["hints"]
123 // Parse the locator string and hints from the request.
124 // TODO(twp): implement a Locator type.
125 var signature, timestamp string
127 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
128 for _, hint := range strings.Split(hints, "+") {
129 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
130 // Server ignores size hints
131 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
134 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
135 // Any unknown hint that starts with an uppercase letter is
136 // presumed to be valid and ignored, to permit forward compatibility.
138 // Unknown format; not a valid locator.
139 log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
140 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
146 // If permission checking is in effect, verify this
147 // request's permission signature.
148 if enforce_permissions {
149 if signature == "" || timestamp == "" {
150 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
151 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
153 } else if IsExpired(timestamp) {
154 log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
155 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
158 req_locator := req.URL.Path[1:] // strip leading slash
159 if !VerifySignature(req_locator, GetApiToken(req)) {
160 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
161 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
167 block, err := GetBlock(hash, false)
169 // Garbage collect after each GET. Fixes #2865.
170 // TODO(twp): review Keep memory usage and see if there's
171 // a better way to do this than blindly garbage collecting
172 // after every block.
176 // This type assertion is safe because the only errors
177 // GetBlock can return are DiskHashError or NotFoundError.
178 if err == NotFoundError {
179 log.Printf("%s: not found, giving up\n", hash)
181 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
182 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
186 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
188 _, err = resp.Write(block)
190 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
192 log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
// PutBlockHandler handles PUT /{hash}. It reads the request body,
// stores it with PutBlock, and replies with the locator
// "<hash>+<size>" followed by a newline. If PermissionSecret is set and
// the request carries an API token, the locator is first signed with
// SignLocator for that token, expiring permission_ttl from now.
//
// Error responses:
//   - 411 Length Required if the body length is unknown (ContentLength
//     is negative, e.g. for a chunked upload);
//   - TooLongError if ContentLength exceeds BLOCKSIZE;
//   - 500 if the body cannot be read in full;
//   - the KeepError returned by PutBlock.
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
	// Garbage collect after each PUT. Fixes #2865.
	// See also GetBlockHandler.
	defer runtime.GC()

	hash := mux.Vars(req)["hash"]

	// net/http reports an unknown body length as -1. make() panics on a
	// negative size, so refuse such requests up front.
	if req.ContentLength < 0 {
		log.Printf("%s %s %d %d", req.Method, hash, http.StatusLengthRequired, req.ContentLength)
		http.Error(resp, http.StatusText(http.StatusLengthRequired), http.StatusLengthRequired)
		return
	}

	// Read the block data to be stored, refusing anything over BLOCKSIZE bytes.
	if req.ContentLength > BLOCKSIZE {
		log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
		http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
		return
	}

	buf := make([]byte, req.ContentLength)
	if _, err := io.ReadFull(req.Body, buf); err != nil {
		// io.ReadFull returns io.EOF if no bytes were read and
		// io.ErrUnexpectedEOF if only some were. Either way the client sent
		// less than its Content-Length promised.
		msg := err.Error()
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			msg = "request truncated"
		}
		log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
		http.Error(resp, msg, 500)
		return
	}

	if err := PutBlock(buf, hash); err != nil {
		// PutBlock reports its failures as *KeepError values.
		ke := err.(*KeepError)
		log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
		http.Error(resp, ke.Error(), ke.HTTPCode)
		return
	}

	// Success; add a size hint, sign the locator if
	// possible, and return it to the client.
	return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
	api_token := GetApiToken(req)
	if PermissionSecret != nil && api_token != "" {
		expiry := time.Now().Add(permission_ttl)
		return_hash = SignLocator(return_hash, api_token, expiry)
	}
	log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
	resp.Write([]byte(return_hash + "\n"))
}
244 // A HandleFunc to address /index and /index/{prefix} requests.
246 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
247 prefix := mux.Vars(req)["prefix"]
249 // Only the data manager may issue /index requests,
250 // and only if enforce_permissions is enabled.
251 // All other requests return 403 Forbidden.
252 api_token := GetApiToken(req)
253 if !enforce_permissions ||
255 data_manager_token != api_token {
256 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
260 for _, vol := range KeepVM.Volumes() {
261 index = index + vol.Index(prefix)
263 resp.Write([]byte(index))
// VolumeStatus and NodeStatus are the JSON structures that StatusHandler
// serves at /status.json. The response contains:
//
//	volumes - a list of Keep volumes currently in use by this server;
//	          each volume is an object with the following fields:
//	            * mount_point
//	            * device_num (an integer identifying the underlying filesystem)
//	            * bytes_free
//	            * bytes_used
type (
	// VolumeStatus describes one volume. It is returned by
	// GetVolumeStatus and by each volume's Status method.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level /status.json document.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
// StatusHandler answers /status.json requests with the NodeStatus
// returned by GetNodeStatus, encoded as JSON. If encoding fails, the
// error and the status value are logged and a 500 is returned.
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
	status := GetNodeStatus()
	body, err := json.Marshal(status)
	if err != nil {
		log.Printf("json.Marshal: %s\n", err)
		log.Printf("NodeStatus = %v\n", status)
		http.Error(resp, err.Error(), 500)
		return
	}
	resp.Write(body)
}
301 // Returns a NodeStatus struct describing this Keep
302 // node's current status.
304 func GetNodeStatus() *NodeStatus {
305 st := new(NodeStatus)
307 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
308 for i, vol := range KeepVM.Volumes() {
309 st.Volumes[i] = vol.Status()
// GetVolumeStatus returns a VolumeStatus describing the given volume
// path:
//   - the device number reported by stat;
//   - free and used byte counts computed from statfs.
//
// If either system call fails, it logs the error and returns nil.
func GetVolumeStatus(volume string) *VolumeStatus {
	fi, err := os.Stat(volume)
	if err != nil {
		log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
		return nil
	}
	// fi.Sys() is a *syscall.Stat_t on Unix. Check the assertion rather
	// than panic, and convert explicitly because Stat_t.Dev is not a
	// uint64 on every platform.
	var devnum uint64
	if st, ok := fi.Sys().(*syscall.Stat_t); ok {
		devnum = uint64(st.Dev)
	}

	var fs syscall.Statfs_t
	if err := syscall.Statfs(volume, &fs); err != nil {
		log.Printf("GetVolumeStatus: statfs: %s\n", err)
		return nil
	}

	// These calculations match the way df calculates disk usage:
	// "free" space is measured by fs.Bavail, but "used" space
	// uses fs.Blocks - fs.Bfree.
	free := fs.Bavail * uint64(fs.Bsize)
	used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
	return &VolumeStatus{
		MountPoint: volume,
		DeviceNum:  devnum,
		BytesFree:  free,
		BytesUsed:  used,
	}
}
// DeleteHandler processes DELETE requests.
//
// DELETE /{hash:[0-9a-f]{32}} deletes the block with the specified hash
// from every volume in KeepVM.
//
// The caller's API token must be non-empty and accepted by CanDelete;
// otherwise the server returns a PermissionError. (CanDelete currently
// admits only the Data Manager. Admitting Arvados admins whose token has
// scope "all" is a TODO there.) If deletion is disabled (never_delete),
// the server returns MethodDisabledError.
//
// Response format:
//
// If no volume had a copy of the requested block (every Delete
// reported "does not exist", or there are no volumes), the response
// code is HTTP 404 Not Found with an empty body.
//
// Otherwise, the response code is 200 OK, with a response body
// consisting of the JSON message
//
//	{"copies_deleted":d,"copies_failed":f}
//
// where d and f are the numbers of copies that were successfully and
// unsuccessfully deleted. Failed deletions are logged.
func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
	hash := mux.Vars(req)["hash"]
	log.Printf("%s %s", req.Method, hash)

	// Confirm that this user is an admin and has a token with unlimited scope.
	var tok = GetApiToken(req)
	if tok == "" || !CanDelete(tok) {
		http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
		return
	}

	if never_delete {
		http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
		return
	}

	// Delete copies of this block from all available volumes. Count how
	// many copies were successfully and unsuccessfully deleted.
	var result struct {
		Deleted int `json:"copies_deleted"`
		Failed  int `json:"copies_failed"`
	}
	for _, vol := range KeepVM.Volumes() {
		err := vol.Delete(hash)
		switch {
		case err == nil:
			result.Deleted++
		case os.IsNotExist(err):
			// No copy on this volume; not a failure.
		default:
			result.Failed++
			log.Println("DeleteHandler:", err)
		}
	}

	if result.Deleted == 0 && result.Failed == 0 {
		resp.WriteHeader(http.StatusNotFound)
		return
	}

	// Marshal before writing the status line. Once WriteHeader has been
	// called, a later http.Error can no longer change the status (the
	// original wrote 200 first).
	body, err := json.Marshal(result)
	if err != nil {
		log.Printf("json.Marshal: %s (result = %v)\n", err, result)
		http.Error(resp, err.Error(), 500)
		return
	}
	resp.WriteHeader(http.StatusOK)
	resp.Write(body)
}
// PullHandler processes "PUT /pull" requests for the data manager.
// The request body is a JSON list of PullRequest objects, for example:
//
//	[
//	   {
//	      "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
//	      "servers":[
//	         "keep0.qr1hi.arvadosapi.com:25107",
//	         "keep1.qr1hi.arvadosapi.com:25108"
//	      ]
//	   },
//	   {
//	      "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
//	      "servers":[
//	         "10.0.1.5:25107",
//	         "10.0.1.6:25107",
//	         "10.0.1.7:25108"
//	      ]
//	   },
//	   ...
//	]
//
// Each pull request in the list consists of a block locator string
// and an ordered list of servers. Keepstore should try to fetch the
// block from each server in turn.
//
// If the request has not been sent by the Data Manager, PullHandler
// returns UnauthorizedError (401 Unauthorized). If the JSON
// unmarshalling fails, it returns BadRequestError (400 Bad Request).
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`
}
460 func PullHandler(resp http.ResponseWriter, req *http.Request) {
461 // Reject unauthorized requests.
462 if !IsDataManagerToken(GetApiToken(req)) {
463 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
464 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
468 // Parse the request body.
470 r := json.NewDecoder(req.Body)
471 if err := r.Decode(&pr); err != nil {
472 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
473 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
477 // We have a properly formatted pull list sent from the data
478 // manager. Report success and send the list to the pull list
479 // manager for further handling.
480 log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
481 resp.WriteHeader(http.StatusOK)
483 fmt.Sprintf("Received %d pull requests\n", len(pr))))
486 for _, p := range pr {
491 pullq = NewWorkQueue()
493 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by TrashHandler
// ("PUT /trash"). It carries a block locator and a block_mtime value.
// NOTE(review): block_mtime presumably is the block's modification time
// in Unix seconds as seen by the Data Manager; confirm against the sender.
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
}
// TrashHandler processes "PUT /trash" requests from the Data Manager.
// The body must be a JSON list of TrashRequest objects.
//
// Responses:
//   - UnauthorizedError if the request lacks the data manager's token;
//   - BadRequestError if the body does not decode;
//   - otherwise 200 with a count of the requests received.
//
// On success the trash work queue (created on first use) is replaced
// with the received list.
func TrashHandler(resp http.ResponseWriter, req *http.Request) {
	// Reject unauthorized requests.
	if !IsDataManagerToken(GetApiToken(req)) {
		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
		log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
		return
	}

	var requests []TrashRequest
	if err := json.NewDecoder(req.Body).Decode(&requests); err != nil {
		http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
		log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
		return
	}

	// The list is well formed: acknowledge it, then hand it to the trash
	// work queue.
	log.Printf("%s %s: received %v\n", req.Method, req.URL, requests)
	resp.WriteHeader(http.StatusOK)
	summary := fmt.Sprintf("Received %d trash requests\n", len(requests))
	resp.Write([]byte(summary))

	queue := list.New()
	for _, tr := range requests {
		queue.PushBack(tr)
	}
	if trashq == nil {
		trashq = NewWorkQueue()
	}
	trashq.ReplaceQueue(queue)
}
537 // ==============================
538 // GetBlock and PutBlock implement lower-level code for handling
539 // blocks by rooting through volumes connected to the local machine.
540 // Once the handler has determined that system policy permits the
541 // request, it calls these methods to perform the actual operation.
543 // TODO(twp): this code would probably be better located in the
544 // VolumeManager interface. As an abstraction, the VolumeManager
545 // should be the only part of the code that cares about which volume a
546 // block is stored on, so it should be responsible for figuring out
547 // which volume to check for fetching blocks, storing blocks, etc.
549 // ==============================
// GetBlock fetches and returns the block identified by "hash", searching
// the volumes of KeepVM in order for a copy whose MD5 checksum matches.
// If the update_timestamp argument is true, GetBlock also updates the
// block's modification time with vol.Touch. PutBlock relies on this
// because it must update the timestamp when the block already exists.
// A copy whose timestamp cannot be updated is skipped.
//
// On success, GetBlock returns the block data and a nil error.
//
// If the block cannot be found on any volume, it returns NotFoundError.
// If the only copies found have the wrong MD5 hash, it returns
// DiskHashError. Read errors other than "does not exist" are logged and
// the search continues.
func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
	// Stays NotFoundError unless some volume holds a corrupt copy.
	result := NotFoundError

	for _, vol := range KeepVM.Volumes() {
		buf, err := vol.Get(hash)
		if err != nil {
			// IsNotExist is expected; other errors are logged, and either
			// way we keep looking on the remaining volumes.
			if !os.IsNotExist(err) {
				log.Printf("GetBlock: reading %s: %s\n", hash, err)
			}
			continue
		}

		// Double check the file checksum.
		if actual := fmt.Sprintf("%x", md5.Sum(buf)); actual != hash {
			// TODO(twp): this condition probably represents a bad disk and
			// should raise major alarm bells for an administrator: e.g.
			// they should be sent directly to an event manager at high
			// priority or logged as urgent problems.
			log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
				vol, hash, actual)
			result = DiskHashError
			continue
		}

		if result != NotFoundError {
			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
				vol, hash)
		}

		// Update the timestamp if the caller requested it; if that fails,
		// continue looking on other volumes.
		if update_timestamp && vol.Touch(hash) != nil {
			continue
		}
		return buf, nil
	}

	if result != NotFoundError {
		log.Printf("%s: checksum mismatch, no good copy found\n", hash)
	}
	return nil, result
}
618 /* PutBlock(block, hash)
619 Stores the BLOCK (identified by the content id HASH) in Keep.
621 The MD5 checksum of the block must be identical to the content id HASH.
622 If not, an error is returned.
624 PutBlock stores the BLOCK on the first Keep volume with free space.
625 A failure code is returned to the user only if all volumes fail.
627 On success, PutBlock returns nil.
628 On failure, it returns a KeepError with one of the following codes:
631 A different block with the same hash already exists on this
634 The MD5 hash of the BLOCK does not match the argument HASH.
636 There was not enough space left in any Keep volume to store
639 The object could not be stored for some other reason (e.g.
640 all writes failed). The text of the error message should
641 provide as much detail as possible.
644 func PutBlock(block []byte, hash string) error {
645 // Check that BLOCK's checksum matches HASH.
646 blockhash := fmt.Sprintf("%x", md5.Sum(block))
647 if blockhash != hash {
648 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
649 return RequestHashError
652 // If we already have a block on disk under this identifier, return
653 // success (but check for MD5 collisions). While fetching the block,
654 // update its timestamp.
655 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
656 // In either case, we want to write our new (good) block to disk,
657 // so there is nothing special to do if err != nil.
659 if oldblock, err := GetBlock(hash, true); err == nil {
660 if bytes.Compare(block, oldblock) == 0 {
661 // The block already exists; return success.
664 return CollisionError
668 // Choose a Keep volume to write to.
669 // If this volume fails, try all of the volumes in order.
670 vol := KeepVM.Choose()
671 if err := vol.Put(hash, block); err == nil {
672 return nil // success!
675 for _, vol := range KeepVM.Volumes() {
676 err := vol.Put(hash, block)
678 return nil // success!
680 if err != FullError {
681 // The volume is not full but the write did not succeed.
682 // Report the error and continue trying.
684 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
689 log.Printf("all Keep volumes full")
692 log.Printf("all Keep volumes failed")
// validLocatorPattern matches a bare Keep locator hash: exactly 32
// lowercase hexadecimal digits (an MD5 digest), with no hints.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator. When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
//
// The pattern is compiled once at package level. The original
// recompiled it on every call and carried an unreachable error path.
func IsValidLocator(loc string) bool {
	return validLocatorPattern.MatchString(loc)
}
// oauth2HeaderPattern extracts the token from an Authorization header
// value of the form "OAuth2 <token>".
var oauth2HeaderPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the (first) Authorization
// header of a HTTP request, or an empty string if no matching token is
// found.
//
// The pattern is compiled once at package level instead of on every
// request.
func GetApiToken(req *http.Request) string {
	m := oauth2HeaderPattern.FindStringSubmatch(req.Header.Get("Authorization"))
	if m == nil {
		return ""
	}
	return m[1]
}
// IsExpired reports whether timestamp_hex, a Unix time in seconds
// written as a hexadecimal string, lies in the past. A string that does
// not parse as hexadecimal is logged and treated as expired.
func IsExpired(timestamp_hex string) bool {
	secs, err := strconv.ParseInt(timestamp_hex, 16, 0)
	if err == nil {
		return time.Now().After(time.Unix(secs, 0))
	}
	log.Printf("IsExpired: %s\n", err)
	return true
}
738 // CanDelete returns true if the user identified by api_token is
739 // allowed to delete blocks.
740 func CanDelete(api_token string) bool {
744 // Blocks may be deleted only when Keep has been configured with a
746 if IsDataManagerToken(api_token) {
749 // TODO(twp): look up api_token with the API server
750 // return true if is_admin is true and if the token
751 // has unlimited scope
755 // IsDataManagerToken returns true if api_token represents the data
757 func IsDataManagerToken(api_token string) bool {
758 return data_manager_token != "" && api_token == data_manager_token