3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44 // For IndexHandler we support:
45 // /index - returns all locators
46 // /index/{prefix} - returns all locators that begin with {prefix}
47 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48 // If {prefix} is the empty string, return an index of all locators
49 // (so /index and /index/ behave identically)
50 // A client may supply a full 32-digit locator string, in which
51 // case the server will return an index with either zero or one
52 // entries. This usage allows a client to check whether a block is
53 // present, and its size and upload time, without retrieving the
56 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61 // Any request which does not match any of these routes gets
63 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
72 // FindKeepVolumes scans all mounted volumes on the system for Keep
73 // volumes, and returns a list of matching paths.
75 // A device is assumed to be a Keep volume if it is a normal or tmpfs
76 // volume and has a "/keep" directory directly underneath the mount
79 func FindKeepVolumes() []string {
80 vols := make([]string, 0)
82 if f, err := os.Open(PROC_MOUNTS); err != nil {
83 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
85 scanner := bufio.NewScanner(f)
87 args := strings.Fields(scanner.Text())
88 dev, mount := args[0], args[1]
90 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
91 keep := mount + "/keep"
92 if st, err := os.Stat(keep); err == nil && st.IsDir() {
93 vols = append(vols, keep)
97 if err := scanner.Err(); err != nil {
104 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
105 hash := mux.Vars(req)["hash"]
107 log.Printf("%s %s", req.Method, hash)
109 hints := mux.Vars(req)["hints"]
111 // Parse the locator string and hints from the request.
112 // TODO(twp): implement a Locator type.
113 var signature, timestamp string
115 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
116 for _, hint := range strings.Split(hints, "+") {
117 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
118 // Server ignores size hints
119 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
122 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
123 // Any unknown hint that starts with an uppercase letter is
124 // presumed to be valid and ignored, to permit forward compatibility.
126 // Unknown format; not a valid locator.
127 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
133 // If permission checking is in effect, verify this
134 // request's permission signature.
135 if enforce_permissions {
136 if signature == "" || timestamp == "" {
137 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
139 } else if IsExpired(timestamp) {
140 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
143 req_locator := req.URL.Path[1:] // strip leading slash
144 if !VerifySignature(req_locator, GetApiToken(req)) {
145 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
151 block, err := GetBlock(hash, false)
153 // Garbage collect after each GET. Fixes #2865.
154 // TODO(twp): review Keep memory usage and see if there's
155 // a better way to do this than blindly garbage collecting
156 // after every block.
160 // This type assertion is safe because the only errors
161 // GetBlock can return are DiskHashError or NotFoundError.
162 if err == NotFoundError {
163 log.Printf("%s: not found, giving up\n", hash)
165 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
169 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
171 _, err = resp.Write(block)
173 log.Printf("GetBlockHandler: writing response: %s", err)
179 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
180 // Garbage collect after each PUT. Fixes #2865.
181 // See also GetBlockHandler.
184 hash := mux.Vars(req)["hash"]
186 log.Printf("%s %s", req.Method, hash)
188 // Read the block data to be stored.
189 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
191 if req.ContentLength > BLOCKSIZE {
192 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
196 buf := make([]byte, req.ContentLength)
197 nread, err := io.ReadFull(req.Body, buf)
199 http.Error(resp, err.Error(), 500)
200 } else if int64(nread) < req.ContentLength {
201 http.Error(resp, "request truncated", 500)
203 if err := PutBlock(buf, hash); err == nil {
204 // Success; add a size hint, sign the locator if
205 // possible, and return it to the client.
206 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
207 api_token := GetApiToken(req)
208 if PermissionSecret != nil && api_token != "" {
209 expiry := time.Now().Add(permission_ttl)
210 return_hash = SignLocator(return_hash, api_token, expiry)
212 resp.Write([]byte(return_hash + "\n"))
214 ke := err.(*KeepError)
215 http.Error(resp, ke.Error(), ke.HTTPCode)
222 // A HandleFunc to address /index and /index/{prefix} requests.
224 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
225 prefix := mux.Vars(req)["prefix"]
227 // Only the data manager may issue /index requests,
228 // and only if enforce_permissions is enabled.
229 // All other requests return 403 Forbidden.
230 api_token := GetApiToken(req)
231 if !enforce_permissions ||
233 data_manager_token != api_token {
234 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
238 for _, vol := range KeepVM.Volumes() {
239 index = index + vol.Index(prefix)
241 resp.Write([]byte(index))
// Types describing the JSON document returned for /status.json
// requests.
type (
	// VolumeStatus describes one Keep volume in use by this server.
	// It is encoded as a JSON object with the fields:
	//
	//   * mount_point
	//   * device_num (an integer identifying the underlying filesystem)
	//   * bytes_free
	//   * bytes_used
	//
	// Field order is significant: GetVolumeStatus builds values with
	// an unkeyed composite literal.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the current status of this Keep node. Its
	// "volumes" field lists the Keep volumes currently in use.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
267 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
268 st := GetNodeStatus()
269 if jstat, err := json.Marshal(st); err == nil {
272 log.Printf("json.Marshal: %s\n", err)
273 log.Printf("NodeStatus = %v\n", st)
274 http.Error(resp, err.Error(), 500)
279 // Returns a NodeStatus struct describing this Keep
280 // node's current status.
282 func GetNodeStatus() *NodeStatus {
283 st := new(NodeStatus)
285 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
286 for i, vol := range KeepVM.Volumes() {
287 st.Volumes[i] = vol.Status()
293 // Returns a VolumeStatus describing the requested volume.
295 func GetVolumeStatus(volume string) *VolumeStatus {
296 var fs syscall.Statfs_t
299 if fi, err := os.Stat(volume); err == nil {
300 devnum = fi.Sys().(*syscall.Stat_t).Dev
302 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
306 err := syscall.Statfs(volume, &fs)
308 log.Printf("GetVolumeStatus: statfs: %s\n", err)
311 // These calculations match the way df calculates disk usage:
312 // "free" space is measured by fs.Bavail, but "used" space
313 // uses fs.Blocks - fs.Bfree.
314 free := fs.Bavail * uint64(fs.Bsize)
315 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
316 return &VolumeStatus{volume, devnum, free, used}
319 // DeleteHandler processes DELETE requests.
321 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
322 // from all connected volumes.
324 // Only the Data Manager, or an Arvados admin with scope "all", are
325 // allowed to issue DELETE requests. If a DELETE request is not
326 // authenticated or is issued by a non-admin user, the server returns
327 // a PermissionError.
329 // Upon receiving a valid request from an authorized user,
330 // DeleteHandler deletes all copies of the specified block on local
335 // If the requested block was not found on any volume, the response
336 // code is HTTP 404 Not Found.
338 // Otherwise, the response code is 200 OK, with a response body
339 // consisting of the JSON message
341 // {"copies_deleted":d,"copies_failed":f}
343 // where d and f are integers representing the number of blocks that
344 // were successfully and unsuccessfully deleted.
346 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
// The block to delete is named by the {hash} route variable.
347 hash := mux.Vars(req)["hash"]
348 log.Printf("%s %s", req.Method, hash)
350 // Reject the request unless it carries an API token that CanDelete accepts.
351 var tok = GetApiToken(req)
352 if tok == "" || !CanDelete(tok) {
353 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Answer with MethodDisabledError (presumably when deletion is
// switched off by configuration -- TODO confirm the guarding condition).
358 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
362 // Delete copies of this block from all available volumes. Report
363 // how many blocks were successfully and unsuccessfully
// The two counters below are what gets serialized into the JSON
// response body ("copies_deleted" / "copies_failed").
366 Deleted int `json:"copies_deleted"`
367 Failed int `json:"copies_failed"`
369 for _, vol := range KeepVM.Volumes() {
// A nil error means this volume's copy was removed.
370 if err := vol.Delete(hash); err == nil {
// The volume never held the block; presumably this counts as
// neither a deletion nor a failure -- TODO confirm.
372 } else if os.IsNotExist(err) {
// Any other error is logged.
376 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed: the block was not found on
// any volume, so the status is 404 Not Found.
382 if result.Deleted == 0 && result.Failed == 0 {
383 st = http.StatusNotFound
// Only a 200 OK response carries the JSON result body.
390 if st == http.StatusOK {
391 if body, err := json.Marshal(result); err == nil {
// Encoding failed: log the error and the result, and report HTTP 500.
394 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
395 http.Error(resp, err.Error(), 500)
400 // ==============================
401 // GetBlock and PutBlock implement lower-level code for handling
402 // blocks by rooting through volumes connected to the local machine.
403 // Once the handler has determined that system policy permits the
404 // request, it calls these methods to perform the actual operation.
406 // TODO(twp): this code would probably be better located in the
407 // VolumeManager interface. As an abstraction, the VolumeManager
408 // should be the only part of the code that cares about which volume a
409 // block is stored on, so it should be responsible for figuring out
410 // which volume to check for fetching blocks, storing blocks, etc.
412 // ==============================
413 // GetBlock fetches and returns the block identified by "hash". If
414 // the update_timestamp argument is true, GetBlock also updates the
415 // block's file modification time (for the sake of PutBlock, which
416 // must update the file's timestamp when the block already exists).
418 // On success, GetBlock returns a byte slice with the block data, and
421 // If the block cannot be found on any volume, returns NotFoundError.
423 // If the block found does not have the correct MD5 hash, returns
427 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
428 // Attempt to read the requested hash from a keep volume.
// Start from NotFoundError; it is replaced by DiskHashError below if
// any volume turns out to hold a copy with the wrong checksum.
429 error_to_caller := NotFoundError
// Try each volume returned by KeepVM.Volumes() in order.
431 for _, vol := range KeepVM.Volumes() {
432 if buf, err := vol.Get(hash); err != nil {
433 // IsNotExist is an expected error and may be ignored.
434 // (If all volumes report IsNotExist, we return a NotFoundError)
435 // All other errors should be logged but we continue trying to
438 case os.IsNotExist(err):
441 log.Printf("GetBlock: reading %s: %s\n", hash, err)
444 // Double check the file checksum.
// The hex MD5 digest of the data read must equal the requested hash.
446 filehash := fmt.Sprintf("%x", md5.Sum(buf))
447 if filehash != hash {
448 // TODO(twp): this condition probably represents a bad disk and
449 // should raise major alarm bells for an administrator: e.g.
450 // they should be sent directly to an event manager at high
451 // priority or logged as urgent problems.
453 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
// Remember the corruption so the caller hears about it if no good
// copy is found on another volume.
455 error_to_caller = DiskHashError
// An earlier volume held a corrupt copy, but this one is good:
// record that the request was still satisfied.
458 if error_to_caller != NotFoundError {
459 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
462 // Update the timestamp if the caller requested.
463 // If we could not update the timestamp, continue looking on
465 if update_timestamp {
466 if vol.Touch(hash) != nil {
// No volume produced a usable copy. Log when at least one corrupt
// copy was seen, then return the accumulated error.
475 if error_to_caller != NotFoundError {
476 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
478 return nil, error_to_caller
481 /* PutBlock(block, hash)
482 Stores the BLOCK (identified by the content id HASH) in Keep.
484 The MD5 checksum of the block must be identical to the content id HASH.
485 If not, an error is returned.
487 PutBlock stores the BLOCK on the first Keep volume with free space.
488 A failure code is returned to the user only if all volumes fail.
490 On success, PutBlock returns nil.
491 On failure, it returns a KeepError with one of the following codes:
494 A different block with the same hash already exists on this
497 The MD5 hash of the BLOCK does not match the argument HASH.
499 There was not enough space left in any Keep volume to store
502 The object could not be stored for some other reason (e.g.
503 all writes failed). The text of the error message should
504 provide as much detail as possible.
507 func PutBlock(block []byte, hash string) error {
508 // Check that BLOCK's checksum matches HASH.
// The comparison is against the lowercase hex MD5 digest of block.
509 blockhash := fmt.Sprintf("%x", md5.Sum(block))
510 if blockhash != hash {
511 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
512 return RequestHashError
515 // If we already have a block on disk under this identifier, return
516 // success (but check for MD5 collisions). While fetching the block,
517 // update its timestamp.
518 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
519 // In either case, we want to write our new (good) block to disk,
520 // so there is nothing special to do if err != nil.
// Passing true asks GetBlock to touch the existing copy.
522 if oldblock, err := GetBlock(hash, true); err == nil {
523 if bytes.Compare(block, oldblock) == 0 {
524 // The block already exists; return success.
// Same hash but different content: report a collision.
527 return CollisionError
531 // Choose a Keep volume to write to.
532 // If this volume fails, try all of the volumes in order.
533 vol := KeepVM.Choose()
534 if err := vol.Put(hash, block); err == nil {
535 return nil // success!
// Fallback: the chosen volume failed, so offer the block to every
// volume in turn and stop at the first successful write.
538 for _, vol := range KeepVM.Volumes() {
539 err := vol.Put(hash, block)
541 return nil // success!
543 if err != FullError {
544 // The volume is not full but the write did not succeed.
545 // Report the error and continue trying.
547 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
// Every volume refused the block; the two messages below distinguish
// "all full" from other failures.
552 log.Printf("all Keep volumes full")
555 log.Printf("all Keep volumes failed")
// locatorPattern matches a bare Keep locator: exactly 32 lowercase
// hexadecimal digits (an MD5 digest). It is compiled once instead of
// on every IsValidLocator call.
var locatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return locatorPattern.MatchString(loc)
}
// oauth2TokenPattern extracts the token from an Authorization header
// value of the form "OAuth2 <token>". It is compiled once instead of
// on every GetApiToken call.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of an HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is examined. The scheme name
// "OAuth2" is matched case-sensitively.
func GetApiToken(req *http.Request) string {
	// Header.Get yields "" both when the header is absent and when it
	// is present with an empty value list, so neither case can panic.
	auth := req.Header.Get("Authorization")
	if m := oauth2TokenPattern.FindStringSubmatch(auth); m != nil {
		return m[1]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string. A parse failure is logged.
func IsExpired(timestamp_hex string) bool {
	// Parse into 64 bits explicitly: time.Unix takes an int64, and a
	// platform-sized int would reject timestamps >= 0x80000000 on
	// 32-bit builds, reporting valid future timestamps as expired.
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s\n", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
601 // CanDelete returns true if the user identified by api_token is
602 // allowed to delete blocks.
603 func CanDelete(api_token string) bool {
607 // Blocks may be deleted only when Keep has been configured with a
609 if data_manager_token == "" {
612 if api_token == data_manager_token {
615 // TODO(twp): look up api_token with the API server
616 // return true if is_admin is true and if the token
617 // has unlimited scope