// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// DeleteHandler   (DELETE /locator)
// IndexHandler    (GET /index, GET /index/prefix)
// StatusHandler   (GET /status.json)
// PullHandler     (PUT /pull)
// TrashHandler    (PUT /trash)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43 // List all blocks stored here. Privileged client only.
44 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45 // List blocks stored here whose hash has the given prefix.
46 // Privileged client only.
47 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49 // List volumes: path, device number, bytes used/avail.
50 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
52 // Replace the current pull queue.
53 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
55 // Replace the current trash queue.
56 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
58 // Any request which does not match any of these routes gets
60 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler answers a request that matched no route. It replies
// with BadRequestError's message and HTTP status code.
func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
	badReq := BadRequestError
	http.Error(w, badReq.Error(), badReq.HTTPCode)
}
// GetBlockHandler serves GET and HEAD requests for a block. The block is
// addressed either by its bare hash or by its hash followed by
// "+"-separated hints; both routes are registered in MakeRESTRouter.
//
// Each hint is classified as one of:
//   - an all-digit size hint, which is ignored;
//   - a permission hint matching A<hex signature>@<8 hex digit timestamp>;
//   - any other hint starting with an uppercase letter, which is ignored
//     for forward compatibility;
//   - anything else, which is rejected with BadRequestError.
//
// When enforce_permissions is set, the request is rejected with:
//   - PermissionError if the signature or timestamp is missing;
//   - ExpiredError if IsExpired(timestamp);
//   - PermissionError if VerifySignature fails for the request path.
//
// The block is then fetched with GetBlock(hash, false) and written back
// with an explicit Content-Length header.
//
// NOTE(review): several lines of this function are not visible in this
// excerpt: the assignments from the permission-hint match, the early
// returns, and whatever follows the garbage-collection comment. The
// summary above assumes they do the obvious thing; confirm against the
// full file.
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70 hash := mux.Vars(req)["hash"]
72 hints := mux.Vars(req)["hints"]
74 // Parse the locator string and hints from the request.
75 // TODO(twp): implement a Locator type.
76 var signature, timestamp string
// NOTE(review): this pattern is compiled on every request, and so are the
// two regexp.MatchString patterns in the loop below (once per hint).
// Compiling them once at package level would avoid the repeated work.
// Discarding the Compile error is harmless only because the pattern is a
// constant.
78 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
79 for _, hint := range strings.Split(hints, "+") {
80 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
81 // Server ignores size hints
82 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
// Capture groups: m[1] is the hex signature, m[2] the 8-hex-digit
// timestamp. Presumably they are stored in signature/timestamp on lines
// not visible here.
85 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
86 // Any unknown hint that starts with an uppercase letter is
87 // presumed to be valid and ignored, to permit forward compatibility.
89 // Unknown format; not a valid locator.
90 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
96 // If permission checking is in effect, verify this
97 // request's permission signature.
98 if enforce_permissions {
99 if signature == "" || timestamp == "" {
100 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
102 } else if IsExpired(timestamp) {
103 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
// The signature is verified over the whole request path (hash plus
// hints) with its leading slash removed, using the caller's API token.
106 req_locator := req.URL.Path[1:] // strip leading slash
107 if !VerifySignature(req_locator, GetApiToken(req)) {
108 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
114 block, err := GetBlock(hash, false)
116 // Garbage collect after each GET. Fixes #2865.
117 // TODO(twp): review Keep memory usage and see if there's
118 // a better way to do this than blindly garbage collecting
119 // after every block.
123 // This type assertion is safe because the only errors
124 // GetBlock can return are DiskHashError or NotFoundError.
// NOTE(review): GetBlock also assigns GenericError, but only inside its
// update_timestamp branch. This call passes false, so the premise above
// holds as far as GetBlock's code shows.
125 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
129 resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
// NOTE(review): the Write error is stored in err but, as far as this
// excerpt shows, is never checked or logged.
131 _, err = resp.Write(block)
// PutBlockHandler serves PUT /{hash}. It stores the request body as the
// block whose MD5 hash is given in the URL.
//
// On success it writes back the locator "<hash>+<size>" followed by a
// newline. The locator is first signed with SignLocator, expiring
// blob_signature_ttl from now, when PermissionSecret is set and the
// request carries an API token.
//
// Error responses:
//   - SizeRequiredError when Content-Length is unknown (-1);
//   - TooLongError when Content-Length exceeds BLOCKSIZE;
//   - FullError when there are no writable volumes;
//   - 500 when reading the body fails;
//   - the KeepError returned by PutBlock otherwise.
//
// NOTE(review): the return after each error response, and the statement
// implied by the "Garbage collect" comment, are on lines not visible in
// this excerpt. Confirm against the full file.
136 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
137 // Garbage collect after each PUT. Fixes #2865.
138 // See also GetBlockHandler.
141 hash := mux.Vars(req)["hash"]
143 // Detect as many error conditions as possible before reading
144 // the body: avoid transmitting data that will not end up
145 // being written anyway.
147 if req.ContentLength == -1 {
148 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
152 if req.ContentLength > BLOCKSIZE {
153 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
157 if len(KeepVM.AllWritable()) == 0 {
158 http.Error(resp, FullError.Error(), FullError.HTTPCode)
162 buf := make([]byte, req.ContentLength)
163 nread, err := io.ReadFull(req.Body, buf)
165 http.Error(resp, err.Error(), 500)
// NOTE(review): io.ReadFull returns a non-nil error (io.EOF or
// io.ErrUnexpectedEOF) whenever it reads fewer than len(buf) bytes, so
// the short-read branch below appears unreachable.
167 } else if int64(nread) < req.ContentLength {
168 http.Error(resp, "request truncated", 500)
172 err = PutBlock(buf, hash)
// This unchecked type assertion panics if PutBlock ever returns an error
// that is not a *KeepError.
174 ke := err.(*KeepError)
175 http.Error(resp, ke.Error(), ke.HTTPCode)
179 // Success; add a size hint, sign the locator if possible, and
180 // return it to the client.
181 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
182 api_token := GetApiToken(req)
183 if PermissionSecret != nil && api_token != "" {
184 expiry := time.Now().Add(blob_signature_ttl)
185 return_hash = SignLocator(return_hash, api_token, expiry)
187 resp.Write([]byte(return_hash + "\n"))
// IndexHandler serves GET/HEAD /index and /index/{prefix}.
//
// Only the data manager may call it; any other caller receives
// UnauthorizedError. For an authorized caller it streams the index of
// every readable volume into the response, restricted to hashes that
// start with prefix. An empty prefix, from the /index route, selects all
// blocks.
func IndexHandler(resp http.ResponseWriter, req *http.Request) {
	// Reject unauthorized requests.
	if !IsDataManagerToken(GetApiToken(req)) {
		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
		return
	}

	prefix := mux.Vars(req)["prefix"]

	for _, vol := range KeepVM.AllReadable() {
		if err := vol.IndexTo(prefix, resp); err != nil {
			// The only errors IndexTo returns are write errors from
			// resp.Write(), which most likely means the client has
			// disconnected. By then the 200 status line (and possibly part
			// of the index) has already been sent, so http.Error could only
			// trigger a superfluous WriteHeader and splice an error message
			// into the middle of the listing. Record the real error in our
			// own log and stop.
			log.Printf("IndexHandler: %s: %s\n", vol, err)
			return
		}
	}
}
// VolumeStatus is the per-volume entry reported by /status.json.
//
// A status.json response carries a "volumes" list with one of these
// objects for each Keep volume in use by this server:
//   - mount_point: the volume's path
//   - device_num: an integer identifying the underlying filesystem
//   - bytes_free: space available, measured the way df does (Bavail * Bsize)
//   - bytes_used: space in use, measured the way df does ((Blocks - Bfree) * Bsize)
//
// Field order matters: GetVolumeStatus builds this struct with an unkeyed
// composite literal.
type VolumeStatus struct {
	MountPoint string `json:"mount_point"` // path of the volume
	DeviceNum  uint64 `json:"device_num"`  // st_dev of the volume's filesystem
	BytesFree  uint64 `json:"bytes_free"`  // bytes available
	BytesUsed  uint64 `json:"bytes_used"`  // bytes in use
}
235 type NodeStatus struct {
236 Volumes []*VolumeStatus `json:"volumes"`
// StatusHandler serves GET/HEAD /status.json. It responds with the JSON
// encoding of GetNodeStatus().
//
// If marshalling fails, the error and the status value are logged and
// the client receives a 500 with the marshalling error message.
func StatusHandler(resp http.ResponseWriter, req *http.Request) {
	st := GetNodeStatus()
	jstat, err := json.Marshal(st)
	if err != nil {
		log.Printf("json.Marshal: %s\n", err)
		log.Printf("NodeStatus = %v\n", st)
		http.Error(resp, err.Error(), 500)
		return
	}
	// Without an explicit Content-Type, net/http sniffs the body and
	// labels a JSON document as text/plain.
	resp.Header().Set("Content-Type", "application/json")
	resp.Write(jstat)
}
251 // Returns a NodeStatus struct describing this Keep
252 // node's current status.
254 func GetNodeStatus() *NodeStatus {
255 st := new(NodeStatus)
257 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
258 for i, vol := range KeepVM.AllReadable() {
259 st.Volumes[i] = vol.Status()
265 // Returns a VolumeStatus describing the requested volume.
//
// volume is a filesystem path. It is passed to os.Stat (for the device
// number) and to syscall.Statfs (for space figures), and it is reported
// back as MountPoint.
//
// NOTE(review): what happens after each logged error is on lines not
// visible in this excerpt (presumably an early return of nil). Confirm
// against the full file.
// NOTE(review): fi.Sys().(*syscall.Stat_t) and syscall.Statfs are
// Unix-specific. The type assertion panics if Sys() returns another type.
267 func GetVolumeStatus(volume string) *VolumeStatus {
268 var fs syscall.Statfs_t
271 if fi, err := os.Stat(volume); err == nil {
272 devnum = fi.Sys().(*syscall.Stat_t).Dev
274 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
278 err := syscall.Statfs(volume, &fs)
280 log.Printf("GetVolumeStatus: statfs: %s\n", err)
283 // These calculations match the way df calculates disk usage:
284 // "free" space is measured by fs.Bavail, but "used" space
285 // uses fs.Blocks - fs.Bfree.
286 free := fs.Bavail * uint64(fs.Bsize)
287 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Unkeyed literal: relies on VolumeStatus declaring its fields in the
// order MountPoint, DeviceNum, BytesFree, BytesUsed.
288 return &VolumeStatus{volume, devnum, free, used}
291 // DeleteHandler processes DELETE requests.
// DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
294 // from all connected volumes.
296 // Only the Data Manager, or an Arvados admin with scope "all", are
297 // allowed to issue DELETE requests. If a DELETE request is not
298 // authenticated or is issued by a non-admin user, the server returns
299 // a PermissionError.
301 // Upon receiving a valid request from an authorized user,
302 // DeleteHandler deletes all copies of the specified block on local
// If the requested block was not found on any volume, the response
308 // code is HTTP 404 Not Found.
310 // Otherwise, the response code is 200 OK, with a response body
311 // consisting of the JSON message
313 // {"copies_deleted":d,"copies_failed":f}
// where d and f are integers giving the number of copies of the block
// that were successfully and unsuccessfully deleted.
// Implementation notes:
//   - Only volumes returned by KeepVM.AllWritable() are visited, so
//     copies on read-only volumes are neither deleted nor counted.
//   - A volume whose Delete fails with an os.IsNotExist error presumably
//     counts as neither deleted nor failed (the branch body is not visible
//     here). Other errors are logged.
//
// NOTE(review): several lines of this function are not visible in this
// excerpt: the condition guarding the MethodDisabledError reply, the
// result struct header, the counter increments, and where the status code
// is written. Confirm against the full file.
318 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
319 hash := mux.Vars(req)["hash"]
321 // Confirm that this user is an admin and has a token with unlimited scope.
322 var tok = GetApiToken(req)
323 if tok == "" || !CanDelete(tok) {
324 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): presumably reached when deletion is disabled by
// configuration; the guarding condition is on a line not visible here.
329 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
333 // Delete copies of this block from all available volumes.
334 // Report how many blocks were successfully deleted, and how
335 // many were found on writable volumes but not deleted.
337 Deleted int `json:"copies_deleted"`
338 Failed int `json:"copies_failed"`
340 for _, vol := range KeepVM.AllWritable() {
341 if err := vol.Delete(hash); err == nil {
343 } else if os.IsNotExist(err) {
347 log.Println("DeleteHandler:", err)
// No copy deleted and none failed: the block was not on any writable
// volume, so the response is 404.
353 if result.Deleted == 0 && result.Failed == 0 {
354 st = http.StatusNotFound
361 if st == http.StatusOK {
362 if body, err := json.Marshal(result); err == nil {
365 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
// NOTE(review): if the status code was already written before this point,
// this http.Error is a superfluous WriteHeader. Marshalling before writing
// any status would avoid that. Marshalling a struct of two ints should
// not fail in practice.
366 http.Error(resp, err.Error(), 500)
371 /* PullHandler processes "PUT /pull" requests for the data manager.
372 The request body is a JSON message containing a list of pull
373 requests in the following format:
377 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
379 "keep0.qr1hi.arvadosapi.com:25107",
380 "keep1.qr1hi.arvadosapi.com:25108"
384 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
394 Each pull request in the list consists of a block locator string
395 and an ordered list of servers. Keepstore should try to fetch the
396 block from each server in turn.
398 If the request has not been sent by the Data Manager, return 401
401 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of the JSON list accepted by PUT /pull. It
// names a block locator and the Keep servers (host:port) to try, in
// order, when fetching that block. Example element:
//
//	{"locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
//	 "servers":["keep0.qr1hi.arvadosapi.com:25107",
//	            "keep1.qr1hi.arvadosapi.com:25108"]}
type PullRequest struct {
	Locator string   `json:"locator"` // block to fetch
	Servers []string `json:"servers"` // servers to try, in order
}
// PullHandler handles PUT /pull by replacing the current pull queue with
// the list decoded from the JSON request body.
//
// Any caller other than the data manager receives UnauthorizedError. A
// body that fails to decode receives BadRequestError. Otherwise the
// handler writes 200 and a "Received N pull requests" message before
// handing the list to pullq.ReplaceQueue.
//
// NOTE(review): the declaration of pr, the start of the response write,
// and the construction of plist from pr are on lines not visible in this
// excerpt. Confirm against the full file.
409 func PullHandler(resp http.ResponseWriter, req *http.Request) {
410 // Reject unauthorized requests.
411 if !IsDataManagerToken(GetApiToken(req)) {
412 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
416 // Parse the request body.
418 r := json.NewDecoder(req.Body)
419 if err := r.Decode(&pr); err != nil {
420 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
424 // We have a properly formatted pull list sent from the data
425 // manager. Report success and send the list to the pull list
426 // manager for further handling.
// The success response goes out before the queue is replaced.
427 resp.WriteHeader(http.StatusOK)
429 fmt.Sprintf("Received %d pull requests\n", len(pr))))
432 for _, p := range pr {
435 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by PUT /trash. It
// holds a block locator and an integer block_mtime for that block
// (presumably the modification time the data manager expects; units not
// shown here, so confirm).
type TrashRequest struct {
	Locator    string `json:"locator"`     // block to trash
	BlockMtime int64  `json:"block_mtime"` // see note above
}
// TrashHandler handles PUT /trash by replacing the current trash queue
// with the list of TrashRequests decoded from the JSON request body.
//
// Any caller other than the data manager receives UnauthorizedError. A
// body that fails to decode receives BadRequestError. Otherwise the
// handler writes 200 and a "Received N trash requests" message before
// handing the list to trashq.ReplaceQueue.
//
// NOTE(review): the start of the response write and the construction of
// tlist from trash are on lines not visible in this excerpt. Confirm
// against the full file.
443 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
444 // Reject unauthorized requests.
445 if !IsDataManagerToken(GetApiToken(req)) {
446 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
450 // Parse the request body.
451 var trash []TrashRequest
452 r := json.NewDecoder(req.Body)
453 if err := r.Decode(&trash); err != nil {
454 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
458 // We have a properly formatted trash list sent from the data
459 // manager. Report success and send the list to the trash work
460 // queue for further handling.
// The success response goes out before the queue is replaced.
461 resp.WriteHeader(http.StatusOK)
463 fmt.Sprintf("Received %d trash requests\n", len(trash))))
466 for _, t := range trash {
469 trashq.ReplaceQueue(tlist)
472 // ==============================
473 // GetBlock and PutBlock implement lower-level code for handling
474 // blocks by rooting through volumes connected to the local machine.
475 // Once the handler has determined that system policy permits the
476 // request, it calls these methods to perform the actual operation.
478 // TODO(twp): this code would probably be better located in the
479 // VolumeManager interface. As an abstraction, the VolumeManager
480 // should be the only part of the code that cares about which volume a
481 // block is stored on, so it should be responsible for figuring out
482 // which volume to check for fetching blocks, storing blocks, etc.
484 // ==============================
// GetBlock fetches and returns the block identified by hash.
//
// Volumes are tried in order until one yields a copy whose MD5 matches
// hash. If update_timestamp is true, only writable volumes are searched,
// and the copy's modification time is refreshed with Touch. PutBlock
// relies on this when the block already exists. A copy whose Touch fails
// is skipped and the search continues.
//
// On success GetBlock returns the block data and a nil error. Otherwise
// it returns nil and the last failure recorded while scanning:
//   - DiskHashError if a copy was found but failed the MD5 check;
//   - GenericError if a good copy was found but Touch failed;
//   - NotFoundError if neither happened, i.e. no volume returned the block.
func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
	// Attempt to read the requested hash from a keep volume.
	error_to_caller := NotFoundError

	// Touch() fails on read-only volumes. For callers that need the
	// timestamp updated, a copy on an unwritable volume is as good as
	// "not found", so don't look there.
	listVolumes := KeepVM.AllReadable
	if update_timestamp {
		listVolumes = KeepVM.AllWritable
	}

	for _, vol := range listVolumes() {
		buf, err := vol.Get(hash)
		if err != nil {
			// IsNotExist is an expected error and may be ignored. All
			// other errors are logged. In any case, keep trying the other
			// volumes. If every volume reports IsNotExist, the caller
			// gets NotFoundError.
			if !os.IsNotExist(err) {
				log.Printf("GetBlock: reading %s: %s\n", hash, err)
			}
			continue
		}

		// Check the file checksum.
		filehash := fmt.Sprintf("%x", md5.Sum(buf))
		if filehash != hash {
			// TODO: Try harder to tell a sysadmin about this.
			log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
				vol, hash, filehash)
			error_to_caller = DiskHashError
			continue
		}
		if error_to_caller == DiskHashError {
			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
				vol, hash)
		}

		if update_timestamp {
			if err := vol.Touch(hash); err != nil {
				error_to_caller = GenericError
				// Log the underlying Touch error. error_to_caller only
				// holds the generic sentinel returned to the caller, which
				// says nothing about why the Touch failed.
				log.Printf("%s: Touch %s failed: %s",
					vol, hash, err)
				continue
			}
		}
		return buf, nil
	}
	return nil, error_to_caller
}
555 /* PutBlock(block, hash)
556 Stores the BLOCK (identified by the content id HASH) in Keep.
558 The MD5 checksum of the block must be identical to the content id HASH.
559 If not, an error is returned.
561 PutBlock stores the BLOCK on the first Keep volume with free space.
562 A failure code is returned to the user only if all volumes fail.
564 On success, PutBlock returns nil.
565 On failure, it returns a KeepError with one of the following codes:
568 A different block with the same hash already exists on this
571 The MD5 hash of the BLOCK does not match the argument HASH.
573 There was not enough space left in any Keep volume to store
576 The object could not be stored for some other reason (e.g.
577 all writes failed). The text of the error message should
578 provide as much detail as possible.
// PutBlock verifies that block's MD5 equals hash, then stores it.
//
// Flow:
//   - If a good copy already exists (GetBlock(hash, true) succeeds), an
//     identical block yields success and a different one yields
//     CollisionError.
//   - Otherwise the block is written to KeepVM.NextWritable() and, if
//     that fails, to each of KeepVM.AllWritable() in turn.
//
// NOTE(review): the return values on several paths are on lines not
// visible in this excerpt (no writable volumes, all volumes full, other
// write failures), as is the bookkeeping behind "All volumes are full".
// Confirm against the full file.
581 func PutBlock(block []byte, hash string) error {
582 // Check that BLOCK's checksum matches HASH.
583 blockhash := fmt.Sprintf("%x", md5.Sum(block))
584 if blockhash != hash {
585 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
586 return RequestHashError
589 // If we already have a block on disk under this identifier, return
590 // success (but check for MD5 collisions). While fetching the block,
591 // update its timestamp.
592 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
593 // In either case, we want to write our new (good) block to disk,
594 // so there is nothing special to do if err != nil.
// NOTE(review): GetBlock(hash, true) can also return GenericError when
// Touch fails. That case likewise falls through to writing a fresh copy.
596 if oldblock, err := GetBlock(hash, true); err == nil {
// NOTE(review): bytes.Equal(block, oldblock) is the idiomatic way to
// test equality.
597 if bytes.Compare(block, oldblock) == 0 {
598 // The block already exists; return success.
601 return CollisionError
605 // Choose a Keep volume to write to.
606 // If this volume fails, try all of the volumes in order.
// NOTE(review): the loop over AllWritable() below presumably includes the
// volume NextWritable() just tried, so a failing volume may be written to
// twice.
607 if vol := KeepVM.NextWritable(); vol != nil {
608 if err := vol.Put(hash, block); err == nil {
609 return nil // success!
613 writables := KeepVM.AllWritable()
614 if len(writables) == 0 {
615 log.Print("No writable volumes.")
620 for _, vol := range writables {
621 err := vol.Put(hash, block)
623 return nil // success!
625 if err != FullError {
626 // The volume is not full but the
627 // write did not succeed. Report the
628 // error and continue trying.
630 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
635 log.Print("All volumes are full.")
638 // Already logged the non-full errors.
// validLocatorPat matches a bare MD5 block hash: exactly 32 lowercase hex
// digits. It is compiled once at package initialization instead of on
// every call.
var validLocatorPat = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e. a bare
// 32-character lowercase hexadecimal MD5 hash with no size or other hints.
// When Keep is extended to support hash types other than MD5, this should
// be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorPat.MatchString(loc)
}
// oauth2HeaderPat extracts the token from an "Authorization: OAuth2 <token>"
// header value. It is compiled once rather than on every request.
var oauth2HeaderPat = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization header of
// an HTTP request. It returns "" if the header is absent, empty, or not
// of the form "OAuth2 <token>". Only the first Authorization value is
// examined.
func GetApiToken(req *http.Request) string {
	auth, ok := req.Header["Authorization"]
	// Guard against an explicitly empty value slice, which would otherwise
	// make auth[0] panic.
	if !ok || len(auth) == 0 {
		return ""
	}
	if match := oauth2HeaderPat.FindStringSubmatch(auth[0]); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired reports whether timestamp_hex, a Unix time in seconds written
// in hexadecimal, lies in the past. A timestamp that cannot be parsed is
// logged and treated as expired, so it is never honored.
func IsExpired(timestamp_hex string) bool {
	ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
	if err == nil {
		return time.Now().After(time.Unix(ts, 0))
	}
	log.Printf("IsExpired: %s\n", err)
	return true
}
683 // CanDelete returns true if the user identified by api_token is
684 // allowed to delete blocks.
//
// As far as the visible lines show, the only way to be granted deletion
// is IsDataManagerToken(api_token). Admin-token lookup is not implemented
// (see the TODO below).
//
// NOTE(review): the lines inside this function that are not visible here
// presumably return true in the data-manager branch and false otherwise.
// Confirm against the full file.
685 func CanDelete(api_token string) bool {
689 // Blocks may be deleted only when Keep has been configured with a
691 if IsDataManagerToken(api_token) {
694 // TODO(twp): look up api_token with the API server
695 // return true if is_admin is true and if the token
696 // has unlimited scope
700 // IsDataManagerToken returns true if api_token represents the data
702 func IsDataManagerToken(api_token string) bool {
703 return data_manager_token != "" && api_token == data_manager_token