3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44 // For IndexHandler we support:
45 // /index - returns all locators
46 // /index/{prefix} - returns all locators that begin with {prefix}
47 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48 // If {prefix} is the empty string, return an index of all locators
49 // (so /index and /index/ behave identically)
50 // A client may supply a full 32-digit locator string, in which
51 // case the server will return an index with either zero or one
52 // entries. This usage allows a client to check whether a block is
53 // present, and its size and upload time, without retrieving the
56 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62 // /trash" requests from Data Manager. These requests instruct
63 // Keep to replicate or delete blocks; see
64 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
67 // Each handler parses the JSON list of block management requests
68 // in the message body, and replaces any existing pull queue or
69 // trash queue with their contentes.
71 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74 // Any request which does not match any of these routes gets
76 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves GET and HEAD requests for /{hash} and
// /{hash}+{hints}, replying with the block whose MD5 hash is {hash}.
//
// Hints are separated by "+":
//   - an all-digit hint (a size) is ignored;
//   - a hint of the form A<hex>@<8 hex digits> supplies the permission
//     signature and its hex expiry timestamp;
//   - any other hint starting with an uppercase letter is ignored, for
//     forward compatibility;
//   - anything else is rejected with BadRequestError.
//
// When enforce_permissions is set:
//   - a missing signature or a signature that fails VerifySignature
//     yields PermissionError;
//   - an expired timestamp yields ExpiredError.
//
// NOTE(review): the embedded source line numbers below are not
// contiguous, so some statements of this function are not shown in this
// listing; these comments describe only the visible lines.
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86 hash := mux.Vars(req)["hash"]
88 hints := mux.Vars(req)["hints"]
90 // Parse the locator string and hints from the request.
91 // TODO(twp): implement a Locator type.
92 var signature, timestamp string
// NOTE(review): strings.Split("", "+") returns [""], and an empty hint
// matches none of the accepted forms below. Presumably a line not shown
// here skips this loop when hints is empty (plain GET /{hash}) -- confirm.
// NOTE(review): these regular expressions are compiled on every request.
// Package-level regexp.MustCompile variables would avoid that.
94 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95 for _, hint := range strings.Split(hints, "+") {
96 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97 // Server ignores size hints
98 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
// Permission hint: m[1] is the hex signature and m[2] is the 8-digit hex
// expiry timestamp. They are presumably stored into signature and
// timestamp by lines not shown here.
101 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102 // Any unknown hint that starts with an uppercase letter is
103 // presumed to be valid and ignored, to permit forward compatibility.
105 // Unknown format; not a valid locator.
106 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
112 // If permission checking is in effect, verify this
113 // request's permission signature.
114 if enforce_permissions {
115 if signature == "" || timestamp == "" {
116 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
118 } else if IsExpired(timestamp) {
119 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
// The signed string is the request path without its leading "/", that
// is, the locator together with any hints.
122 req_locator := req.URL.Path[1:] // strip leading slash
123 if !VerifySignature(req_locator, GetApiToken(req)) {
124 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// update_timestamp=false: GetBlock searches all readable volumes and
// does not Touch the block.
130 block, err := GetBlock(hash, false)
132 // Garbage collect after each GET. Fixes #2865.
133 // TODO(twp): review Keep memory usage and see if there's
134 // a better way to do this than blindly garbage collecting
135 // after every block.
139 // This type assertion is safe because the only errors
140 // GetBlock can return are DiskHashError or NotFoundError.
// With update_timestamp=false, GetBlock never reaches its Touch path.
// That path is the only visible place where it sets GenericError.
141 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
145 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
// NOTE(review): this listing does not show what is done with the error
// returned by resp.Write.
147 _, err = resp.Write(block)
// PutBlockHandler serves PUT /{hash}: it stores the request body as the
// block whose MD5 hash is {hash} and replies with the block's locator.
//
// Before reading the body, it rejects:
//   - requests whose ContentLength exceeds BLOCKSIZE (TooLongError);
//   - requests that arrive when no volume is writable (FullError).
//
// The body is then stored with PutBlock, and PutBlock's *KeepError
// determines the error status. On success the reply is
// "{hash}+{size}\n". The reply is signed with SignLocator, expiring
// permission_ttl from now, when PermissionSecret is set and the request
// carried an API token.
//
// NOTE(review): the embedded line numbers skip, so some statements are
// not shown in this listing. These include early returns, closing
// braces, and whatever implements the garbage collection mentioned at
// the top of the body.
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153 // Garbage collect after each PUT. Fixes #2865.
154 // See also GetBlockHandler.
157 hash := mux.Vars(req)["hash"]
159 // Detect as many error conditions as possible before reading
160 // the body: avoid transmitting data that will not end up
161 // being written anyway.
163 if req.ContentLength > BLOCKSIZE {
164 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
168 if len(KeepVM.AllWritable()) == 0 {
169 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// NOTE(review): net/http sets req.ContentLength to -1 when the length is
// unknown (e.g. chunked transfer encoding). That value passes the
// BLOCKSIZE check above, and make([]byte, -1) panics. Unless a line not
// shown here rejects negative lengths, such a request panics in this
// handler (net/http recovers and drops the connection). Consider
// answering 411 Length Required instead.
173 buf := make([]byte, req.ContentLength)
174 nread, err := io.ReadFull(req.Body, buf)
176 http.Error(resp, err.Error(), 500)
// NOTE(review): io.ReadFull returns a non-nil error whenever it reads
// fewer than len(buf) bytes. If the branch above tests err != nil, this
// branch cannot be reached.
178 } else if int64(nread) < req.ContentLength {
179 http.Error(resp, "request truncated", 500)
183 err = PutBlock(buf, hash)
// Relies on PutBlock returning a *KeepError on failure, as its doc
// states. Any other error type would make this assertion panic.
185 ke := err.(*KeepError)
186 http.Error(resp, ke.Error(), ke.HTTPCode)
190 // Success; add a size hint, sign the locator if possible, and
191 // return it to the client.
192 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
193 api_token := GetApiToken(req)
194 if PermissionSecret != nil && api_token != "" {
195 expiry := time.Now().Add(permission_ttl)
196 return_hash = SignLocator(return_hash, api_token, expiry)
198 resp.Write([]byte(return_hash + "\n"))
202 // A HandleFunc to address /index and /index/{prefix} requests.
204 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
205 // Reject unauthorized requests.
206 if !IsDataManagerToken(GetApiToken(req)) {
207 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
211 prefix := mux.Vars(req)["prefix"]
214 for _, vol := range KeepVM.AllReadable() {
215 index = index + vol.Index(prefix)
217 resp.Write([]byte(index))
// VolumeStatus describes one Keep volume in a /status.json response.
//
// The status.json document has one entry per volume currently in use by
// this server:
//
//	{"volumes": [
//	    {"mount_point": ..., "device_num": ..., "bytes_free": ..., "bytes_used": ...},
//	    ...
//	]}
//
// The field order matters: VolumeStatus values are also built with
// positional composite literals.
type VolumeStatus struct {
	// MountPoint is the volume's path (GetVolumeStatus stores its
	// volume argument here).
	MountPoint string `json:"mount_point"`
	// DeviceNum identifies the underlying filesystem (the stat device
	// number in GetVolumeStatus).
	DeviceNum uint64 `json:"device_num"`
	// BytesFree and BytesUsed are byte counts. GetVolumeStatus computes
	// them the way df does: free = f_bavail*f_bsize,
	// used = (f_blocks-f_bfree)*f_bsize.
	BytesFree uint64 `json:"bytes_free"`
	BytesUsed uint64 `json:"bytes_used"`
}

// NodeStatus is the top-level object served at /status.json: the status
// of each volume on this Keep node.
type NodeStatus struct {
	Volumes []*VolumeStatus `json:"volumes"`
}
243 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
244 st := GetNodeStatus()
245 if jstat, err := json.Marshal(st); err == nil {
248 log.Printf("json.Marshal: %s\n", err)
249 log.Printf("NodeStatus = %v\n", st)
250 http.Error(resp, err.Error(), 500)
255 // Returns a NodeStatus struct describing this Keep
256 // node's current status.
258 func GetNodeStatus() *NodeStatus {
259 st := new(NodeStatus)
261 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
262 for i, vol := range KeepVM.AllReadable() {
263 st.Volumes[i] = vol.Status()
269 // GetVolumeStatus returns a VolumeStatus describing the requested volume.
//
// volume is a filesystem path:
//   - DeviceNum comes from os.Stat (the path's stat device number);
//   - the byte counts come from syscall.Statfs, computed the way df does.
//
// Each failing system call is logged. What happens next is on lines not
// shown in this listing (the embedded line numbers skip), so confirm
// whether nil is returned on error.
271 func GetVolumeStatus(volume string) *VolumeStatus {
272 var fs syscall.Statfs_t
// NOTE(review): fi.Sys().(*syscall.Stat_t) is an unchecked, Unix-only
// type assertion. The type of Stat_t.Dev also varies by OS and
// architecture, so this line is not portable across platforms.
275 if fi, err := os.Stat(volume); err == nil {
276 devnum = fi.Sys().(*syscall.Stat_t).Dev
278 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
282 err := syscall.Statfs(volume, &fs)
284 log.Printf("GetVolumeStatus: statfs: %s\n", err)
287 // These calculations match the way df calculates disk usage:
288 // "free" space is measured by fs.Bavail, but "used" space
289 // uses fs.Blocks - fs.Bfree.
// Bavail counts blocks available to unprivileged users. Blocks reserved
// for root are therefore in neither figure, and free+used can be less
// than the filesystem size.
290 free := fs.Bavail * uint64(fs.Bsize)
291 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Positional literal: the order must match VolumeStatus's field order
// (MountPoint, DeviceNum, BytesFree, BytesUsed).
292 return &VolumeStatus{volume, devnum, free, used}
295 // DeleteHandler processes DELETE requests.
297 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
298 // from all connected volumes.
300 // Only the Data Manager, or an Arvados admin with scope "all", are
301 // allowed to issue DELETE requests. If a DELETE request is not
302 // authenticated or is issued by a non-admin user, the server returns
303 // a PermissionError.
// NOTE(review): as written, CanDelete accepts only the data manager
// token; recognizing admin tokens with scope "all" is still a TODO there.
305 // Upon receiving a valid request from an authorized user,
306 // DeleteHandler deletes all copies of the specified block on local
311 // If the requested block was not found on any volume, the response
312 // code is HTTP 404 Not Found.
314 // Otherwise, the response code is 200 OK, with a response body
315 // consisting of the JSON message
317 // {"copies_deleted":d,"copies_failed":f}
319 // where d and f are integers representing the number of blocks that
320 // were successfully and unsuccessfully deleted.
//
// NOTE(review): the embedded line numbers skip, so some statements of
// this function are not shown in this listing.
322 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
323 hash := mux.Vars(req)["hash"]
325 // Confirm that this user is an admin and has a token with unlimited scope.
// A request with no token, or with a token CanDelete rejects, gets
// PermissionError.
326 var tok = GetApiToken(req)
327 if tok == "" || !CanDelete(tok) {
328 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): the condition under which deletion is reported as
// disabled (MethodDisabledError) is on lines not shown here.
333 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
337 // Delete copies of this block from all available volumes.
338 // Report how many blocks were successfully deleted, and how
339 // many were found on writable volumes but not deleted.
// Fields of the result value that is JSON-encoded into the 200 response
// (its declaration line is not shown).
341 Deleted int `json:"copies_deleted"`
342 Failed int `json:"copies_failed"`
// Only writable volumes are visited. A copy on a read-only volume is
// neither deleted nor counted, so it can still produce a 404.
344 for _, vol := range KeepVM.AllWritable() {
345 if err := vol.Delete(hash); err == nil {
// os.IsNotExist means this volume had no copy. Other errors are logged
// below.
347 } else if os.IsNotExist(err) {
351 log.Println("DeleteHandler:", err)
// No writable volume held the block at all: 404.
357 if result.Deleted == 0 && result.Failed == 0 {
358 st = http.StatusNotFound
365 if st == http.StatusOK {
366 if body, err := json.Marshal(result); err == nil {
369 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
370 http.Error(resp, err.Error(), 500)
// PullRequest is one entry in the JSON body of a "PUT /pull" request
// that the Data Manager sends to PullHandler. The body is a list of such
// entries, for example:
//
//	[
//	   {
//	      "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
//	      "servers":[
//	         "keep0.qr1hi.arvadosapi.com:25107",
//	         "keep1.qr1hi.arvadosapi.com:25108"
//	      ]
//	   },
//	   {
//	      "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
//	      "servers":[ ... ]
//	   }
//	]
//
// Locator is the block locator string. Servers is an ordered list of
// Keep servers; keepstore should try to fetch the block from each server
// in turn.
//
// PullHandler returns 401 Unauthorized if the request was not sent by
// the Data Manager, and 400 Bad Request if the JSON cannot be decoded.
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`
}
// PullHandler serves PUT /pull from the Data Manager.
//
// Request handling:
//   - a caller whose API token is not the data manager token gets
//     UnauthorizedError;
//   - the body is decoded as a JSON list of PullRequest entries, and a
//     decoding failure yields BadRequestError;
//   - otherwise the handler replies 200 with a "Received N pull
//     requests" message, then replaces the contents of the pull queue
//     (pullq) with the decoded requests.
//
// NOTE(review): the embedded line numbers skip, so this listing does not
// show the declaration of pr, the write call wrapping the message, the
// construction of plist, or the loop body that fills plist.
// NOTE(review): json.Decoder.Decode reads only the first JSON value, so
// any trailing data after the list is silently ignored.
413 func PullHandler(resp http.ResponseWriter, req *http.Request) {
414 // Reject unauthorized requests.
415 if !IsDataManagerToken(GetApiToken(req)) {
416 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
420 // Parse the request body.
422 r := json.NewDecoder(req.Body)
423 if err := r.Decode(&pr); err != nil {
424 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
428 // We have a properly formatted pull list sent from the data
429 // manager. Report success and send the list to the pull list
430 // manager for further handling.
// The 200 status is sent before the queue is replaced.
431 resp.WriteHeader(http.StatusOK)
433 fmt.Sprintf("Received %d pull requests\n", len(pr))))
436 for _, p := range pr {
439 pullq.ReplaceQueue(plist)
// TrashRequest is one entry in the JSON body of a "PUT /trash" request
// that the Data Manager sends to TrashHandler. The body is a list of such
// entries, for example:
//
//	[{"locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985","block_mtime":1409082153}]
//
// Locator names the block and BlockMtime is an int64 modification time.
// NOTE(review): presumably BlockMtime is in Unix seconds and lets the
// trash worker skip blocks modified since the list was built -- confirm
// against the trash queue consumer.
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
}
// TrashHandler serves PUT /trash from the Data Manager.
//
// Request handling:
//   - a caller whose API token is not the data manager token gets
//     UnauthorizedError;
//   - the body is decoded as a JSON list of TrashRequest entries, and a
//     decoding failure yields BadRequestError;
//   - otherwise the handler replies 200 with a "Received N trash
//     requests" message, then replaces the contents of the trash work
//     queue (trashq) with the decoded requests.
//
// NOTE(review): the embedded line numbers skip, so this listing does not
// show the write call wrapping the message, the construction of tlist,
// or the loop body that fills tlist.
// NOTE(review): json.Decoder.Decode reads only the first JSON value, so
// any trailing data after the list is silently ignored.
447 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
448 // Reject unauthorized requests.
449 if !IsDataManagerToken(GetApiToken(req)) {
450 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
454 // Parse the request body.
455 var trash []TrashRequest
456 r := json.NewDecoder(req.Body)
457 if err := r.Decode(&trash); err != nil {
458 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
462 // We have a properly formatted trash list sent from the data
463 // manager. Report success and send the list to the trash work
464 // queue for further handling.
// The 200 status is sent before the queue is replaced.
465 resp.WriteHeader(http.StatusOK)
467 fmt.Sprintf("Received %d trash requests\n", len(trash))))
470 for _, t := range trash {
473 trashq.ReplaceQueue(tlist)
476 // ==============================
477 // GetBlock and PutBlock implement lower-level code for handling
478 // blocks by rooting through volumes connected to the local machine.
479 // Once the handler has determined that system policy permits the
480 // request, it calls these methods to perform the actual operation.
482 // TODO(twp): this code would probably be better located in the
483 // VolumeManager interface. As an abstraction, the VolumeManager
484 // should be the only part of the code that cares about which volume a
485 // block is stored on, so it should be responsible for figuring out
486 // which volume to check for fetching blocks, storing blocks, etc.
488 // ==============================
489 // GetBlock fetches and returns the block identified by "hash". If
490 // the update_timestamp argument is true, GetBlock also updates the
491 // block's file modification time (for the sake of PutBlock, which
492 // must update the file's timestamp when the block already exists).
494 // On success, GetBlock returns a byte slice with the block data, and
497 // If the block cannot be found on any volume, returns NotFoundError.
499 // If the block found does not have the correct MD5 hash, returns
//
// NOTE(review): when update_timestamp is true and vol.Touch fails,
// error_to_caller is set to GenericError, so callers may also see that
// error. The embedded line numbers in this listing skip, so some
// statements (including the successful return) are not shown.
503 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
504 // Attempt to read the requested hash from a keep volume.
505 error_to_caller := NotFoundError
508 if update_timestamp {
509 // Pointless to find the block on an unwritable volume
510 // because Touch() will fail -- this is as good as
511 // "not found" for purposes of callers who need to
513 vols = KeepVM.AllWritable()
515 vols = KeepVM.AllReadable()
// Volumes are tried in order. A checksum mismatch is logged and
// remembered as DiskHashError, and a later good copy is still returned
// (see the log message below).
518 for _, vol := range vols {
519 buf, err := vol.Get(hash)
521 // IsNotExist is an expected error and may be
522 // ignored. All other errors are logged. In
523 // any case we continue trying to read other
524 // volumes. If all volumes report IsNotExist,
525 // we return a NotFoundError.
526 if !os.IsNotExist(err) {
527 log.Printf("GetBlock: reading %s: %s\n", hash, err)
531 // Check the file checksum.
533 filehash := fmt.Sprintf("%x", md5.Sum(buf))
534 if filehash != hash {
535 // TODO: Try harder to tell a sysadmin about
537 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
539 error_to_caller = DiskHashError
542 if error_to_caller == DiskHashError {
543 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
546 if update_timestamp {
547 if err := vol.Touch(hash); err != nil {
548 error_to_caller = GenericError
// NOTE(review): this logs error_to_caller (always GenericError here)
// rather than the err returned by Touch, so the actual failure reason is
// lost. `err` was probably intended as the last argument.
549 log.Printf("%s: Touch %s failed: %s",
550 vol, hash, error_to_caller)
556 return nil, error_to_caller
559 /* PutBlock(block, hash)
560 Stores the BLOCK (identified by the content id HASH) in Keep.
562 The MD5 checksum of the block must be identical to the content id HASH.
563 If not, an error is returned.
565 PutBlock stores the BLOCK on the first Keep volume with free space.
566 A failure code is returned to the user only if all volumes fail.
568 On success, PutBlock returns nil.
569 On failure, it returns a KeepError with one of the following codes:
572 A different block with the same hash already exists on this
575 The MD5 hash of the BLOCK does not match the argument HASH.
577 There was not enough space left in any Keep volume to store
580 The object could not be stored for some other reason (e.g.
581 all writes failed). The text of the error message should
582 provide as much detail as possible.
585 func PutBlock(block []byte, hash string) error {
// NOTE(review): the embedded line numbers skip, so some statements
// (closing braces, the final error returns) are not shown in this
// listing.
586 // Check that BLOCK's checksum matches HASH.
587 blockhash := fmt.Sprintf("%x", md5.Sum(block))
588 if blockhash != hash {
589 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
590 return RequestHashError
593 // If we already have a block on disk under this identifier, return
594 // success (but check for MD5 collisions). While fetching the block,
595 // update its timestamp.
596 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
// NOTE(review): GetBlock(hash, true) can also produce GenericError when
// vol.Touch fails. That case likewise falls through to writing a new
// copy.
597 // In either case, we want to write our new (good) block to disk,
598 // so there is nothing special to do if err != nil.
600 if oldblock, err := GetBlock(hash, true); err == nil {
// bytes.Equal(block, oldblock) is the more direct equality test.
601 if bytes.Compare(block, oldblock) == 0 {
602 // The block already exists; return success.
// Same hash, different content: an MD5 collision.
605 return CollisionError
609 // Choose a Keep volume to write to.
610 // If this volume fails, try all of the volumes in order.
// NOTE(review): the fallback loop below visits every writable volume,
// including the one NextWritable just returned, so a failed volume is
// tried a second time. Writes therefore go first to NextWritable's
// choice, not necessarily to the "first" volume as the doc above says.
611 if vol := KeepVM.NextWritable(); vol != nil {
612 if err := vol.Put(hash, block); err == nil {
613 return nil // success!
617 writables := KeepVM.AllWritable()
618 if len(writables) == 0 {
619 log.Print("No writable volumes.")
624 for _, vol := range writables {
625 err := vol.Put(hash, block)
627 return nil // success!
// Identity comparison: only the FullError value itself counts as "full".
// Any other or wrapped error is treated as a failed write.
629 if err != FullError {
630 // The volume is not full but the
631 // write did not succeed. Report the
632 // error and continue trying.
634 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
639 log.Print("All volumes are full.")
642 // Already logged the non-full errors.
// validLocatorPattern matches a bare MD5 locator: exactly 32 lowercase
// hexadecimal digits. It is compiled once at package initialization
// instead of on every IsValidLocator call. The pattern is a constant, so
// MustCompile cannot fail.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc is a valid Keep locator: a bare
// 32-digit lowercase hex MD5 hash with no size or other hints.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorPattern.MatchString(loc)
}
// oauth2TokenPattern captures the token in an "OAuth2 <token>"
// Authorization header value. It is compiled once at package
// initialization. The pattern is a constant, so MustCompile cannot fail.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// header was found.
//
// Only the first Authorization value is examined. It must begin with
// "OAuth2" (case-sensitive) followed by whitespace. Everything after
// that whitespace, up to the first newline, is returned.
func GetApiToken(req *http.Request) string {
	// Header.Get returns "" both when the header is absent and when it is
	// present with an empty value list. Indexing the raw map with
	// auth[0] would panic in the second case.
	auth := req.Header.Get("Authorization")
	if m := oauth2TokenPattern.FindStringSubmatch(auth); m != nil {
		return m[1]
	}
	return ""
}
// IsExpired reports whether timestamp_hex, a Unix time in seconds
// written in hexadecimal, lies in the past. A string that cannot be
// parsed as hex is logged and treated as expired, so malformed
// timestamps never grant access.
func IsExpired(timestamp_hex string) bool {
	secs, parseErr := strconv.ParseInt(timestamp_hex, 16, 0)
	if parseErr == nil {
		return time.Now().After(time.Unix(secs, 0))
	}
	log.Printf("IsExpired: %s\n", parseErr)
	return true
}
687 // CanDelete returns true if the user identified by api_token is
688 // allowed to delete blocks.
//
// In the visible code, the only token accepted is the data manager
// token. IsDataManagerToken never matches when no data manager token is
// configured. Recognizing admin tokens with unlimited scope is still a
// TODO.
// NOTE(review): the embedded line numbers skip (690-692, 694, 696-697,
// 701-702), so some statements, including the final return, are not
// shown in this listing.
689 func CanDelete(api_token string) bool {
693 // Blocks may be deleted only when Keep has been configured with a
695 if IsDataManagerToken(api_token) {
698 // TODO(twp): look up api_token with the API server
699 // return true if is_admin is true and if the token
700 // has unlimited scope
704 // IsDataManagerToken returns true if api_token represents the data
706 func IsDataManagerToken(api_token string) bool {
707 return data_manager_token != "" && api_token == data_manager_token