3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
17 "github.com/gorilla/mux"
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
33 func MakeRESTRouter() *mux.Router {
34 rest := mux.NewRouter()
37 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
39 `/{hash:[0-9a-f]{32}}+{hints}`,
40 GetBlockHandler).Methods("GET", "HEAD")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
45 // For IndexHandler we support:
46 // /index - returns all locators
47 // /index/{prefix} - returns all locators that begin with {prefix}
48 // {prefix} is a string of 0 to 32 lowercase hexadecimal digits.
49 // If {prefix} is the empty string, return an index of all locators
50 // (so /index and /index/ behave identically)
51 // A client may supply a full 32-digit locator string, in which
52 // case the server will return an index with either zero or one
53 // entries. This usage allows a client to check whether a block is
54 // present, and its size and upload time, without retrieving the
57 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
59 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
62 // The PullHandler processes "PUT /pull" commands from Data Manager.
63 // It parses the JSON list of pull requests in the request body, and
64 // delivers them to the pull list manager for replication.
65 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
67 // Any request which does not match any of these routes gets
69 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
74 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
75 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
78 // FindKeepVolumes scans all mounted volumes on the system for Keep
79 // volumes, and returns a list of matching paths.
81 // A device is assumed to be a Keep volume if it is a normal or tmpfs
82 // volume and has a "/keep" directory directly underneath the mount
85 func FindKeepVolumes() []string {
86 vols := make([]string, 0)
88 if f, err := os.Open(PROC_MOUNTS); err != nil {
89 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
91 scanner := bufio.NewScanner(f)
93 args := strings.Fields(scanner.Text())
94 dev, mount := args[0], args[1]
96 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
97 keep := mount + "/keep"
98 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99 vols = append(vols, keep)
103 if err := scanner.Err(); err != nil {
110 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
111 hash := mux.Vars(req)["hash"]
113 hints := mux.Vars(req)["hints"]
115 // Parse the locator string and hints from the request.
116 // TODO(twp): implement a Locator type.
117 var signature, timestamp string
119 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
120 for _, hint := range strings.Split(hints, "+") {
121 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
122 // Server ignores size hints
123 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
126 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
127 // Any unknown hint that starts with an uppercase letter is
128 // presumed to be valid and ignored, to permit forward compatibility.
130 // Unknown format; not a valid locator.
131 log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
132 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
138 // If permission checking is in effect, verify this
139 // request's permission signature.
140 if enforce_permissions {
141 if signature == "" || timestamp == "" {
142 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
143 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
145 } else if IsExpired(timestamp) {
146 log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
147 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
150 req_locator := req.URL.Path[1:] // strip leading slash
151 if !VerifySignature(req_locator, GetApiToken(req)) {
152 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
153 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
159 block, err := GetBlock(hash, false)
161 // Garbage collect after each GET. Fixes #2865.
162 // TODO(twp): review Keep memory usage and see if there's
163 // a better way to do this than blindly garbage collecting
164 // after every block.
168 // This type assertion is safe because the only errors
169 // GetBlock can return are DiskHashError or NotFoundError.
170 if err == NotFoundError {
171 log.Printf("%s: not found, giving up\n", hash)
173 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
174 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
178 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
180 _, err = resp.Write(block)
182 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
184 log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
190 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
191 // Garbage collect after each PUT. Fixes #2865.
192 // See also GetBlockHandler.
195 hash := mux.Vars(req)["hash"]
197 // Read the block data to be stored.
198 // If the request's Content-Length exceeds BLOCKSIZE, respond with TooLongError.
200 if req.ContentLength > BLOCKSIZE {
201 log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
202 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
206 buf := make([]byte, req.ContentLength)
207 nread, err := io.ReadFull(req.Body, buf)
209 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
210 http.Error(resp, err.Error(), 500)
211 } else if int64(nread) < req.ContentLength {
212 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
213 http.Error(resp, "request truncated", 500)
215 if err := PutBlock(buf, hash); err == nil {
216 // Success; add a size hint, sign the locator if
217 // possible, and return it to the client.
218 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
219 api_token := GetApiToken(req)
220 if PermissionSecret != nil && api_token != "" {
221 expiry := time.Now().Add(permission_ttl)
222 return_hash = SignLocator(return_hash, api_token, expiry)
224 log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
225 resp.Write([]byte(return_hash + "\n"))
227 ke := err.(*KeepError)
228 log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
229 http.Error(resp, ke.Error(), ke.HTTPCode)
236 // A HandleFunc to address /index and /index/{prefix} requests.
238 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
239 prefix := mux.Vars(req)["prefix"]
241 // Only the data manager may issue /index requests,
242 // and only if enforce_permissions is enabled.
243 // All other requests return 403 Forbidden.
244 api_token := GetApiToken(req)
245 if !enforce_permissions ||
247 data_manager_token != api_token {
248 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
252 for _, vol := range KeepVM.Volumes() {
253 index = index + vol.Index(prefix)
255 resp.Write([]byte(index))
259 // Responds to /status.json requests with the current node status,
260 // described in a JSON structure.
262 // The data given in a status.json response includes:
263 // volumes - a list of Keep volumes currently in use by this server
264 // each volume is an object with the following fields:
266 // * device_num (an integer identifying the underlying filesystem)
270 type VolumeStatus struct {
271 MountPoint string `json:"mount_point"`
272 DeviceNum uint64 `json:"device_num"`
273 BytesFree uint64 `json:"bytes_free"`
274 BytesUsed uint64 `json:"bytes_used"`
277 type NodeStatus struct {
278 Volumes []*VolumeStatus `json:"volumes"`
281 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
282 st := GetNodeStatus()
283 if jstat, err := json.Marshal(st); err == nil {
286 log.Printf("json.Marshal: %s\n", err)
287 log.Printf("NodeStatus = %v\n", st)
288 http.Error(resp, err.Error(), 500)
293 // Returns a NodeStatus struct describing this Keep
294 // node's current status.
296 func GetNodeStatus() *NodeStatus {
297 st := new(NodeStatus)
299 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
300 for i, vol := range KeepVM.Volumes() {
301 st.Volumes[i] = vol.Status()
307 // Returns a VolumeStatus describing the requested volume.
309 func GetVolumeStatus(volume string) *VolumeStatus {
310 var fs syscall.Statfs_t
313 if fi, err := os.Stat(volume); err == nil {
314 devnum = fi.Sys().(*syscall.Stat_t).Dev
316 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
320 err := syscall.Statfs(volume, &fs)
322 log.Printf("GetVolumeStatus: statfs: %s\n", err)
325 // These calculations match the way df calculates disk usage:
326 // "free" space is measured by fs.Bavail, but "used" space
327 // uses fs.Blocks - fs.Bfree.
328 free := fs.Bavail * uint64(fs.Bsize)
329 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
330 return &VolumeStatus{volume, devnum, free, used}
333 // DeleteHandler processes DELETE requests.
335 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
336 // from all connected volumes.
338 // Only the Data Manager, or an Arvados admin with scope "all", are
339 // allowed to issue DELETE requests. If a DELETE request is not
340 // authenticated or is issued by a non-admin user, the server returns
341 // a PermissionError.
343 // Upon receiving a valid request from an authorized user,
344 // DeleteHandler deletes all copies of the specified block on local
349 // If the requested block was not found on any volume, the response
350 // code is HTTP 404 Not Found.
352 // Otherwise, the response code is 200 OK, with a response body
353 // consisting of the JSON message
355 // {"copies_deleted":d,"copies_failed":f}
357 // where d and f are integers representing the number of blocks that
358 // were successfully and unsuccessfully deleted.
360 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
361 hash := mux.Vars(req)["hash"]
362 log.Printf("%s %s", req.Method, hash)
364 // Confirm that this user is an admin and has a token with unlimited scope.
365 var tok = GetApiToken(req)
366 if tok == "" || !CanDelete(tok) {
367 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
372 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
376 // Delete copies of this block from all available volumes. Report
377 // how many blocks were successfully and unsuccessfully
380 Deleted int `json:"copies_deleted"`
381 Failed int `json:"copies_failed"`
383 for _, vol := range KeepVM.Volumes() {
384 if err := vol.Delete(hash); err == nil {
386 } else if os.IsNotExist(err) {
390 log.Println("DeleteHandler:", err)
396 if result.Deleted == 0 && result.Failed == 0 {
397 st = http.StatusNotFound
404 if st == http.StatusOK {
405 if body, err := json.Marshal(result); err == nil {
408 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
409 http.Error(resp, err.Error(), 500)
414 /* PullHandler processes "PUT /pull" requests for the data manager.
415 The request body is a JSON message containing a list of pull
416 requests in the following format:
420 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
422 "keep0.qr1hi.arvadosapi.com:25107",
423 "keep1.qr1hi.arvadosapi.com:25108"
427 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
437 Each pull request in the list consists of a block locator string
438 and an ordered list of servers. Keepstore should try to fetch the
439 block from each server in turn.
441 If the request has not been sent by the Data Manager, return 401
444 If the JSON unmarshalling fails, return 400 Bad Request.
447 type PullRequest struct {
448 Locator string `json:"locator"`
449 Servers []string `json:"servers"`
452 func PullHandler(resp http.ResponseWriter, req *http.Request) {
453 // Reject unauthorized requests.
454 api_token := GetApiToken(req)
455 if !IsDataManagerToken(api_token) {
456 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
457 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
461 // Parse the request body.
463 r := json.NewDecoder(req.Body)
464 if err := r.Decode(&pr); err != nil {
465 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
466 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
470 // We have a properly formatted pull list sent from the data
471 // manager. Report success and send the list to the pull list
472 // manager for further handling.
473 log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
474 resp.WriteHeader(http.StatusOK)
476 fmt.Sprintf("Received %d pull requests\n", len(pr))))
479 for _, p := range pr {
484 pullq = NewWorkQueue()
486 pullq.ReplaceQueue(plist)
489 // ==============================
490 // GetBlock and PutBlock implement lower-level code for handling
491 // blocks by rooting through volumes connected to the local machine.
492 // Once the handler has determined that system policy permits the
493 // request, it calls these methods to perform the actual operation.
495 // TODO(twp): this code would probably be better located in the
496 // VolumeManager interface. As an abstraction, the VolumeManager
497 // should be the only part of the code that cares about which volume a
498 // block is stored on, so it should be responsible for figuring out
499 // which volume to check for fetching blocks, storing blocks, etc.
501 // ==============================
502 // GetBlock fetches and returns the block identified by "hash". If
503 // the update_timestamp argument is true, GetBlock also updates the
504 // block's file modification time (for the sake of PutBlock, which
505 // must update the file's timestamp when the block already exists).
507 // On success, GetBlock returns a byte slice with the block data, and
510 // If the block cannot be found on any volume, returns NotFoundError.
512 // If the block found does not have the correct MD5 hash, returns
516 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
517 // Attempt to read the requested hash from a keep volume.
518 error_to_caller := NotFoundError
520 for _, vol := range KeepVM.Volumes() {
521 if buf, err := vol.Get(hash); err != nil {
522 // IsNotExist is an expected error and may be ignored.
523 // (If all volumes report IsNotExist, we return a NotFoundError)
524 // All other errors should be logged but we continue trying to
527 case os.IsNotExist(err):
530 log.Printf("GetBlock: reading %s: %s\n", hash, err)
533 // Double check the file checksum.
535 filehash := fmt.Sprintf("%x", md5.Sum(buf))
536 if filehash != hash {
537 // TODO(twp): this condition probably represents a bad disk and
538 // should raise major alarm bells for an administrator: e.g.
539 // they should be sent directly to an event manager at high
540 // priority or logged as urgent problems.
542 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
544 error_to_caller = DiskHashError
547 if error_to_caller != NotFoundError {
548 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
551 // Update the timestamp if the caller requested.
552 // If we could not update the timestamp, continue looking on
554 if update_timestamp {
555 if vol.Touch(hash) != nil {
564 if error_to_caller != NotFoundError {
565 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
567 return nil, error_to_caller
570 /* PutBlock(block, hash)
571 Stores the BLOCK (identified by the content id HASH) in Keep.
573 The MD5 checksum of the block must be identical to the content id HASH.
574 If not, an error is returned.
576 PutBlock stores the BLOCK on the first Keep volume with free space.
577 A failure code is returned to the user only if all volumes fail.
579 On success, PutBlock returns nil.
580 On failure, it returns a KeepError with one of the following codes:
583 A different block with the same hash already exists on this
586 The MD5 hash of the BLOCK does not match the argument HASH.
588 There was not enough space left in any Keep volume to store
591 The object could not be stored for some other reason (e.g.
592 all writes failed). The text of the error message should
593 provide as much detail as possible.
596 func PutBlock(block []byte, hash string) error {
597 // Check that BLOCK's checksum matches HASH.
598 blockhash := fmt.Sprintf("%x", md5.Sum(block))
599 if blockhash != hash {
600 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
601 return RequestHashError
604 // If we already have a block on disk under this identifier, return
605 // success (but check for MD5 collisions). While fetching the block,
606 // update its timestamp.
607 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
608 // In either case, we want to write our new (good) block to disk,
609 // so there is nothing special to do if err != nil.
611 if oldblock, err := GetBlock(hash, true); err == nil {
612 if bytes.Compare(block, oldblock) == 0 {
613 // The block already exists; return success.
616 return CollisionError
620 // Choose a Keep volume to write to.
621 // If this volume fails, try all of the volumes in order.
622 vol := KeepVM.Choose()
623 if err := vol.Put(hash, block); err == nil {
624 return nil // success!
627 for _, vol := range KeepVM.Volumes() {
628 err := vol.Put(hash, block)
630 return nil // success!
632 if err != FullError {
633 // The volume is not full but the write did not succeed.
634 // Report the error and continue trying.
636 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
641 log.Printf("all Keep volumes full")
644 log.Printf("all Keep volumes failed")
651 // Return true if the specified string is a valid Keep locator.
652 // When Keep is extended to support hash types other than MD5,
653 // this should be updated to cover those as well.
655 func IsValidLocator(loc string) bool {
656 match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
660 log.Printf("IsValidLocator: %s\n", err)
664 // GetApiToken returns the OAuth2 token from the Authorization
665 // header of an HTTP request, or an empty string if no matching
667 func GetApiToken(req *http.Request) string {
668 if auth, ok := req.Header["Authorization"]; ok {
669 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
671 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
678 // IsExpired returns true if the given Unix timestamp (expressed as a
679 // hexadecimal string) is in the past, or if timestamp_hex cannot be
680 // parsed as a hexadecimal string.
681 func IsExpired(timestamp_hex string) bool {
682 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
684 log.Printf("IsExpired: %s\n", err)
687 return time.Unix(ts, 0).Before(time.Now())
690 // CanDelete returns true if the user identified by api_token is
691 // allowed to delete blocks.
692 func CanDelete(api_token string) bool {
696 // Blocks may be deleted only when Keep has been configured with a
698 if IsDataManagerToken(api_token) {
701 // TODO(twp): look up api_token with the API server
702 // return true if is_admin is true and if the token
703 // has unlimited scope
707 // IsDataManagerToken returns true if api_token represents the data
709 func IsDataManagerToken(api_token string) bool {
710 return data_manager_token != "" && api_token == data_manager_token