3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
// DeleteHandler (DELETE /locator)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
//
// Routes registered below:
//   GET, HEAD  /{hash}, /{hash}+{hints}  -> GetBlockHandler
//   PUT        /{hash}                   -> PutBlockHandler
//   DELETE     /{hash}                   -> DeleteHandler
//   GET, HEAD  /index, /index/{prefix}   -> IndexHandler
//   GET, HEAD  /status.json              -> StatusHandler
//   PUT        /pull                     -> PullHandler
//   PUT        /trash                    -> TrashHandler
// {hash} must be exactly 32 lowercase hex digits; {prefix} is 0-32 of them.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44 // For IndexHandler we support:
45 // /index - returns all locators
46 // /index/{prefix} - returns all locators that begin with {prefix}
47 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48 // If {prefix} is the empty string, return an index of all locators
49 // (so /index and /index/ behave identically)
50 // A client may supply a full 32-digit locator string, in which
51 // case the server will return an index with either zero or one
52 // entries. This usage allows a client to check whether a block is
53 // present, and its size and upload time, without retrieving the
56 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61 // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62 // /trash" requests from Data Manager. These requests instruct
63 // Keep to replicate or delete blocks; see
64 // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
67 // Each handler parses the JSON list of block management requests
68 // in the message body, and replaces any existing pull queue or
69 // trash queue with their contents.
71 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74 // Any request which does not match any of these routes gets
// answered by BadRequestHandler.
76 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler responds to any request with BadRequestError's
// message and HTTP status code. MakeRESTRouter installs it as the
// router's NotFoundHandler, so it answers every request that matches
// no registered route.
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves GET and HEAD requests for /{hash} and
// /{hash}+{hints}.
//
// Each "+"-separated hint is classified. All-digit hints (sizes) and
// unrecognized hints beginning with an uppercase letter are ignored.
// A hint of the form A<hex>@<8 hex digits> is the permission
// signature/timestamp pair; presumably it is stored in signature and
// timestamp, which are checked below (the assignments are not visible
// in this excerpt). Any other hint makes the locator malformed and
// gets BadRequestError.
//
// When enforce_permissions is set, a missing or unverifiable signature
// gets PermissionError, and an expired timestamp gets ExpiredError.
// The block is then read with GetBlock (without touching its
// timestamp) and written to the client with an explicit Content-Length.
//
// NOTE(review): the hint regexps are compiled on every request, and
// regexp.MatchString compiles its pattern on each call. Consider
// hoisting them to package-level regexp.MustCompile variables.
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86 hash := mux.Vars(req)["hash"]
88 hints := mux.Vars(req)["hints"]
90 // Parse the locator string and hints from the request.
91 // TODO(twp): implement a Locator type.
92 var signature, timestamp string
94 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95 for _, hint := range strings.Split(hints, "+") {
96 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97 // Server ignores size hints
98 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
101 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102 // Any unknown hint that starts with an uppercase letter is
103 // presumed to be valid and ignored, to permit forward compatibility.
105 // Unknown format; not a valid locator.
106 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
112 // If permission checking is in effect, verify this
113 // request's permission signature.
114 if enforce_permissions {
115 if signature == "" || timestamp == "" {
116 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
118 } else if IsExpired(timestamp) {
119 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
// The signature covers the full locator as requested, hints included.
122 req_locator := req.URL.Path[1:] // strip leading slash
123 if !VerifySignature(req_locator, GetApiToken(req)) {
124 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
130 block, err := GetBlock(hash, false)
132 // Garbage collect after each GET. Fixes #2865.
133 // TODO(twp): review Keep memory usage and see if there's
134 // a better way to do this than blindly garbage collecting
135 // after every block.
139 // This type assertion is safe because the only errors
140 // GetBlock can return are DiskHashError or NotFoundError.
// (GetBlock's GenericError path, a failed Touch, is only reachable
// when update_timestamp is true; it is false in the call above.)
141 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
145 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
147 _, err = resp.Write(block)
// PutBlockHandler handles PUT /{hash}. It reads exactly
// req.ContentLength bytes of block data and stores them with PutBlock.
// On success it replies with the locator "<hash>+<size>" plus a
// newline. The locator is signed with SignLocator when PermissionSecret
// is set and the request carries an API token. PutBlock failures are
// reported using the returned KeepError's message and HTTP status code.
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153 // Garbage collect after each PUT. Fixes #2865.
154 // See also GetBlockHandler.
157 hash := mux.Vars(req)["hash"]
// req.ContentLength is -1 when the body length is unknown (e.g. a
// chunked request). The body is buffered below in a slice of exactly
// that size, and make() panics on a negative length, so such requests
// are rejected up front with 411 Length Required.
if req.ContentLength < 0 {
http.Error(resp, http.StatusText(http.StatusLengthRequired), http.StatusLengthRequired)
return
}
159 // Read the block data to be stored.
160 // If the request exceeds BLOCKSIZE bytes, respond with TooLongError.
162 if req.ContentLength > BLOCKSIZE {
163 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
167 buf := make([]byte, req.ContentLength)
168 nread, err := io.ReadFull(req.Body, buf)
170 http.Error(resp, err.Error(), 500)
171 } else if int64(nread) < req.ContentLength { // defensive: io.ReadFull already returns an error whenever nread < len(buf)
172 http.Error(resp, "request truncated", 500)
174 if err := PutBlock(buf, hash); err == nil {
175 // Success; add a size hint, sign the locator if
176 // possible, and return it to the client.
177 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
178 api_token := GetApiToken(req)
179 if PermissionSecret != nil && api_token != "" {
180 expiry := time.Now().Add(permission_ttl)
181 return_hash = SignLocator(return_hash, api_token, expiry)
183 resp.Write([]byte(return_hash + "\n"))
185 ke := err.(*KeepError)
186 http.Error(resp, ke.Error(), ke.HTTPCode)
193 // A HandleFunc to address /index and /index/{prefix} requests.
//
// IndexHandler requires the data manager token (IsDataManagerToken);
// other callers get UnauthorizedError. The response body is the
// concatenation of vol.Index(prefix) for every readable volume. prefix
// is the {prefix} route variable; it is "" for plain /index, because a
// missing key in the mux.Vars map reads as the empty string.
//
// NOTE(review): the result is built by repeated string concatenation,
// which re-copies the growing index for every volume. A
// strings.Builder would avoid that.
195 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
196 // Reject unauthorized requests.
197 if !IsDataManagerToken(GetApiToken(req)) {
198 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
202 prefix := mux.Vars(req)["prefix"]
205 for _, vol := range KeepVM.AllReadable() {
206 index = index + vol.Index(prefix)
208 resp.Write([]byte(index))
212 // Responds to /status.json requests with the current node status,
213 // described in a JSON structure.
215 // The data given in a status.json response includes:
216 // volumes - a list of Keep volumes currently in use by this server
217 // each volume is an object with the following fields:
219 // * device_num (an integer identifying the underlying filesystem)
//
// VolumeStatus is the per-volume object in that "volumes" list. The
// field comments below describe the values GetVolumeStatus fills in.
223 type VolumeStatus struct {
224 MountPoint string `json:"mount_point"` // the volume path passed to GetVolumeStatus
225 DeviceNum uint64 `json:"device_num"` // Dev from os.Stat of the volume path
226 BytesFree uint64 `json:"bytes_free"` // fs.Bavail * fs.Bsize (df-style available space)
227 BytesUsed uint64 `json:"bytes_used"` // (fs.Blocks - fs.Bfree) * fs.Bsize
// NodeStatus is the value StatusHandler marshals to JSON for
// /status.json. GetNodeStatus fills Volumes with one VolumeStatus per
// readable volume.
230 type NodeStatus struct {
231 Volumes []*VolumeStatus `json:"volumes"`
// StatusHandler serves GET/HEAD /status.json by marshalling
// GetNodeStatus() to JSON. If json.Marshal fails, the error and the
// NodeStatus value are logged, and the client gets a 500 carrying the
// error text.
234 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
235 st := GetNodeStatus()
236 if jstat, err := json.Marshal(st); err == nil {
239 log.Printf("json.Marshal: %s\n", err)
240 log.Printf("NodeStatus = %v\n", st)
241 http.Error(resp, err.Error(), 500)
246 // Returns a NodeStatus struct describing this Keep
247 // node's current status.
// The readable-volume list is fetched from KeepVM once and reused for
// both the slice length and the loop. Calling AllReadable() twice could
// yield lists of different lengths if the readable set changed in
// between, leaving nil entries or indexing past the end of st.Volumes.
249 func GetNodeStatus() *NodeStatus {
250 st := new(NodeStatus)
vols := KeepVM.AllReadable()
252 st.Volumes = make([]*VolumeStatus, len(vols))
253 for i, vol := range vols {
254 st.Volumes[i] = vol.Status()
260 // Returns a VolumeStatus describing the requested volume.
//
// volume is a filesystem path. Its device number comes from os.Stat,
// and its free/used byte counts come from statfs(2). Failures of either
// call are logged; no error value is returned.
// NOTE(review): after a failed os.Stat, devnum presumably keeps its
// zero value (its declaration is not visible here). What happens after
// a failed Statfs is also not visible in this excerpt.
// NOTE(review): the fi.Sys().(*syscall.Stat_t) assertion is
// Unix-specific and would panic if Sys() returned another type.
262 func GetVolumeStatus(volume string) *VolumeStatus {
263 var fs syscall.Statfs_t
266 if fi, err := os.Stat(volume); err == nil {
267 devnum = fi.Sys().(*syscall.Stat_t).Dev
269 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
273 err := syscall.Statfs(volume, &fs)
275 log.Printf("GetVolumeStatus: statfs: %s\n", err)
278 // These calculations match the way df calculates disk usage:
279 // "free" space is measured by fs.Bavail, but "used" space
280 // uses fs.Blocks - fs.Bfree.
281 free := fs.Bavail * uint64(fs.Bsize)
282 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Positional literal: MountPoint, DeviceNum, BytesFree, BytesUsed.
283 return &VolumeStatus{volume, devnum, free, used}
286 // DeleteHandler processes DELETE requests.
288 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
289 // from all connected volumes.
291 // Only the Data Manager, or an Arvados admin with scope "all", are
292 // allowed to issue DELETE requests. If a DELETE request is not
293 // authenticated or is issued by a non-admin user, the server returns
294 // a PermissionError.
// NOTE(review): as far as the visible code shows, CanDelete currently
// accepts only the data manager token; the admin-token lookup is still
// a TODO there.
296 // Upon receiving a valid request from an authorized user,
297 // DeleteHandler deletes all copies of the specified block on local
302 // If the requested block was not found on any volume, the response
303 // code is HTTP 404 Not Found.
305 // Otherwise, the response code is 200 OK, with a response body
306 // consisting of the JSON message
308 // {"copies_deleted":d,"copies_failed":f}
310 // where d and f are integers representing the number of blocks that
311 // were successfully and unsuccessfully deleted.
313 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
314 hash := mux.Vars(req)["hash"]
316 // Confirm that this user is an admin and has a token with unlimited scope.
317 var tok = GetApiToken(req)
318 if tok == "" || !CanDelete(tok) {
319 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Deletion can also be refused with MethodDisabledError; the condition
// guarding the next line is not visible in this excerpt.
324 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
328 // Delete copies of this block from all available volumes.
329 // Report how many blocks were successfully deleted, and how
330 // many were found on writable volumes but not deleted.
332 Deleted int `json:"copies_deleted"`
333 Failed int `json:"copies_failed"`
335 for _, vol := range KeepVM.AllWritable() {
336 if err := vol.Delete(hash); err == nil {
338 } else if os.IsNotExist(err) {
342 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means no writable volume had the block.
348 if result.Deleted == 0 && result.Failed == 0 {
349 st = http.StatusNotFound
356 if st == http.StatusOK {
357 if body, err := json.Marshal(result); err == nil {
360 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
361 http.Error(resp, err.Error(), 500)
366 /* PullHandler processes "PUT /pull" requests for the data manager.
367 The request body is a JSON message containing a list of pull
368 requests in the following format:
372 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
374 "keep0.qr1hi.arvadosapi.com:25107",
375 "keep1.qr1hi.arvadosapi.com:25108"
379 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
389 Each pull request in the list consists of a block locator string
390 and an ordered list of servers. Keepstore should try to fetch the
391 block from each server in turn.
393 If the request has not been sent by the Data Manager, return 401
396 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one element of the JSON list accepted by PullHandler.
399 type PullRequest struct {
400 Locator string `json:"locator"` // locator of the block to fetch
401 Servers []string `json:"servers"` // servers to try, in order
// PullHandler implements PUT /pull.
// Requests without the data manager token get UnauthorizedError.
// A body that does not decode as JSON into pr gets BadRequestError
// (pr's declaration is not visible here; presumably []PullRequest).
// Otherwise it replies 200 with a "Received N pull requests" message,
// then passes the requests to pullq.ReplaceQueue, replacing any
// existing pull queue. The 200 response is written before the queue
// is replaced.
404 func PullHandler(resp http.ResponseWriter, req *http.Request) {
405 // Reject unauthorized requests.
406 if !IsDataManagerToken(GetApiToken(req)) {
407 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
411 // Parse the request body.
413 r := json.NewDecoder(req.Body)
414 if err := r.Decode(&pr); err != nil {
415 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
419 // We have a properly formatted pull list sent from the data
420 // manager. Report success and send the list to the pull list
421 // manager for further handling.
422 resp.WriteHeader(http.StatusOK)
424 fmt.Sprintf("Received %d pull requests\n", len(pr))))
427 for _, p := range pr {
430 pullq.ReplaceQueue(plist)
// TrashRequest is one element of the JSON list accepted by TrashHandler.
433 type TrashRequest struct {
434 Locator string `json:"locator"` // locator of the block to trash
435 BlockMtime int64 `json:"block_mtime"` // presumably the block's expected modification time (Unix seconds?) — TODO confirm
// TrashHandler implements PUT /trash.
// Requests without the data manager token get UnauthorizedError.
// A body that does not decode as a JSON list of TrashRequest gets
// BadRequestError. Otherwise it replies 200 with a
// "Received N trash requests" message, then passes the list to
// trashq.ReplaceQueue, replacing any existing trash queue. The 200
// response is written before the queue is replaced.
438 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
439 // Reject unauthorized requests.
440 if !IsDataManagerToken(GetApiToken(req)) {
441 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
445 // Parse the request body.
446 var trash []TrashRequest
447 r := json.NewDecoder(req.Body)
448 if err := r.Decode(&trash); err != nil {
449 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
453 // We have a properly formatted trash list sent from the data
454 // manager. Report success and send the list to the trash work
455 // queue for further handling.
456 resp.WriteHeader(http.StatusOK)
458 fmt.Sprintf("Received %d trash requests\n", len(trash))))
461 for _, t := range trash {
464 trashq.ReplaceQueue(tlist)
467 // ==============================
468 // GetBlock and PutBlock implement lower-level code for handling
469 // blocks by rooting through volumes connected to the local machine.
470 // Once the handler has determined that system policy permits the
471 // request, it calls these methods to perform the actual operation.
473 // TODO(twp): this code would probably be better located in the
474 // VolumeManager interface. As an abstraction, the VolumeManager
475 // should be the only part of the code that cares about which volume a
476 // block is stored on, so it should be responsible for figuring out
477 // which volume to check for fetching blocks, storing blocks, etc.
479 // ==============================
480 // GetBlock fetches and returns the block identified by "hash". If
481 // the update_timestamp argument is true, GetBlock also updates the
482 // block's file modification time (for the sake of PutBlock, which
483 // must update the file's timestamp when the block already exists).
485 // On success, GetBlock returns a byte slice with the block data, and
488 // If the block cannot be found on any volume, returns NotFoundError.
490 // If the block found does not have the correct MD5 hash, returns
// When update_timestamp is true, only writable volumes are searched.
// A failed Touch sets the pending error to GenericError, so that error
// may also be returned.
494 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
495 // Attempt to read the requested hash from a keep volume.
496 error_to_caller := NotFoundError
499 if update_timestamp {
500 // Pointless to find the block on an unwritable volume
501 // because Touch() will fail -- this is as good as
502 // "not found" for purposes of callers who need to
504 vols = KeepVM.AllWritable()
506 vols = KeepVM.AllReadable()
509 for _, vol := range vols {
510 buf, err := vol.Get(hash)
512 // IsNotExist is an expected error and may be
513 // ignored. All other errors are logged. In
514 // any case we continue trying to read other
515 // volumes. If all volumes report IsNotExist,
516 // we return a NotFoundError.
517 if !os.IsNotExist(err) {
518 log.Printf("GetBlock: reading %s: %s\n", hash, err)
522 // Check the file checksum.
524 filehash := fmt.Sprintf("%x", md5.Sum(buf))
525 if filehash != hash {
526 // TODO: Try harder to tell a sysadmin about
528 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
530 error_to_caller = DiskHashError
533 if error_to_caller == DiskHashError {
534 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
537 if update_timestamp {
538 if err := vol.Touch(hash); err != nil {
539 error_to_caller = GenericError
// Log the Touch error itself. error_to_caller now holds only the
// generic error, which says nothing about why Touch failed.
540 log.Printf("%s: Touch %s failed: %s",
541 vol, hash, err)
547 return nil, error_to_caller
550 /* PutBlock(block, hash)
551 Stores the BLOCK (identified by the content id HASH) in Keep.
553 The MD5 checksum of the block must be identical to the content id HASH.
554 If not, an error is returned.
556 PutBlock stores the BLOCK on the first Keep volume with free space.
557 A failure code is returned to the user only if all volumes fail.
Concretely, it first tries the volume returned by KeepVM.NextWritable().
If that write fails, it tries every volume from KeepVM.AllWritable() in
order, which may include the volume that just failed.
559 On success, PutBlock returns nil.
560 On failure, it returns a KeepError with one of the following codes:
563 A different block with the same hash already exists on this
566 The MD5 hash of the BLOCK does not match the argument HASH.
568 There was not enough space left in any Keep volume to store
571 The object could not be stored for some other reason (e.g.
572 all writes failed). The text of the error message should
573 provide as much detail as possible.
576 func PutBlock(block []byte, hash string) error {
577 // Check that BLOCK's checksum matches HASH.
578 blockhash := fmt.Sprintf("%x", md5.Sum(block))
579 if blockhash != hash {
580 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
581 return RequestHashError
584 // If we already have a block on disk under this identifier, return
585 // success (but check for MD5 collisions). While fetching the block,
586 // update its timestamp.
587 // GetBlock can return DiskHashError or NotFoundError, and also
// GenericError when update_timestamp is true and Touch fails.
588 // In each case, we want to write our new (good) block to disk,
589 // so there is nothing special to do if err != nil.
591 if oldblock, err := GetBlock(hash, true); err == nil {
592 if bytes.Compare(block, oldblock) == 0 {
593 // The block already exists; return success.
596 return CollisionError
600 // Choose a Keep volume to write to.
601 // If this volume fails, try all of the volumes in order.
602 if vol := KeepVM.NextWritable(); vol != nil {
603 if err := vol.Put(hash, block); err == nil {
604 return nil // success!
608 writables := KeepVM.AllWritable()
609 if len(writables) == 0 {
610 log.Print("No writable volumes.")
615 for _, vol := range writables {
616 err := vol.Put(hash, block)
618 return nil // success!
620 if err != FullError {
621 // The volume is not full but the
622 // write did not succeed. Report the
623 // error and continue trying.
625 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
630 log.Print("All volumes are full.")
633 // Already logged the non-full errors.
639 // IsValidLocator returns true if loc is a valid Keep locator: exactly
// 32 lowercase hexadecimal digits (a bare MD5 digest, with no size or
// other hints). A regexp error, if any, is logged.
640 // When Keep is extended to support hash types other than MD5,
641 // this should be updated to cover those as well.
// NOTE(review): regexp.MatchString compiles the pattern on every call;
// a package-level regexp.MustCompile would avoid that.
643 func IsValidLocator(loc string) bool {
644 match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
648 log.Printf("IsValidLocator: %s\n", err)
652 // GetApiToken returns the OAuth2 token from the Authorization
653 // header of an HTTP request, or an empty string if no matching
// token is found. Only the first Authorization header value is
// examined. It must begin with "OAuth2" followed by whitespace, and
// the captured group (everything after that whitespace) is presumably
// what gets returned.
// NOTE(review): the pattern is compiled on every call; consider a
// package-level regexp.MustCompile.
655 func GetApiToken(req *http.Request) string {
656 if auth, ok := req.Header["Authorization"]; ok {
657 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
659 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
666 // IsExpired returns true if the given Unix timestamp (expressed as a
667 // hexadecimal string) is in the past, or if timestamp_hex cannot be
668 // parsed as a hexadecimal string.
// The parsed value is interpreted as whole seconds since the Unix
// epoch (time.Unix(ts, 0)). Parse errors are logged.
669 func IsExpired(timestamp_hex string) bool {
670 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
672 log.Printf("IsExpired: %s\n", err)
675 return time.Unix(ts, 0).Before(time.Now())
678 // CanDelete returns true if the user identified by api_token is
679 // allowed to delete blocks.
// NOTE(review): in the code visible here, the only token accepted is
// the data manager token (IsDataManagerToken). Checking for an admin
// token with unlimited scope is still the TODO below.
680 func CanDelete(api_token string) bool {
684 // Blocks may be deleted only when Keep has been configured with a
686 if IsDataManagerToken(api_token) {
689 // TODO(twp): look up api_token with the API server
690 // return true if is_admin is true and if the token
691 // has unlimited scope
695 // IsDataManagerToken returns true if api_token represents the data
// manager's token. It is always false while data_manager_token is
// empty (unconfigured), so an empty api_token can never qualify.
697 func IsDataManagerToken(api_token string) bool {
698 return data_manager_token != "" && api_token == data_manager_token