// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// IndexHandler    (GET /index, GET /index/prefix)
// StatusHandler   (GET /status.json)
// DeleteHandler   (DELETE /locator)
// PullHandler     (PUT /pull)
// TrashHandler    (PUT /trash)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43 // List all blocks stored here. Privileged client only.
44 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45 // List blocks stored here whose hash has the given prefix.
46 // Privileged client only.
47 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49 // List volumes: path, device number, bytes used/avail.
50 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
52 // Replace the current pull queue.
53 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
55 // Replace the current trash queue.
56 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
58 // Any request which does not match any of these routes gets
60 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
65 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
66 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves GET and HEAD requests for a block. The block is
// addressed either by bare hash or by hash+hints; both routes point here.
//
// Processing order, as far as it is visible in this excerpt:
//   1. Split the "hints" route variable on "+" and classify each hint.
//      All-digit size hints are ignored. An A<hex>@<8 hex digits> hint
//      is the permission hint; its capture groups presumably fill
//      signature and timestamp. Hints starting with an uppercase letter
//      are ignored for forward compatibility. Anything else is rejected
//      with BadRequestError.
//   2. If enforce_permissions is set:
//      - a missing signature or timestamp yields PermissionError;
//      - an expired timestamp yields ExpiredError;
//      - otherwise the request path (minus its leading slash) is checked
//        with VerifySignature and the request's API token, and a failure
//        yields PermissionError.
//   3. Fetch the block with GetBlock(hash, false). An error is reported
//      with its KeepError HTTPCode. On success the block buffer is handed
//      back to bufs when the handler returns, and Content-Length /
//      Content-Type (application/octet-stream) headers are set.
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70 hash := mux.Vars(req)["hash"]
72 hints := mux.Vars(req)["hints"]
74 // Parse the locator string and hints from the request.
75 // TODO(twp): implement a Locator type.
76 var signature, timestamp string
// NOTE(review): this pattern is recompiled on every request, and the two
// regexp.MatchString patterns below are recompiled once per hint
// (MatchString compiles its pattern on each call). Hoisting all three to
// package-level regexp.MustCompile variables would remove that
// per-request cost. It would also make the discarded Compile error
// impossible rather than silently ignored.
78 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
// NOTE(review): on the hint-less route, hints is "" and strings.Split
// returns [""]. That empty element matches none of the branches below and
// would land in the BadRequestError branch. The guard that skips parsing
// in that case is not visible in this excerpt — confirm it exists.
79 for _, hint := range strings.Split(hints, "+") {
80 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
81 // Server ignores size hints
82 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
85 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
86 // Any unknown hint that starts with an uppercase letter is
87 // presumed to be valid and ignored, to permit forward compatibility.
89 // Unknown format; not a valid locator.
90 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
96 // If permission checking is in effect, verify this
97 // request's permission signature.
98 if enforce_permissions {
99 if signature == "" || timestamp == "" {
100 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
102 } else if IsExpired(timestamp) {
103 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
106 req_locator := req.URL.Path[1:] // strip leading slash
107 if !VerifySignature(req_locator, GetApiToken(req)) {
108 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
114 block, err := GetBlock(hash, false)
// NOTE(review): the claim below holds for the update_timestamp=false call
// made here. GetBlock can also return GenericError, but only on its Touch
// path (update_timestamp=true).
116 // This type assertion is safe because the only errors
117 // GetBlock can return are DiskHashError or NotFoundError.
118 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// Hand the buffer back to bufs (presumably the same buffer pool that
// PutBlockHandler draws from with bufs.Get) once the handler returns.
121 defer bufs.Put(block)
123 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
124 resp.Header().Set("Content-Type", "application/octet-stream")
// PutBlockHandler stores the request body as the block named by the
// {hash} route variable and replies with the block's locator.
//
// Cheap checks run before any body bytes are read:
//   - no Content-Length (-1) yields SizeRequiredError;
//   - a Content-Length above BLOCKSIZE yields TooLongError;
//   - no writable volume yields FullError.
//
// The body is then read in full into a buffer obtained from bufs and
// passed to PutBlock. PutBlock's *KeepError, if any, supplies the
// response code.
//
// On success the reply body is "<hash>+<size>\n". The locator is signed
// with SignLocator, expiring blob_signature_ttl from now, when
// PermissionSecret is set and the request carried an API token.
128 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
129 // Garbage collect after each PUT. Fixes #2865.
130 // See also GetBlockHandler.
133 hash := mux.Vars(req)["hash"]
135 // Detect as many error conditions as possible before reading
136 // the body: avoid transmitting data that will not end up
137 // being written anyway.
139 if req.ContentLength == -1 {
140 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
144 if req.ContentLength > BLOCKSIZE {
145 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
149 if len(KeepVM.AllWritable()) == 0 {
150 http.Error(resp, FullError.Error(), FullError.HTTPCode)
154 buf := bufs.Get(int(req.ContentLength))
155 _, err := io.ReadFull(req.Body, buf)
// NOTE(review): http.StatusInternalServerError would read better than the
// literal 500. Whether buf is handed back to bufs on this error path is
// not visible in this excerpt — confirm it is not leaked from the pool.
157 http.Error(resp, err.Error(), 500)
162 err = PutBlock(buf, hash)
// NOTE(review): this is an unchecked type assertion. It panics if PutBlock
// ever returns an error that is not a *KeepError. A comma-ok assertion
// with a 500 fallback would be safer.
166 ke := err.(*KeepError)
167 http.Error(resp, ke.Error(), ke.HTTPCode)
171 // Success; add a size hint, sign the locator if possible, and
172 // return it to the client.
173 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
174 api_token := GetApiToken(req)
175 if PermissionSecret != nil && api_token != "" {
176 expiry := time.Now().Add(blob_signature_ttl)
177 return_hash = SignLocator(return_hash, api_token, expiry)
179 resp.Write([]byte(return_hash + "\n"))
// IndexHandler serves GET/HEAD /index and /index/{prefix}.
//
// Only the data manager token is accepted; any other caller gets
// UnauthorizedError. Each readable volume writes its index directly to
// the response via IndexTo, one volume after another. The prefix route
// variable is passed to IndexTo; it is "" for /index, because that route
// defines no prefix variable. The router comments describe the prefix as
// a filter on block hashes.
183 // A HandleFunc to address /index and /index/{prefix} requests.
185 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
186 // Reject unauthorized requests.
187 if !IsDataManagerToken(GetApiToken(req)) {
188 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
192 prefix := mux.Vars(req)["prefix"]
194 for _, vol := range KeepVM.AllReadable() {
195 if err := vol.IndexTo(prefix, resp); err != nil {
196 // The only errors returned by IndexTo are
197 // write errors returned by resp.Write(),
198 // which probably means the client has
199 // disconnected and this error will never be
200 // reported to the client -- but it will
201 // appear in our own error log.
// NOTE(review): http.Error only writes to the response; it does not log
// err. Also, once earlier IndexTo calls have written body bytes, the
// status line has already been sent. At that point http.Error cannot
// change the status: net/http warns about a superfluous WriteHeader and
// the message is appended to the body. Logging err explicitly with
// log.Printf would match the intent described above.
202 http.Error(resp, err.Error(), http.StatusInternalServerError)
// StatusHandler responds to /status.json requests with the current node
// status, described in a JSON structure.
//
// The data given in a status.json response includes:
//   volumes - a list of Keep volumes currently in use by this server;
//     each volume is an object with the following fields:
//     * mount_point
//     * device_num (an integer identifying the underlying filesystem)
//     * bytes_free
//     * bytes_used
type (
	// VolumeStatus is one entry of the "volumes" list in a status.json
	// response. Field order matters: GetVolumeStatus builds values with
	// a positional composite literal.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level document served at /status.json.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
231 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
232 st := GetNodeStatus()
233 if jstat, err := json.Marshal(st); err == nil {
236 log.Printf("json.Marshal: %s\n", err)
237 log.Printf("NodeStatus = %v\n", st)
238 http.Error(resp, err.Error(), 500)
243 // Returns a NodeStatus struct describing this Keep
244 // node's current status.
246 func GetNodeStatus() *NodeStatus {
247 st := new(NodeStatus)
249 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
250 for i, vol := range KeepVM.AllReadable() {
251 st.Volumes[i] = vol.Status()
257 // Returns a VolumeStatus describing the requested volume.
// volume is a filesystem path. It is passed to os.Stat to obtain the
// device number and to syscall.Statfs to obtain free/used space. Both
// failures are logged; what this function returns after such a failure
// is not visible in this excerpt.
259 func GetVolumeStatus(volume string) *VolumeStatus {
260 var fs syscall.Statfs_t
// NOTE(review): the assertion below is unchecked. fi.Sys() is a
// *syscall.Stat_t on Unix-like systems, but this line panics anywhere it
// is not. Prefer `if st, ok := fi.Sys().(*syscall.Stat_t); ok { ... }`.
263 if fi, err := os.Stat(volume); err == nil {
264 devnum = fi.Sys().(*syscall.Stat_t).Dev
266 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
270 err := syscall.Statfs(volume, &fs)
272 log.Printf("GetVolumeStatus: statfs: %s\n", err)
275 // These calculations match the way df calculates disk usage:
276 // "free" space is measured by fs.Bavail, but "used" space
277 // uses fs.Blocks - fs.Bfree.
278 free := fs.Bavail * uint64(fs.Bsize)
279 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// NOTE(review): this positional composite literal silently depends on the
// VolumeStatus field order (MountPoint, DeviceNum, BytesFree, BytesUsed).
// A keyed literal would survive field reordering.
280 return &VolumeStatus{volume, devnum, free, used}
283 // DeleteHandler processes DELETE requests.
285 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
286 // from all connected volumes.
288 // Only the Data Manager, or an Arvados admin with scope "all", are
289 // allowed to issue DELETE requests. If a DELETE request is not
290 // authenticated or is issued by a non-admin user, the server returns
291 // a PermissionError.
293 // Upon receiving a valid request from an authorized user,
294 // DeleteHandler deletes all copies of the specified block on local
299 // If the requested block was not found on any volume, the response
300 // code is HTTP 404 Not Found.
302 // Otherwise, the response code is 200 OK, with a response body
303 // consisting of the JSON message
305 // {"copies_deleted":d,"copies_failed":f}
307 // where d and f are integers representing the number of blocks that
308 // were successfully and unsuccessfully deleted.
310 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
311 hash := mux.Vars(req)["hash"]
313 // Confirm that this user is an admin and has a token with unlimited scope.
314 var tok = GetApiToken(req)
315 if tok == "" || !CanDelete(tok) {
316 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): the condition guarding this MethodDisabledError response
// is not visible in this excerpt.
321 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
325 // Delete copies of this block from all available volumes.
326 // Report how many blocks were successfully deleted, and how
327 // many were found on writable volumes but not deleted.
329 Deleted int `json:"copies_deleted"`
330 Failed int `json:"copies_failed"`
// Only writable volumes are visited, so copies on read-only volumes are
// neither deleted nor counted in either total.
332 for _, vol := range KeepVM.AllWritable() {
333 if err := vol.Delete(hash); err == nil {
// A volume that does not hold the block is not a failure (see the
// "found on writable volumes but not deleted" wording above).
335 } else if os.IsNotExist(err) {
339 log.Println("DeleteHandler:", err)
// No writable volume had a copy at all: report 404.
345 if result.Deleted == 0 && result.Failed == 0 {
346 st = http.StatusNotFound
353 if st == http.StatusOK {
354 if body, err := json.Marshal(result); err == nil {
357 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
358 http.Error(resp, err.Error(), 500)
/* PullHandler processes "PUT /pull" requests for the data manager.
   The request body is a JSON message containing a list of pull
   requests in the following format:

   [
      {
         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
         "servers":[
            "keep0.qr1hi.arvadosapi.com:25107",
            "keep1.qr1hi.arvadosapi.com:25108"
         ]
      },
      {
         "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
         "servers":[ ... ]
      },
      ...
   ]

   Each pull request in the list consists of a block locator string
   and an ordered list of servers. Keepstore should try to fetch the
   block from each server in turn.

   If the request has not been sent by the Data Manager, return 401
   Unauthorized.

   If the JSON unmarshalling fails, return 400 Bad Request.
*/
// PullRequest is one entry of the JSON list accepted by PullHandler. It
// names a block and the servers to fetch it from.
type PullRequest struct {
	// Locator identifies the block, e.g. "<md5 hex>+<size>".
	Locator string `json:"locator"`
	// Servers lists the servers to try, in order.
	Servers []string `json:"servers"`
}
// PullHandler replaces the pull queue with the list of PullRequests in
// the request body. This matches the router's "Replace the current pull
// queue".
//
// Only the data manager token is accepted; other callers get
// UnauthorizedError. A body that does not decode as JSON yields a
// response with BadRequestError's code carrying the decoder's message.
// On success the handler replies 200 with "Received N pull requests"
// before handing the list to pullq.ReplaceQueue.
401 func PullHandler(resp http.ResponseWriter, req *http.Request) {
402 // Reject unauthorized requests.
403 if !IsDataManagerToken(GetApiToken(req)) {
404 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
408 // Parse the request body.
// NOTE(review): the body is decoded without a size limit. Wrapping
// req.Body in http.MaxBytesReader would bound memory use even for this
// authenticated endpoint.
410 r := json.NewDecoder(req.Body)
411 if err := r.Decode(&pr); err != nil {
412 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
416 // We have a properly formatted pull list sent from the data
417 // manager. Report success and send the list to the pull list
418 // manager for further handling.
419 resp.WriteHeader(http.StatusOK)
421 fmt.Sprintf("Received %d pull requests\n", len(pr))))
424 for _, p := range pr {
427 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by TrashHandler.
type TrashRequest struct {
	// Locator identifies the block to trash.
	Locator string `json:"locator"`
	// BlockMtime is presumably the block's expected modification time
	// (integer timestamp) — confirm the units against the data manager.
	BlockMtime int64 `json:"block_mtime"`
}
// TrashHandler replaces the trash queue with the list of TrashRequests in
// the request body. This matches the router's "Replace the current trash
// queue".
//
// Only the data manager token is accepted; other callers get
// UnauthorizedError. A body that does not decode as JSON yields a
// response with BadRequestError's code carrying the decoder's message.
// On success the handler replies 200 with "Received N trash requests"
// before handing the list to trashq.ReplaceQueue.
435 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
436 // Reject unauthorized requests.
437 if !IsDataManagerToken(GetApiToken(req)) {
438 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
442 // Parse the request body.
443 var trash []TrashRequest
// NOTE(review): as in PullHandler, the body is decoded without a size
// limit; consider http.MaxBytesReader.
444 r := json.NewDecoder(req.Body)
445 if err := r.Decode(&trash); err != nil {
446 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
450 // We have a properly formatted trash list sent from the data
451 // manager. Report success and send the list to the trash work
452 // queue for further handling.
453 resp.WriteHeader(http.StatusOK)
455 fmt.Sprintf("Received %d trash requests\n", len(trash))))
458 for _, t := range trash {
461 trashq.ReplaceQueue(tlist)
464 // ==============================
465 // GetBlock and PutBlock implement lower-level code for handling
466 // blocks by rooting through volumes connected to the local machine.
467 // Once the handler has determined that system policy permits the
468 // request, it calls these methods to perform the actual operation.
470 // TODO(twp): this code would probably be better located in the
471 // VolumeManager interface. As an abstraction, the VolumeManager
472 // should be the only part of the code that cares about which volume a
473 // block is stored on, so it should be responsible for figuring out
474 // which volume to check for fetching blocks, storing blocks, etc.
476 // ==============================
477 // GetBlock fetches and returns the block identified by "hash". If
478 // the update_timestamp argument is true, GetBlock also updates the
479 // block's file modification time (for the sake of PutBlock, which
480 // must update the file's timestamp when the block already exists).
482 // On success, GetBlock returns a byte slice with the block data, and
485 // If the block cannot be found on any volume, returns NotFoundError.
487 // If the block found does not have the correct MD5 hash, returns
// Volume scan, as visible below:
//   - Candidate volumes are the writable ones when update_timestamp is
//     set, otherwise the readable ones, and they are read in turn.
//   - Read errors other than "does not exist" are logged.
//   - A copy whose MD5 does not match hash is logged and remembered as
//     DiskHashError. That is what the caller receives if no good copy
//     turns up on a later volume; with no copy at all, the caller gets
//     NotFoundError.
//   - A failed Touch sets the pending error to GenericError.
491 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
492 // Attempt to read the requested hash from a keep volume.
493 error_to_caller := NotFoundError
496 if update_timestamp {
497 // Pointless to find the block on an unwritable volume
498 // because Touch() will fail -- this is as good as
499 // "not found" for purposes of callers who need to
501 vols = KeepVM.AllWritable()
503 vols = KeepVM.AllReadable()
506 for _, vol := range vols {
507 buf, err := vol.Get(hash)
509 // IsNotExist is an expected error and may be
510 // ignored. All other errors are logged. In
511 // any case we continue trying to read other
512 // volumes. If all volumes report IsNotExist,
513 // we return a NotFoundError.
514 if !os.IsNotExist(err) {
515 log.Printf("GetBlock: reading %s: %s\n", hash, err)
519 // Check the file checksum.
521 filehash := fmt.Sprintf("%x", md5.Sum(buf))
522 if filehash != hash {
523 // TODO: Try harder to tell a sysadmin about
525 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
527 error_to_caller = DiskHashError
// A good copy was found after an earlier corrupt one; log that the
// corruption was masked.
530 if error_to_caller == DiskHashError {
531 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
534 if update_timestamp {
535 if err := vol.Touch(hash); err != nil {
536 error_to_caller = GenericError
// NOTE(review): this logs error_to_caller, which is always GenericError
// here, instead of err, so the actual Touch failure reason is lost.
// Passing err as the last argument would make this log line useful.
537 log.Printf("%s: Touch %s failed: %s",
538 vol, hash, error_to_caller)
544 return nil, error_to_caller
// NOTE(review): the code below first tries KeepVM.NextWritable(), and only
// falls back to scanning AllWritable() if that Put fails. "The first Keep
// volume with free space" in the description below may therefore be
// stale — confirm against NextWritable's selection policy.
547 /* PutBlock(block, hash)
548 Stores the BLOCK (identified by the content id HASH) in Keep.
550 The MD5 checksum of the block must be identical to the content id HASH.
551 If not, an error is returned.
553 PutBlock stores the BLOCK on the first Keep volume with free space.
554 A failure code is returned to the user only if all volumes fail.
556 On success, PutBlock returns nil.
557 On failure, it returns a KeepError with one of the following codes:
560 A different block with the same hash already exists on this
563 The MD5 hash of the BLOCK does not match the argument HASH.
565 There was not enough space left in any Keep volume to store
568 The object could not be stored for some other reason (e.g.
569 all writes failed). The text of the error message should
570 provide as much detail as possible.
573 func PutBlock(block []byte, hash string) error {
574 // Check that BLOCK's checksum matches HASH.
575 blockhash := fmt.Sprintf("%x", md5.Sum(block))
576 if blockhash != hash {
577 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
578 return RequestHashError
581 // If we already have a block on disk under this identifier, return
582 // success (but check for MD5 collisions). While fetching the block,
583 // update its timestamp.
584 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
585 // In either case, we want to write our new (good) block to disk,
586 // so there is nothing special to do if err != nil.
// NOTE(review): with update_timestamp=true, GetBlock can also return
// GenericError when Touch fails. The conclusion above still holds: any
// error falls through to writing a fresh copy.
588 if oldblock, err := GetBlock(hash, true); err == nil {
// NOTE(review): bytes.Equal(block, oldblock) is the idiomatic equality
// test here.
589 if bytes.Compare(block, oldblock) == 0 {
590 // The block already exists; return success.
593 return CollisionError
597 // Choose a Keep volume to write to.
598 // If this volume fails, try all of the volumes in order.
599 if vol := KeepVM.NextWritable(); vol != nil {
600 if err := vol.Put(hash, block); err == nil {
601 return nil // success!
// Fallback: try every writable volume (presumably including the one that
// just failed).
605 writables := KeepVM.AllWritable()
606 if len(writables) == 0 {
607 log.Print("No writable volumes.")
612 for _, vol := range writables {
613 err := vol.Put(hash, block)
615 return nil // success!
617 if err != FullError {
618 // The volume is not full but the
619 // write did not succeed. Report the
620 // error and continue trying.
622 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
627 log.Print("All volumes are full.")
630 // Already logged the non-full errors.
// isValidLocatorPattern matches a bare MD5 hex digest. It is compiled
// once at package initialization instead of on every IsValidLocator call.
var isValidLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc is a valid Keep locator: exactly 32
// lowercase hexadecimal digits (an MD5 digest) with no size or other
// hints attached.
//
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return isValidLocatorPattern.MatchString(loc)
}
// oauth2TokenPattern extracts the token from an "OAuth2 <token>"
// Authorization header value. It is compiled once, not on every request.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization header value is examined. A header key
// that is present but has no values yields "" rather than a panic.
func GetApiToken(req *http.Request) string {
	if m := oauth2TokenPattern.FindStringSubmatch(req.Header.Get("Authorization")); m != nil {
		return m[1]
	}
	return ""
}
// IsExpired reports whether the Unix timestamp encoded in timestamp_hex
// (seconds, as a hexadecimal string) lies in the past.
//
// A timestamp that cannot be parsed is logged and treated as expired, so
// malformed permission hints never grant access.
func IsExpired(timestamp_hex string) bool {
	secs, parseErr := strconv.ParseInt(timestamp_hex, 16, 0)
	if parseErr != nil {
		log.Printf("IsExpired: %s\n", parseErr)
		return true
	}
	deadline := time.Unix(secs, 0)
	return time.Now().After(deadline)
}
675 // CanDelete returns true if the user identified by api_token is
676 // allowed to delete blocks.
// NOTE(review): several statements of this function are not visible in
// this excerpt. What is visible: the data manager token is checked via
// IsDataManagerToken, and looking up an admin token with unlimited scope
// on the API server is still a TODO. Presumably only the data manager
// token can currently delete — confirm against the full file.
677 func CanDelete(api_token string) bool {
681 // Blocks may be deleted only when Keep has been configured with a
683 if IsDataManagerToken(api_token) {
686 // TODO(twp): look up api_token with the API server
687 // return true if is_admin is true and if the token
688 // has unlimited scope
692 // IsDataManagerToken returns true if api_token represents the data
// manager's token. It always returns false while data_manager_token is
// empty, so an unconfigured data manager token can never match a request.
// NOTE(review): == is not a constant-time comparison; for secret tokens,
// crypto/subtle.ConstantTimeCompare would avoid a timing side channel.
694 func IsDataManagerToken(api_token string) bool {
695 return data_manager_token != "" && api_token == data_manager_token