3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
// DeleteHandler (DELETE /locator)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
// Every block route requires a 32-digit lowercase hex hash; requests
// that match no route are answered by BadRequestHandler.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
// Block reads: either a bare hash, or a hash followed by "+" and a
// hint string that GetBlockHandler parses (size, signature, others).
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
// Block writes and deletes take the bare hash only; no hints route is
// registered for PUT or DELETE.
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44 // For IndexHandler we support:
45 // /index - returns all locators
46 // /index/{prefix} - returns all locators that begin with {prefix}
47 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48 // If {prefix} is the empty string, return an index of all locators
49 // (so /index and /index/ behave identically)
50 // A client may supply a full 32-digit locator string, in which
51 // case the server will return an index with either zero or one
52 // entries. This usage allows a client to check whether a block is
53 // present, and its size and upload time, without retrieving the
56 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62 // /trash" requests from Data Manager. These requests instruct
63 // Keep to replicate or delete blocks; see
64 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
67 // Each handler parses the JSON list of block management requests
68 // in the message body, and replaces any existing pull queue or
69 // trash queue with their contents.
71 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74 // Any request which does not match any of these routes gets
76 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler is installed as the router's NotFoundHandler by
// MakeRESTRouter. It replies to any unmatched request with
// BadRequestError's message and HTTP status code.
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves GET and HEAD requests for a single block.
// It parses the optional "+"-separated hints from the URL, checks the
// permission signature when enforce_permissions is set, fetches the
// block with GetBlock, and writes it with a Content-Length header.
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86 hash := mux.Vars(req)["hash"]
88 hints := mux.Vars(req)["hints"]
90 // Parse the locator string and hints from the request.
91 // TODO(twp): implement a Locator type.
92 var signature, timestamp string
// Signature hint: "A" + hex signature + "@" + 8 hex digits of expiry
// timestamp. Capture group 1 is the signature, group 2 the timestamp.
// NOTE(review): this pattern is compiled inside the handler (i.e. per
// request) and the compile error is discarded. A package-level
// regexp.MustCompile would compile it once. The same applies to the
// regexp.MatchString calls below, which recompile for every hint.
94 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95 for _, hint := range strings.Split(hints, "+") {
96 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97 // Server ignores size hints
98 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
101 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102 // Any unknown hint that starts with an uppercase letter is
103 // presumed to be valid and ignored, to permit forward compatibility.
105 // Unknown format; not a valid locator.
106 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
112 // If permission checking is in effect, verify this
113 // request's permission signature.
114 if enforce_permissions {
115 if signature == "" || timestamp == "" {
116 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
118 } else if IsExpired(timestamp) {
119 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
// VerifySignature is given the whole request path minus its leading
// slash: the hash together with all hints, including the signature
// hint itself.
122 req_locator := req.URL.Path[1:] // strip leading slash
123 if !VerifySignature(req_locator, GetApiToken(req)) {
124 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
130 block, err := GetBlock(hash, false)
132 // Garbage collect after each GET. Fixes #2865.
133 // TODO(twp): review Keep memory usage and see if there's
134 // a better way to do this than blindly garbage collecting
135 // after every block.
139 // This type assertion is safe because the only errors
140 // GetBlock can return are DiskHashError or NotFoundError.
// (GetBlock assigns GenericError only when update_timestamp is true;
// this call passes false.)
141 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
145 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
147 _, err = resp.Write(block)
// PutBlockHandler stores the request body as the block named by the
// hash in the URL. Before reading any body data it rejects requests
// with no declared length (SizeRequiredError), a length above
// BLOCKSIZE (TooLongError), or no writable volume (FullError).
// On success it replies with "hash+size". The reply is signed with
// SignLocator when PermissionSecret is set and the request carries an
// API token.
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153 // Garbage collect after each PUT. Fixes #2865.
154 // See also GetBlockHandler.
157 hash := mux.Vars(req)["hash"]
159 // Detect as many error conditions as possible before reading
160 // the body: avoid transmitting data that will not end up
161 // being written anyway.
163 if req.ContentLength == -1 {
164 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168 if req.ContentLength > BLOCKSIZE {
169 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173 if len(KeepVM.AllWritable()) == 0 {
174 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// The checks above reject unknown and oversized lengths, so this
// allocation is bounded by BLOCKSIZE.
178 buf := make([]byte, req.ContentLength)
179 nread, err := io.ReadFull(req.Body, buf)
181 http.Error(resp, err.Error(), 500)
183 } else if int64(nread) < req.ContentLength {
184 http.Error(resp, "request truncated", 500)
188 err = PutBlock(buf, hash)
// NOTE(review): this unchecked assertion panics if PutBlock ever
// returns an error that is not a *KeepError. A checked assertion
// (ke, ok := err.(*KeepError)) with a 500 fallback would be safer.
190 ke := err.(*KeepError)
191 http.Error(resp, ke.Error(), ke.HTTPCode)
195 // Success; add a size hint, sign the locator if possible, and
196 // return it to the client.
197 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
198 api_token := GetApiToken(req)
199 if PermissionSecret != nil && api_token != "" {
200 expiry := time.Now().Add(permission_ttl)
201 return_hash = SignLocator(return_hash, api_token, expiry)
// NOTE(review): the error from resp.Write is ignored.
203 resp.Write([]byte(return_hash + "\n"))
207 // A HandleFunc to address /index and /index/{prefix} requests.
// Only the Data Manager token is accepted; any other token gets
// UnauthorizedError. The response body is the concatenation of
// vol.Index(prefix) over every readable volume. The /index route has
// no prefix variable, so prefix is "" there, which lists all locators.
// NOTE(review): index is built with string concatenation in a loop,
// re-copying the accumulated text per volume. A strings.Builder, or
// writing each volume's index to resp directly, avoids that. The
// resp.Write error is ignored.
209 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
210 // Reject unauthorized requests.
211 if !IsDataManagerToken(GetApiToken(req)) {
212 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
216 prefix := mux.Vars(req)["prefix"]
219 for _, vol := range KeepVM.AllReadable() {
220 index = index + vol.Index(prefix)
222 resp.Write([]byte(index))
226 // Responds to /status.json requests with the current node status,
227 // described in a JSON structure.
229 // The data given in a status.json response includes:
230 // volumes - a list of Keep volumes currently in use by this server
231 // each volume is an object with the following fields:
233 // * device_num (an integer identifying the underlying filesystem)
// * mount_point (the volume path)
// * bytes_free and bytes_used (computed by GetVolumeStatus the way df does)
// VolumeStatus is the per-volume entry of a status.json response.
237 type VolumeStatus struct {
238 MountPoint string `json:"mount_point"`
239 DeviceNum uint64 `json:"device_num"`
240 BytesFree uint64 `json:"bytes_free"`
241 BytesUsed uint64 `json:"bytes_used"`
// NodeStatus is the top-level status.json document. It holds one
// VolumeStatus per readable volume, as filled in by GetNodeStatus.
244 type NodeStatus struct {
245 Volumes []*VolumeStatus `json:"volumes"`
// StatusHandler serves GET/HEAD /status.json by marshaling the result
// of GetNodeStatus as JSON. If marshaling fails, it logs the error and
// the status value and replies with HTTP 500 and the error text.
248 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
249 st := GetNodeStatus()
250 if jstat, err := json.Marshal(st); err == nil {
253 log.Printf("json.Marshal: %s\n", err)
254 log.Printf("NodeStatus = %v\n", st)
255 http.Error(resp, err.Error(), 500)
260 // Returns a NodeStatus struct describing this Keep
261 // node's current status.
// The status holds one VolumeStatus per volume in KeepVM.AllReadable(),
// in that order, each obtained from vol.Status().
// NOTE(review): AllReadable() is called twice, once to size the slice
// and once to iterate. If the readable set can change between the two
// calls, the loop could index past the end of st.Volumes. Calling it
// once and reusing the result avoids that.
263 func GetNodeStatus() *NodeStatus {
264 st := new(NodeStatus)
266 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
267 for i, vol := range KeepVM.AllReadable() {
268 st.Volumes[i] = vol.Status()
274 // Returns a VolumeStatus describing the requested volume.
// volume is a filesystem path. It is passed to os.Stat (for the device
// number) and to syscall.Statfs (for space usage); an error from
// either call is logged.
// NOTE(review): the *syscall.Stat_t assertion and syscall.Statfs are
// Unix-specific, and the assertion panics if fi.Sys() has another type.
276 func GetVolumeStatus(volume string) *VolumeStatus {
277 var fs syscall.Statfs_t
280 if fi, err := os.Stat(volume); err == nil {
281 devnum = fi.Sys().(*syscall.Stat_t).Dev
283 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
287 err := syscall.Statfs(volume, &fs)
289 log.Printf("GetVolumeStatus: statfs: %s\n", err)
292 // These calculations match the way df calculates disk usage:
293 // "free" space is measured by fs.Bavail, but "used" space
294 // uses fs.Blocks - fs.Bfree.
295 free := fs.Bavail * uint64(fs.Bsize)
296 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Unkeyed literal: relies on VolumeStatus's field order
// (MountPoint, DeviceNum, BytesFree, BytesUsed).
297 return &VolumeStatus{volume, devnum, free, used}
300 // DeleteHandler processes DELETE requests.
302 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
303 // from all connected volumes.
// Only the volumes returned by KeepVM.AllWritable() are visited, so
// copies on read-only volumes are neither deleted nor counted.
305 // Only the Data Manager, or an Arvados admin with scope "all", are
306 // allowed to issue DELETE requests. If a DELETE request is not
307 // authenticated or is issued by a non-admin user, the server returns
308 // a PermissionError.
// (CanDelete appears to recognize only the Data Manager token so far;
// the admin-token lookup is a TODO there.)
310 // Upon receiving a valid request from an authorized user,
311 // DeleteHandler deletes all copies of the specified block on local
316 // If the requested block was not found on any volume, the response
317 // code is HTTP 404 Not Found.
319 // Otherwise, the response code is 200 OK, with a response body
320 // consisting of the JSON message
322 // {"copies_deleted":d,"copies_failed":f}
324 // where d and f are integers representing the number of blocks that
325 // were successfully and unsuccessfully deleted.
327 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
328 hash := mux.Vars(req)["hash"]
330 // Confirm that this user is an admin and has a token with unlimited scope.
331 var tok = GetApiToken(req)
332 if tok == "" || !CanDelete(tok) {
333 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
338 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
342 // Delete copies of this block from all available volumes.
343 // Report how many blocks were successfully deleted, and how
344 // many were found on writable volumes but not deleted.
346 Deleted int `json:"copies_deleted"`
347 Failed int `json:"copies_failed"`
349 for _, vol := range KeepVM.AllWritable() {
350 if err := vol.Delete(hash); err == nil {
// os.IsNotExist: this volume does not hold the block; it is not
// treated like the other (logged) errors.
352 } else if os.IsNotExist(err) {
356 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed: no writable volume had the block.
362 if result.Deleted == 0 && result.Failed == 0 {
363 st = http.StatusNotFound
370 if st == http.StatusOK {
371 if body, err := json.Marshal(result); err == nil {
374 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
375 http.Error(resp, err.Error(), 500)
380 /* PullHandler processes "PUT /pull" requests for the data manager.
381 The request body is a JSON message containing a list of pull
382 requests in the following format:
386 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
388 "keep0.qr1hi.arvadosapi.com:25107",
389 "keep1.qr1hi.arvadosapi.com:25108"
393 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
403 Each pull request in the list consists of a block locator string
404 and an ordered list of servers. Keepstore should try to fetch the
405 block from each server in turn.
407 If the request has not been sent by the Data Manager, return 401
410 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one element of the JSON list accepted by PullHandler:
// the locator of a block to fetch, and the servers ("host:port") to
// try, in order.
413 type PullRequest struct {
414 Locator string `json:"locator"`
415 Servers []string `json:"servers"`
// PullHandler accepts a JSON list of PullRequests from the Data
// Manager; any other token gets UnauthorizedError. A body that fails
// to decode gets BadRequestError. Otherwise it replies 200 with a
// count of the received requests, then builds plist from the decoded
// entries and passes it to pullq.ReplaceQueue. The reply is written
// before the queue is replaced.
// NOTE(review): the body is decoded without a size limit (e.g.
// http.MaxBytesReader); only the token check above limits who can
// send it.
418 func PullHandler(resp http.ResponseWriter, req *http.Request) {
419 // Reject unauthorized requests.
420 if !IsDataManagerToken(GetApiToken(req)) {
421 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
425 // Parse the request body.
427 r := json.NewDecoder(req.Body)
428 if err := r.Decode(&pr); err != nil {
429 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
433 // We have a properly formatted pull list sent from the data
434 // manager. Report success and send the list to the pull list
435 // manager for further handling.
436 resp.WriteHeader(http.StatusOK)
438 fmt.Sprintf("Received %d pull requests\n", len(pr))))
441 for _, p := range pr {
444 pullq.ReplaceQueue(plist)
// TrashRequest is one element of the JSON list accepted by
// TrashHandler: a block locator and its block_mtime.
// NOTE(review): BlockMtime's unit is not established here (presumably
// Unix seconds); confirm against the Data Manager.
447 type TrashRequest struct {
448 Locator string `json:"locator"`
449 BlockMtime int64 `json:"block_mtime"`
// TrashHandler accepts a JSON list of TrashRequests from the Data
// Manager; any other token gets UnauthorizedError. A body that fails
// to decode gets BadRequestError. Otherwise it replies 200 with a
// count of the received requests, then builds tlist from the decoded
// entries and passes it to trashq.ReplaceQueue. The reply is written
// before the queue is replaced.
// NOTE(review): the body is decoded without a size limit (e.g.
// http.MaxBytesReader); only the token check above limits who can
// send it.
452 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
453 // Reject unauthorized requests.
454 if !IsDataManagerToken(GetApiToken(req)) {
455 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
459 // Parse the request body.
460 var trash []TrashRequest
461 r := json.NewDecoder(req.Body)
462 if err := r.Decode(&trash); err != nil {
463 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
467 // We have a properly formatted trash list sent from the data
468 // manager. Report success and send the list to the trash work
469 // queue for further handling.
470 resp.WriteHeader(http.StatusOK)
472 fmt.Sprintf("Received %d trash requests\n", len(trash))))
475 for _, t := range trash {
478 trashq.ReplaceQueue(tlist)
481 // ==============================
482 // GetBlock and PutBlock implement lower-level code for handling
483 // blocks by rooting through volumes connected to the local machine.
484 // Once the handler has determined that system policy permits the
485 // request, it calls these methods to perform the actual operation.
487 // TODO(twp): this code would probably be better located in the
488 // VolumeManager interface. As an abstraction, the VolumeManager
489 // should be the only part of the code that cares about which volume a
490 // block is stored on, so it should be responsible for figuring out
491 // which volume to check for fetching blocks, storing blocks, etc.
493 // ==============================
494 // GetBlock fetches and returns the block identified by "hash". If
495 // the update_timestamp argument is true, GetBlock also updates the
496 // block's file modification time (for the sake of PutBlock, which
497 // must update the file's timestamp when the block already exists).
499 // On success, GetBlock returns a byte slice with the block data, and
502 // If the block cannot be found on any volume, returns NotFoundError.
504 // If the block found does not have the correct MD5 hash, returns
// When update_timestamp is true, a failed Touch sets the pending error
// to GenericError, so callers can also receive that error.
508 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
509 // Attempt to read the requested hash from a keep volume.
510 error_to_caller := NotFoundError
513 if update_timestamp {
514 // Pointless to find the block on an unwritable volume
515 // because Touch() will fail -- this is as good as
516 // "not found" for purposes of callers who need to
518 vols = KeepVM.AllWritable()
520 vols = KeepVM.AllReadable()
523 for _, vol := range vols {
524 buf, err := vol.Get(hash)
526 // IsNotExist is an expected error and may be
527 // ignored. All other errors are logged. In
528 // any case we continue trying to read other
529 // volumes. If all volumes report IsNotExist,
530 // we return a NotFoundError.
531 if !os.IsNotExist(err) {
532 log.Printf("GetBlock: reading %s: %s\n", hash, err)
536 // Check the file checksum.
538 filehash := fmt.Sprintf("%x", md5.Sum(buf))
539 if filehash != hash {
540 // TODO: Try harder to tell a sysadmin about
542 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
544 error_to_caller = DiskHashError
// A DiskHashError still pending here means an earlier volume held a
// corrupt copy but this volume's copy passed the checksum.
547 if error_to_caller == DiskHashError {
548 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
551 if update_timestamp {
552 if err := vol.Touch(hash); err != nil {
553 error_to_caller = GenericError
// NOTE(review): this logs error_to_caller (always GenericError at this
// point) instead of err, so the actual Touch failure is never
// recorded. The last argument should be err.
554 log.Printf("%s: Touch %s failed: %s",
555 vol, hash, error_to_caller)
561 return nil, error_to_caller
564 /* PutBlock(block, hash)
565 Stores the BLOCK (identified by the content id HASH) in Keep.
567 The MD5 checksum of the block must be identical to the content id HASH.
568 If not, an error is returned.
570 PutBlock stores the BLOCK on the first Keep volume with free space.
571 A failure code is returned to the user only if all volumes fail.
573 On success, PutBlock returns nil.
574 On failure, it returns a KeepError with one of the following codes:
577 A different block with the same hash already exists on this
580 The MD5 hash of the BLOCK does not match the argument HASH.
582 There was not enough space left in any Keep volume to store
585 The object could not be stored for some other reason (e.g.
586 all writes failed). The text of the error message should
587 provide as much detail as possible.
590 func PutBlock(block []byte, hash string) error {
591 // Check that BLOCK's checksum matches HASH.
592 blockhash := fmt.Sprintf("%x", md5.Sum(block))
593 if blockhash != hash {
594 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
595 return RequestHashError
598 // If we already have a block on disk under this identifier, return
599 // success (but check for MD5 collisions). While fetching the block,
600 // update its timestamp.
601 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
602 // In either case, we want to write our new (good) block to disk,
603 // so there is nothing special to do if err != nil.
// NOTE(review): with update_timestamp=true, GetBlock can also end up
// with GenericError (a Touch failure), so the comment above is
// incomplete. That case likewise falls through to writing the block
// again.
605 if oldblock, err := GetBlock(hash, true); err == nil {
606 if bytes.Compare(block, oldblock) == 0 {
607 // The block already exists; return success.
// Same hash but different content: an MD5 collision.
610 return CollisionError
614 // Choose a Keep volume to write to.
615 // If this volume fails, try all of the volumes in order.
616 if vol := KeepVM.NextWritable(); vol != nil {
617 if err := vol.Put(hash, block); err == nil {
618 return nil // success!
622 writables := KeepVM.AllWritable()
623 if len(writables) == 0 {
624 log.Print("No writable volumes.")
// NOTE(review): this fallback loop visits every writable volume, which
// may include the one NextWritable already tried above.
629 for _, vol := range writables {
630 err := vol.Put(hash, block)
632 return nil // success!
634 if err != FullError {
635 // The volume is not full but the
636 // write did not succeed. Report the
637 // error and continue trying.
639 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
644 log.Print("All volumes are full.")
647 // Already logged the non-full errors.
653 // Return true if the specified string is a valid Keep locator.
654 // When Keep is extended to support hash types other than MD5,
655 // this should be updated to cover those as well.
// Only a bare 32-digit lowercase hex hash matches; a locator carrying
// "+" hints does not. A regexp error is logged.
// NOTE(review): regexp.MatchString compiles the pattern on every call;
// a package-level regexp.MustCompile would compile it once.
657 func IsValidLocator(loc string) bool {
658 match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
662 log.Printf("IsValidLocator: %s\n", err)
666 // GetApiToken returns the OAuth2 token from the Authorization
667 // header of a HTTP request, or an empty string if no matching
// Only the first Authorization header value is examined. It must begin
// with "OAuth2" followed by whitespace; the captured remainder is
// presumably what is returned as the token.
// NOTE(review): the pattern is compiled on every call, and this runs
// for most requests. A package-level regexp.MustCompile would compile
// it once.
669 func GetApiToken(req *http.Request) string {
670 if auth, ok := req.Header["Authorization"]; ok {
671 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
673 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
680 // IsExpired returns true if the given Unix timestamp (expressed as a
681 // hexadecimal string) is in the past, or if timestamp_hex cannot be
682 // parsed as a hexadecimal string.
// A parse failure is logged.
// NOTE(review): bitSize 0 means "size of int". On a 32-bit platform an
// 8-hex-digit timestamp above 0x7fffffff fails to parse and is treated
// as expired; bitSize 64 would make the range platform-independent.
683 func IsExpired(timestamp_hex string) bool {
684 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
686 log.Printf("IsExpired: %s\n", err)
689 return time.Unix(ts, 0).Before(time.Now())
692 // CanDelete returns true if the user identified by api_token is
693 // allowed to delete blocks.
// The Data Manager token is accepted (IsDataManagerToken). Recognizing
// admin tokens with unlimited scope is still a TODO below.
694 func CanDelete(api_token string) bool {
698 // Blocks may be deleted only when Keep has been configured with a
700 if IsDataManagerToken(api_token) {
703 // TODO(twp): look up api_token with the API server
704 // return true if is_admin is true and if the token
705 // has unlimited scope
709 // IsDataManagerToken returns true if api_token represents the data
// That is, data_manager_token is configured (non-empty) and api_token
// equals it exactly. An unset data_manager_token never matches.
// NOTE(review): == on a secret is not constant-time; consider
// crypto/subtle.ConstantTimeCompare to avoid leaking timing information.
711 func IsDataManagerToken(api_token string) bool {
712 return data_manager_token != "" && api_token == data_manager_token