// REST handlers for Keep are implemented here.
//
// GetBlockHandler (GET /locator)
// PutBlockHandler (PUT /locator)
// DeleteHandler   (DELETE /locator)
// IndexHandler    (GET /index, GET /index/{prefix})
// StatusHandler   (GET /status.json)
// PullHandler     (PUT /pull)
// TrashHandler    (PUT /trash)
16 "github.com/gorilla/mux"
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
31 func MakeRESTRouter() *mux.Router {
32 rest := mux.NewRouter()
35 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37 `/{hash:[0-9a-f]{32}}+{hints}`,
38 GetBlockHandler).Methods("GET", "HEAD")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42 // List all blocks stored here. Privileged client only.
43 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44 // List blocks stored here whose hash has the given prefix.
45 // Privileged client only.
46 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48 // List volumes: path, device number, bytes used/avail.
49 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51 // Replace the current pull queue.
52 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54 // Replace the current trash queue.
55 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57 // Any request which does not match any of these routes gets
59 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
68 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
69 hash := mux.Vars(req)["hash"]
71 hints := mux.Vars(req)["hints"]
73 // Parse the locator string and hints from the request.
74 // TODO(twp): implement a Locator type.
75 var signature, timestamp string
77 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
78 for _, hint := range strings.Split(hints, "+") {
79 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
80 // Server ignores size hints
81 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
84 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
85 // Any unknown hint that starts with an uppercase letter is
86 // presumed to be valid and ignored, to permit forward compatibility.
88 // Unknown format; not a valid locator.
89 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
95 // If permission checking is in effect, verify this
96 // request's permission signature.
97 if enforce_permissions {
98 if signature == "" || timestamp == "" {
99 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
101 } else if IsExpired(timestamp) {
102 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
105 req_locator := req.URL.Path[1:] // strip leading slash
106 if !VerifySignature(req_locator, GetApiToken(req)) {
107 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
113 block, err := GetBlock(hash, false)
115 // This type assertion is safe because the only errors
116 // GetBlock can return are DiskHashError or NotFoundError.
117 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
120 defer bufs.Put(block)
122 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
123 resp.Header().Set("Content-Type", "application/octet-stream")
127 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
128 hash := mux.Vars(req)["hash"]
130 // Detect as many error conditions as possible before reading
131 // the body: avoid transmitting data that will not end up
132 // being written anyway.
134 if req.ContentLength == -1 {
135 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
139 if req.ContentLength > BLOCKSIZE {
140 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
144 if len(KeepVM.AllWritable()) == 0 {
145 http.Error(resp, FullError.Error(), FullError.HTTPCode)
149 buf := bufs.Get(int(req.ContentLength))
150 _, err := io.ReadFull(req.Body, buf)
152 http.Error(resp, err.Error(), 500)
157 err = PutBlock(buf, hash)
161 ke := err.(*KeepError)
162 http.Error(resp, ke.Error(), ke.HTTPCode)
166 // Success; add a size hint, sign the locator if possible, and
167 // return it to the client.
168 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
169 api_token := GetApiToken(req)
170 if PermissionSecret != nil && api_token != "" {
171 expiry := time.Now().Add(blob_signature_ttl)
172 return_hash = SignLocator(return_hash, api_token, expiry)
174 resp.Write([]byte(return_hash + "\n"))
178 // A HandleFunc to address /index and /index/{prefix} requests.
180 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
181 // Reject unauthorized requests.
182 if !IsDataManagerToken(GetApiToken(req)) {
183 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
187 prefix := mux.Vars(req)["prefix"]
189 for _, vol := range KeepVM.AllReadable() {
190 if err := vol.IndexTo(prefix, resp); err != nil {
191 // The only errors returned by IndexTo are
192 // write errors returned by resp.Write(),
193 // which probably means the client has
194 // disconnected and this error will never be
195 // reported to the client -- but it will
196 // appear in our own error log.
197 http.Error(resp, err.Error(), http.StatusInternalServerError)
// VolumeStatus describes one Keep volume. It is the element type of the
// "volumes" list in the JSON document returned by /status.json, where
// each volume appears as an object with these fields:
//
//	mount_point  the volume's path
//	device_num   an integer identifying the underlying filesystem
//	bytes_free   free space, in bytes
//	bytes_used   used space, in bytes
//
// Field order matters: values are built with positional composite
// literals elsewhere in this file.
type VolumeStatus struct {
	MountPoint string `json:"mount_point"`
	DeviceNum  uint64 `json:"device_num"`
	BytesFree  uint64 `json:"bytes_free"`
	BytesUsed  uint64 `json:"bytes_used"`
}
222 type NodeStatus struct {
223 Volumes []*VolumeStatus `json:"volumes"`
226 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
227 st := GetNodeStatus()
228 if jstat, err := json.Marshal(st); err == nil {
231 log.Printf("json.Marshal: %s\n", err)
232 log.Printf("NodeStatus = %v\n", st)
233 http.Error(resp, err.Error(), 500)
238 // Returns a NodeStatus struct describing this Keep
239 // node's current status.
241 func GetNodeStatus() *NodeStatus {
242 st := new(NodeStatus)
244 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
245 for i, vol := range KeepVM.AllReadable() {
246 st.Volumes[i] = vol.Status()
252 // Returns a VolumeStatus describing the requested volume.
254 func GetVolumeStatus(volume string) *VolumeStatus {
255 var fs syscall.Statfs_t
258 if fi, err := os.Stat(volume); err == nil {
259 devnum = fi.Sys().(*syscall.Stat_t).Dev
261 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
265 err := syscall.Statfs(volume, &fs)
267 log.Printf("GetVolumeStatus: statfs: %s\n", err)
270 // These calculations match the way df calculates disk usage:
271 // "free" space is measured by fs.Bavail, but "used" space
272 // uses fs.Blocks - fs.Bfree.
273 free := fs.Bavail * uint64(fs.Bsize)
274 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
275 return &VolumeStatus{volume, devnum, free, used}
278 // DeleteHandler processes DELETE requests.
280 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
281 // from all connected volumes.
283 // Only the Data Manager, or an Arvados admin with scope "all", are
284 // allowed to issue DELETE requests. If a DELETE request is not
285 // authenticated or is issued by a non-admin user, the server returns
286 // a PermissionError.
288 // Upon receiving a valid request from an authorized user,
289 // DeleteHandler deletes all copies of the specified block on local
294 // If the requested blocks was not found on any volume, the response
295 // code is HTTP 404 Not Found.
297 // Otherwise, the response code is 200 OK, with a response body
298 // consisting of the JSON message
300 // {"copies_deleted":d,"copies_failed":f}
302 // where d and f are integers representing the number of blocks that
303 // were successfully and unsuccessfully deleted.
305 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
306 hash := mux.Vars(req)["hash"]
308 // Confirm that this user is an admin and has a token with unlimited scope.
309 var tok = GetApiToken(req)
310 if tok == "" || !CanDelete(tok) {
311 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
316 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
320 // Delete copies of this block from all available volumes.
321 // Report how many blocks were successfully deleted, and how
322 // many were found on writable volumes but not deleted.
324 Deleted int `json:"copies_deleted"`
325 Failed int `json:"copies_failed"`
327 for _, vol := range KeepVM.AllWritable() {
328 if err := vol.Delete(hash); err == nil {
330 } else if os.IsNotExist(err) {
334 log.Println("DeleteHandler:", err)
340 if result.Deleted == 0 && result.Failed == 0 {
341 st = http.StatusNotFound
348 if st == http.StatusOK {
349 if body, err := json.Marshal(result); err == nil {
352 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
353 http.Error(resp, err.Error(), 500)
// PullHandler processes "PUT /pull" requests for the data manager.
// The request body is a JSON message containing a list of pull
// requests in the following format:
//
//	[
//	   {
//	      "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
//	      "servers":[
//	         "keep0.qr1hi.arvadosapi.com:25107",
//	         "keep1.qr1hi.arvadosapi.com:25108"
//	      ]
//	   },
//	   {
//	      "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
//	      "servers":[ ... ]
//	   },
//	   ...
//	]
//
// Each pull request in the list consists of a block locator string
// and an ordered list of servers. Keepstore should try to fetch the
// block from each server in turn.
//
// If the request has not been sent by the Data Manager, return 401
// Unauthorized.
//
// If the JSON unmarshalling fails, return 400 Bad Request.

// PullRequest is one entry of a /pull request body.
type PullRequest struct {
	// Locator names the block to fetch.
	Locator string `json:"locator"`
	// Servers lists the servers to try, in order.
	Servers []string `json:"servers"`
}
396 func PullHandler(resp http.ResponseWriter, req *http.Request) {
397 // Reject unauthorized requests.
398 if !IsDataManagerToken(GetApiToken(req)) {
399 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
403 // Parse the request body.
405 r := json.NewDecoder(req.Body)
406 if err := r.Decode(&pr); err != nil {
407 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
411 // We have a properly formatted pull list sent from the data
412 // manager. Report success and send the list to the pull list
413 // manager for further handling.
414 resp.WriteHeader(http.StatusOK)
416 fmt.Sprintf("Received %d pull requests\n", len(pr))))
419 for _, p := range pr {
422 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of a /trash request body.
type TrashRequest struct {
	// Locator names the block.
	Locator string `json:"locator"`
	// BlockMtime is the block modification time supplied by the data
	// manager. NOTE(review): presumably Unix seconds — confirm against
	// the trash worker that consumes it.
	BlockMtime int64 `json:"block_mtime"`
}
430 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
431 // Reject unauthorized requests.
432 if !IsDataManagerToken(GetApiToken(req)) {
433 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
437 // Parse the request body.
438 var trash []TrashRequest
439 r := json.NewDecoder(req.Body)
440 if err := r.Decode(&trash); err != nil {
441 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
445 // We have a properly formatted trash list sent from the data
446 // manager. Report success and send the list to the trash work
447 // queue for further handling.
448 resp.WriteHeader(http.StatusOK)
450 fmt.Sprintf("Received %d trash requests\n", len(trash))))
453 for _, t := range trash {
456 trashq.ReplaceQueue(tlist)
459 // ==============================
460 // GetBlock and PutBlock implement lower-level code for handling
461 // blocks by rooting through volumes connected to the local machine.
462 // Once the handler has determined that system policy permits the
463 // request, it calls these methods to perform the actual operation.
465 // TODO(twp): this code would probably be better located in the
466 // VolumeManager interface. As an abstraction, the VolumeManager
467 // should be the only part of the code that cares about which volume a
468 // block is stored on, so it should be responsible for figuring out
469 // which volume to check for fetching blocks, storing blocks, etc.
471 // ==============================
472 // GetBlock fetches and returns the block identified by "hash". If
473 // the update_timestamp argument is true, GetBlock also updates the
474 // block's file modification time (for the sake of PutBlock, which
475 // must update the file's timestamp when the block already exists).
477 // On success, GetBlock returns a byte slice with the block data, and
480 // If the block cannot be found on any volume, returns NotFoundError.
482 // If the block found does not have the correct MD5 hash, returns
486 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
487 // Attempt to read the requested hash from a keep volume.
488 error_to_caller := NotFoundError
491 if update_timestamp {
492 // Pointless to find the block on an unwritable volume
493 // because Touch() will fail -- this is as good as
494 // "not found" for purposes of callers who need to
496 vols = KeepVM.AllWritable()
498 vols = KeepVM.AllReadable()
501 for _, vol := range vols {
502 buf, err := vol.Get(hash)
504 // IsNotExist is an expected error and may be
505 // ignored. All other errors are logged. In
506 // any case we continue trying to read other
507 // volumes. If all volumes report IsNotExist,
508 // we return a NotFoundError.
509 if !os.IsNotExist(err) {
510 log.Printf("GetBlock: reading %s: %s\n", hash, err)
514 // Check the file checksum.
516 filehash := fmt.Sprintf("%x", md5.Sum(buf))
517 if filehash != hash {
518 // TODO: Try harder to tell a sysadmin about
520 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
522 error_to_caller = DiskHashError
526 if error_to_caller == DiskHashError {
527 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
530 if update_timestamp {
531 if err := vol.Touch(hash); err != nil {
532 error_to_caller = GenericError
533 log.Printf("%s: Touch %s failed: %s",
534 vol, hash, error_to_caller)
541 return nil, error_to_caller
544 /* PutBlock(block, hash)
545 Stores the BLOCK (identified by the content id HASH) in Keep.
547 The MD5 checksum of the block must be identical to the content id HASH.
548 If not, an error is returned.
550 PutBlock stores the BLOCK on the first Keep volume with free space.
551 A failure code is returned to the user only if all volumes fail.
553 On success, PutBlock returns nil.
554 On failure, it returns a KeepError with one of the following codes:
557 A different block with the same hash already exists on this
560 The MD5 hash of the BLOCK does not match the argument HASH.
562 There was not enough space left in any Keep volume to store
565 The object could not be stored for some other reason (e.g.
566 all writes failed). The text of the error message should
567 provide as much detail as possible.
570 func PutBlock(block []byte, hash string) error {
571 // Check that BLOCK's checksum matches HASH.
572 blockhash := fmt.Sprintf("%x", md5.Sum(block))
573 if blockhash != hash {
574 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
575 return RequestHashError
578 // If we already have a block on disk under this identifier, return
579 // success (but check for MD5 collisions). While fetching the block,
580 // update its timestamp.
581 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
582 // In either case, we want to write our new (good) block to disk,
583 // so there is nothing special to do if err != nil.
585 if oldblock, err := GetBlock(hash, true); err == nil {
586 defer bufs.Put(oldblock)
587 if bytes.Compare(block, oldblock) == 0 {
588 // The block already exists; return success.
591 return CollisionError
595 // Choose a Keep volume to write to.
596 // If this volume fails, try all of the volumes in order.
597 if vol := KeepVM.NextWritable(); vol != nil {
598 if err := vol.Put(hash, block); err == nil {
599 return nil // success!
603 writables := KeepVM.AllWritable()
604 if len(writables) == 0 {
605 log.Print("No writable volumes.")
610 for _, vol := range writables {
611 err := vol.Put(hash, block)
613 return nil // success!
615 if err != FullError {
616 // The volume is not full but the
617 // write did not succeed. Report the
618 // error and continue trying.
620 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
625 log.Print("All volumes are full.")
628 // Already logged the non-full errors.
// isValidLocatorPattern matches a bare MD5 locator: exactly 32 lowercase
// hexadecimal digits with no hints. It is compiled once at package
// initialization instead of on every call.
var isValidLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator. When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return isValidLocatorPattern.MatchString(loc)
}
// oauth2AuthHeaderPattern extracts the token from an
// "Authorization: OAuth2 <token>" header value. It is compiled once
// instead of on every request.
var oauth2AuthHeaderPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found. Only the first Authorization header value is
// examined.
func GetApiToken(req *http.Request) string {
	// Header.Get returns "" when the header is absent or has no values,
	// so there is no slice indexing that could panic.
	m := oauth2AuthHeaderPattern.FindStringSubmatch(req.Header.Get("Authorization"))
	if m == nil {
		return ""
	}
	return m[1]
}
// IsExpired reports whether the Unix timestamp in timestamp_hex
// (seconds since the epoch, written in hexadecimal) lies in the past.
// A value that cannot be parsed as hexadecimal is logged and treated as
// expired.
func IsExpired(timestamp_hex string) bool {
	secs, parseErr := strconv.ParseInt(timestamp_hex, 16, 0)
	if parseErr != nil {
		log.Printf("IsExpired: %s\n", parseErr)
		return true
	}
	expiry := time.Unix(secs, 0)
	return time.Now().After(expiry)
}
673 // CanDelete returns true if the user identified by api_token is
674 // allowed to delete blocks.
675 func CanDelete(api_token string) bool {
679 // Blocks may be deleted only when Keep has been configured with a
681 if IsDataManagerToken(api_token) {
684 // TODO(twp): look up api_token with the API server
685 // return true if is_admin is true and if the token
686 // has unlimited scope
690 // IsDataManagerToken returns true if api_token represents the data
692 func IsDataManagerToken(api_token string) bool {
693 return data_manager_token != "" && api_token == data_manager_token