// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// DeleteHandler   (DELETE /locator)
// IndexHandler    (GET /index, GET /index/prefix)
// StatusHandler   (GET /status.json)
// PullHandler     (PUT /pull)
// TrashHandler    (PUT /trash)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44 // For IndexHandler we support:
45 // /index - returns all locators
46 // /index/{prefix} - returns all locators that begin with {prefix}
47 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48 // If {prefix} is the empty string, return an index of all locators
49 // (so /index and /index/ behave identically)
50 // A client may supply a full 32-digit locator string, in which
51 // case the server will return an index with either zero or one
52 // entries. This usage allows a client to check whether a block is
53 // present, and its size and upload time, without retrieving the
56 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62 // /trash" requests from Data Manager. These requests instruct
63 // Keep to replicate or delete blocks; see
64 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
67 // Each handler parses the JSON list of block management requests
68 // in the message body, and replaces any existing pull queue or
69 // trash queue with their contentes.
71 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74 // Any request which does not match any of these routes gets
76 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86 hash := mux.Vars(req)["hash"]
88 hints := mux.Vars(req)["hints"]
90 // Parse the locator string and hints from the request.
91 // TODO(twp): implement a Locator type.
92 var signature, timestamp string
94 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95 for _, hint := range strings.Split(hints, "+") {
96 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97 // Server ignores size hints
98 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
101 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102 // Any unknown hint that starts with an uppercase letter is
103 // presumed to be valid and ignored, to permit forward compatibility.
105 // Unknown format; not a valid locator.
106 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
112 // If permission checking is in effect, verify this
113 // request's permission signature.
114 if enforce_permissions {
115 if signature == "" || timestamp == "" {
116 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
118 } else if IsExpired(timestamp) {
119 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
122 req_locator := req.URL.Path[1:] // strip leading slash
123 if !VerifySignature(req_locator, GetApiToken(req)) {
124 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
130 block, err := GetBlock(hash, false)
132 // Garbage collect after each GET. Fixes #2865.
133 // TODO(twp): review Keep memory usage and see if there's
134 // a better way to do this than blindly garbage collecting
135 // after every block.
139 // This type assertion is safe because the only errors
140 // GetBlock can return are DiskHashError or NotFoundError.
141 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
145 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
147 _, err = resp.Write(block)
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153 // Garbage collect after each PUT. Fixes #2865.
154 // See also GetBlockHandler.
157 hash := mux.Vars(req)["hash"]
159 // Detect as many error conditions as possible before reading
160 // the body: avoid transmitting data that will not end up
161 // being written anyway.
163 if req.ContentLength == -1 {
164 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168 if req.ContentLength > BLOCKSIZE {
169 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173 if len(KeepVM.AllWritable()) == 0 {
174 http.Error(resp, FullError.Error(), FullError.HTTPCode)
178 buf := make([]byte, req.ContentLength)
179 nread, err := io.ReadFull(req.Body, buf)
181 http.Error(resp, err.Error(), 500)
183 } else if int64(nread) < req.ContentLength {
184 http.Error(resp, "request truncated", 500)
188 err = PutBlock(buf, hash)
190 ke := err.(*KeepError)
191 http.Error(resp, ke.Error(), ke.HTTPCode)
195 // Success; add a size hint, sign the locator if possible, and
196 // return it to the client.
197 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
198 api_token := GetApiToken(req)
199 if PermissionSecret != nil && api_token != "" {
200 expiry := time.Now().Add(blob_signature_ttl)
201 return_hash = SignLocator(return_hash, api_token, expiry)
203 resp.Write([]byte(return_hash + "\n"))
207 // A HandleFunc to address /index and /index/{prefix} requests.
209 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
210 // Reject unauthorized requests.
211 if !IsDataManagerToken(GetApiToken(req)) {
212 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
216 prefix := mux.Vars(req)["prefix"]
218 for _, vol := range KeepVM.AllReadable() {
219 if err := vol.IndexTo(prefix, resp); err != nil {
220 // The only errors returned by IndexTo are
221 // write errors returned by resp.Write(),
222 // which probably means the client has
223 // disconnected and this error will never be
224 // reported to the client -- but it will
225 // appear in our own error log.
226 http.Error(resp, err.Error(), http.StatusInternalServerError)
// Types describing the response to /status.json requests.
//
// A status.json response is a NodeStatus object. Its "volumes" list
// holds one VolumeStatus object per Keep volume currently in use by
// this server, each with the fields:
//   - mount_point
//   - device_num (an integer identifying the underlying filesystem)
//   - bytes_free
//   - bytes_used
type (
	// VolumeStatus is the status.json entry for a single volume. Keep
	// the field order stable: it is built with positional composite
	// literals such as &VolumeStatus{volume, devnum, free, used}.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level status.json document.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
255 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
256 st := GetNodeStatus()
257 if jstat, err := json.Marshal(st); err == nil {
260 log.Printf("json.Marshal: %s\n", err)
261 log.Printf("NodeStatus = %v\n", st)
262 http.Error(resp, err.Error(), 500)
267 // Returns a NodeStatus struct describing this Keep
268 // node's current status.
270 func GetNodeStatus() *NodeStatus {
271 st := new(NodeStatus)
273 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
274 for i, vol := range KeepVM.AllReadable() {
275 st.Volumes[i] = vol.Status()
281 // Returns a VolumeStatus describing the requested volume.
283 func GetVolumeStatus(volume string) *VolumeStatus {
284 var fs syscall.Statfs_t
287 if fi, err := os.Stat(volume); err == nil {
288 devnum = fi.Sys().(*syscall.Stat_t).Dev
290 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
294 err := syscall.Statfs(volume, &fs)
296 log.Printf("GetVolumeStatus: statfs: %s\n", err)
299 // These calculations match the way df calculates disk usage:
300 // "free" space is measured by fs.Bavail, but "used" space
301 // uses fs.Blocks - fs.Bfree.
302 free := fs.Bavail * uint64(fs.Bsize)
303 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
304 return &VolumeStatus{volume, devnum, free, used}
307 // DeleteHandler processes DELETE requests.
309 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
310 // from all connected volumes.
312 // Only the Data Manager, or an Arvados admin with scope "all", are
313 // allowed to issue DELETE requests. If a DELETE request is not
314 // authenticated or is issued by a non-admin user, the server returns
315 // a PermissionError.
317 // Upon receiving a valid request from an authorized user,
318 // DeleteHandler deletes all copies of the specified block on local
323 // If the requested blocks was not found on any volume, the response
324 // code is HTTP 404 Not Found.
326 // Otherwise, the response code is 200 OK, with a response body
327 // consisting of the JSON message
329 // {"copies_deleted":d,"copies_failed":f}
331 // where d and f are integers representing the number of blocks that
332 // were successfully and unsuccessfully deleted.
334 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
335 hash := mux.Vars(req)["hash"]
337 // Confirm that this user is an admin and has a token with unlimited scope.
338 var tok = GetApiToken(req)
339 if tok == "" || !CanDelete(tok) {
340 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
345 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
349 // Delete copies of this block from all available volumes.
350 // Report how many blocks were successfully deleted, and how
351 // many were found on writable volumes but not deleted.
353 Deleted int `json:"copies_deleted"`
354 Failed int `json:"copies_failed"`
356 for _, vol := range KeepVM.AllWritable() {
357 if err := vol.Delete(hash); err == nil {
359 } else if os.IsNotExist(err) {
363 log.Println("DeleteHandler:", err)
369 if result.Deleted == 0 && result.Failed == 0 {
370 st = http.StatusNotFound
377 if st == http.StatusOK {
378 if body, err := json.Marshal(result); err == nil {
381 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
382 http.Error(resp, err.Error(), 500)
// PullRequest is one entry in the body of a "PUT /pull" request. The
// Data Manager sends a JSON list of these, for example:
//
//	[
//	   {
//	      "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
//	      "servers":[
//	         "keep0.qr1hi.arvadosapi.com:25107",
//	         "keep1.qr1hi.arvadosapi.com:25108"
//	      ]
//	   },
//	   {
//	      "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
//	      "servers":[ ... ]
//	   },
//	   ...
//	]
//
// Each pull request consists of a block locator string and an ordered
// list of servers. Keepstore should try to fetch the block from each
// server in turn.
//
// PullHandler answers a request not sent by the Data Manager with 401
// Unauthorized, and a body whose JSON cannot be decoded with 400 Bad
// Request.
type PullRequest struct {
	Locator string   `json:"locator"` // block locator, possibly with hints
	Servers []string `json:"servers"` // servers to try, in order
}
425 func PullHandler(resp http.ResponseWriter, req *http.Request) {
426 // Reject unauthorized requests.
427 if !IsDataManagerToken(GetApiToken(req)) {
428 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
432 // Parse the request body.
434 r := json.NewDecoder(req.Body)
435 if err := r.Decode(&pr); err != nil {
436 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
440 // We have a properly formatted pull list sent from the data
441 // manager. Report success and send the list to the pull list
442 // manager for further handling.
443 resp.WriteHeader(http.StatusOK)
445 fmt.Sprintf("Received %d pull requests\n", len(pr))))
448 for _, p := range pr {
451 pullq.ReplaceQueue(plist)
// TrashRequest is one entry in the JSON list carried by a "PUT /trash"
// request from the Data Manager.
type TrashRequest struct {
	// Locator identifies the block to be trashed.
	Locator string `json:"locator"`
	// BlockMtime is the block's modification time, as an integer
	// (presumably Unix seconds; confirm against the Data Manager).
	BlockMtime int64 `json:"block_mtime"`
}
459 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
460 // Reject unauthorized requests.
461 if !IsDataManagerToken(GetApiToken(req)) {
462 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
466 // Parse the request body.
467 var trash []TrashRequest
468 r := json.NewDecoder(req.Body)
469 if err := r.Decode(&trash); err != nil {
470 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
474 // We have a properly formatted trash list sent from the data
475 // manager. Report success and send the list to the trash work
476 // queue for further handling.
477 resp.WriteHeader(http.StatusOK)
479 fmt.Sprintf("Received %d trash requests\n", len(trash))))
482 for _, t := range trash {
485 trashq.ReplaceQueue(tlist)
// ==============================
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// Once the handler has determined that system policy permits the
// request, it calls these functions to perform the actual operation.
//
// TODO(twp): this code would probably be better located in the
// VolumeManager interface. As an abstraction, the VolumeManager
// should be the only part of the code that cares about which volume a
// block is stored on, so it should be responsible for figuring out
// which volume to check for fetching blocks, storing blocks, etc.
//
// ==============================
501 // GetBlock fetches and returns the block identified by "hash". If
502 // the update_timestamp argument is true, GetBlock also updates the
503 // block's file modification time (for the sake of PutBlock, which
504 // must update the file's timestamp when the block already exists).
506 // On success, GetBlock returns a byte slice with the block data, and
509 // If the block cannot be found on any volume, returns NotFoundError.
511 // If the block found does not have the correct MD5 hash, returns
515 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
516 // Attempt to read the requested hash from a keep volume.
517 error_to_caller := NotFoundError
520 if update_timestamp {
521 // Pointless to find the block on an unwritable volume
522 // because Touch() will fail -- this is as good as
523 // "not found" for purposes of callers who need to
525 vols = KeepVM.AllWritable()
527 vols = KeepVM.AllReadable()
530 for _, vol := range vols {
531 buf, err := vol.Get(hash)
533 // IsNotExist is an expected error and may be
534 // ignored. All other errors are logged. In
535 // any case we continue trying to read other
536 // volumes. If all volumes report IsNotExist,
537 // we return a NotFoundError.
538 if !os.IsNotExist(err) {
539 log.Printf("GetBlock: reading %s: %s\n", hash, err)
543 // Check the file checksum.
545 filehash := fmt.Sprintf("%x", md5.Sum(buf))
546 if filehash != hash {
547 // TODO: Try harder to tell a sysadmin about
549 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
551 error_to_caller = DiskHashError
554 if error_to_caller == DiskHashError {
555 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
558 if update_timestamp {
559 if err := vol.Touch(hash); err != nil {
560 error_to_caller = GenericError
561 log.Printf("%s: Touch %s failed: %s",
562 vol, hash, error_to_caller)
568 return nil, error_to_caller
571 /* PutBlock(block, hash)
572 Stores the BLOCK (identified by the content id HASH) in Keep.
574 The MD5 checksum of the block must be identical to the content id HASH.
575 If not, an error is returned.
577 PutBlock stores the BLOCK on the first Keep volume with free space.
578 A failure code is returned to the user only if all volumes fail.
580 On success, PutBlock returns nil.
581 On failure, it returns a KeepError with one of the following codes:
584 A different block with the same hash already exists on this
587 The MD5 hash of the BLOCK does not match the argument HASH.
589 There was not enough space left in any Keep volume to store
592 The object could not be stored for some other reason (e.g.
593 all writes failed). The text of the error message should
594 provide as much detail as possible.
597 func PutBlock(block []byte, hash string) error {
598 // Check that BLOCK's checksum matches HASH.
599 blockhash := fmt.Sprintf("%x", md5.Sum(block))
600 if blockhash != hash {
601 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
602 return RequestHashError
605 // If we already have a block on disk under this identifier, return
606 // success (but check for MD5 collisions). While fetching the block,
607 // update its timestamp.
608 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
609 // In either case, we want to write our new (good) block to disk,
610 // so there is nothing special to do if err != nil.
612 if oldblock, err := GetBlock(hash, true); err == nil {
613 if bytes.Compare(block, oldblock) == 0 {
614 // The block already exists; return success.
617 return CollisionError
621 // Choose a Keep volume to write to.
622 // If this volume fails, try all of the volumes in order.
623 if vol := KeepVM.NextWritable(); vol != nil {
624 if err := vol.Put(hash, block); err == nil {
625 return nil // success!
629 writables := KeepVM.AllWritable()
630 if len(writables) == 0 {
631 log.Print("No writable volumes.")
636 for _, vol := range writables {
637 err := vol.Put(hash, block)
639 return nil // success!
641 if err != FullError {
642 // The volume is not full but the
643 // write did not succeed. Report the
644 // error and continue trying.
646 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
651 log.Print("All volumes are full.")
654 // Already logged the non-full errors.
// validLocatorRegexp matches a bare MD5 content hash: exactly 32
// lowercase hexadecimal digits, with no hints. It is compiled once
// rather than on every call.
var validLocatorRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc is a valid Keep locator, i.e. a
// bare 32-digit lowercase hexadecimal MD5 hash.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRegexp.MatchString(loc)
}
// oauth2TokenRegexp extracts the token from an "OAuth2 <token>"
// Authorization header value. It is compiled once rather than per
// request.
var oauth2TokenRegexp = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if there is no such
// header or its first value is not of the form "OAuth2 <token>".
func GetApiToken(req *http.Request) string {
	auth, ok := req.Header["Authorization"]
	// Guard against a present-but-empty value slice, which would
	// otherwise panic on auth[0].
	if !ok || len(auth) == 0 {
		return ""
	}
	if match := oauth2TokenRegexp.FindStringSubmatch(auth[0]); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired reports whether the Unix timestamp timestamp_hex (seconds,
// written in hexadecimal) lies in the past. A value that cannot be
// parsed as hexadecimal is logged and treated as expired.
func IsExpired(timestamp_hex string) bool {
	secs, parseErr := strconv.ParseInt(timestamp_hex, 16, 0)
	if parseErr == nil {
		return time.Now().After(time.Unix(secs, 0))
	}
	log.Printf("IsExpired: %s\n", parseErr)
	return true
}
699 // CanDelete returns true if the user identified by api_token is
700 // allowed to delete blocks.
701 func CanDelete(api_token string) bool {
705 // Blocks may be deleted only when Keep has been configured with a
707 if IsDataManagerToken(api_token) {
710 // TODO(twp): look up api_token with the API server
711 // return true if is_admin is true and if the token
712 // has unlimited scope
716 // IsDataManagerToken returns true if api_token represents the data
718 func IsDataManagerToken(api_token string) bool {
719 return data_manager_token != "" && api_token == data_manager_token