3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
15 "github.com/gorilla/mux"
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
31 func MakeRESTRouter() *mux.Router {
32 rest := mux.NewRouter()
35 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37 `/{hash:[0-9a-f]{32}}+{hints}`,
38 GetBlockHandler).Methods("GET", "HEAD")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42 // List all blocks stored here. Privileged client only.
43 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44 // List blocks stored here whose hash has the given prefix.
45 // Privileged client only.
46 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48 // List volumes: path, device number, bytes used/avail.
49 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51 // Replace the current pull queue.
52 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54 // Replace the current trash queue.
55 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57 // Untrash moves blocks from trash back into store
58 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60 // Any request which does not match any of these routes gets
62 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
67 // BadRequestHandler is a HandleFunc to address bad requests.
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
72 // GetBlockHandler is a HandleFunc to address Get block requests.
73 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
74 if enforcePermissions {
75 locator := req.URL.Path[1:] // strip leading slash
76 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
77 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82 block, err := GetBlock(mux.Vars(req)["hash"])
84 // This type assertion is safe because the only errors
85 // GetBlock can return are DiskHashError or NotFoundError.
86 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
91 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
92 resp.Header().Set("Content-Type", "application/octet-stream")
96 // PutBlockHandler is a HandleFunc to address Put block requests.
97 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
98 hash := mux.Vars(req)["hash"]
100 // Detect as many error conditions as possible before reading
101 // the body: avoid transmitting data that will not end up
102 // being written anyway.
104 if req.ContentLength == -1 {
105 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
109 if req.ContentLength > BlockSize {
110 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
114 if len(KeepVM.AllWritable()) == 0 {
115 http.Error(resp, FullError.Error(), FullError.HTTPCode)
119 buf := bufs.Get(int(req.ContentLength))
120 _, err := io.ReadFull(req.Body, buf)
122 http.Error(resp, err.Error(), 500)
127 replication, err := PutBlock(buf, hash)
131 ke := err.(*KeepError)
132 http.Error(resp, ke.Error(), ke.HTTPCode)
136 // Success; add a size hint, sign the locator if possible, and
137 // return it to the client.
138 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
139 apiToken := GetApiToken(req)
140 if PermissionSecret != nil && apiToken != "" {
141 expiry := time.Now().Add(blobSignatureTTL)
142 returnHash = SignLocator(returnHash, apiToken, expiry)
144 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
145 resp.Write([]byte(returnHash + "\n"))
148 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
149 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
150 // Reject unauthorized requests.
151 if !IsDataManagerToken(GetApiToken(req)) {
152 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
156 prefix := mux.Vars(req)["prefix"]
158 for _, vol := range KeepVM.AllReadable() {
159 if err := vol.IndexTo(prefix, resp); err != nil {
160 // The only errors returned by IndexTo are
161 // write errors returned by resp.Write(),
162 // which probably means the client has
163 // disconnected and this error will never be
164 // reported to the client -- but it will
165 // appear in our own error log.
166 http.Error(resp, err.Error(), http.StatusInternalServerError)
170 // An empty line at EOF is the only way the client can be
171 // assured the entire index was received.
172 resp.Write([]byte{'\n'})
176 // Responds to /status.json requests with the current node status,
177 // described in a JSON structure.
179 // The data given in a status.json response includes:
180 // volumes - a list of Keep volumes currently in use by this server
181 // each volume is an object with the following fields:
183 // * device_num (an integer identifying the underlying filesystem)
188 type PoolStatus struct {
189 Alloc uint64 `json:"BytesAllocated"`
190 Cap int `json:"BuffersMax"`
191 Len int `json:"BuffersInUse"`
195 type NodeStatus struct {
196 Volumes []*VolumeStatus `json:"volumes"`
197 BufferPool PoolStatus
198 PullQueue WorkQueueStatus
199 TrashQueue WorkQueueStatus
200 Memory runtime.MemStats
204 var stLock sync.Mutex
206 // StatusHandler addresses /status.json requests.
207 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
210 jstat, err := json.Marshal(&st)
215 log.Printf("json.Marshal: %s", err)
216 log.Printf("NodeStatus = %v", &st)
217 http.Error(resp, err.Error(), 500)
221 // readNodeStatus populates the given NodeStatus struct with current values.
222 func readNodeStatus(st *NodeStatus) {
223 vols := KeepVM.AllReadable()
224 if cap(st.Volumes) < len(vols) {
225 st.Volumes = make([]*VolumeStatus, len(vols))
227 st.Volumes = st.Volumes[:0]
228 for _, vol := range vols {
229 if s := vol.Status(); s != nil {
230 st.Volumes = append(st.Volumes, s)
233 st.BufferPool.Alloc = bufs.Alloc()
234 st.BufferPool.Cap = bufs.Cap()
235 st.BufferPool.Len = bufs.Len()
236 st.PullQueue = getWorkQueueStatus(pullq)
237 st.TrashQueue = getWorkQueueStatus(trashq)
238 runtime.ReadMemStats(&st.Memory)
241 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
242 // should never happen except in test suites), return a zero status
243 // value instead of crashing.
244 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
246 // This should only happen during tests.
247 return WorkQueueStatus{}
252 // DeleteHandler processes DELETE requests.
254 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
255 // from all connected volumes.
257 // Only the Data Manager is currently allowed to issue DELETE requests.
258 // (Arvados admins with scope "all" are meant to be allowed too, but that
259 // lookup is still a TODO in CanDelete.) If a DELETE request is not
260 // authenticated or comes from any other user, the server returns a PermissionError.
262 // Upon receiving a valid request from an authorized user,
263 // DeleteHandler deletes all copies of the specified block on local
268 // If the requested block was not found on any volume, the response
269 // code is HTTP 404 Not Found.
271 // Otherwise, the response code is 200 OK, with a response body
272 // consisting of the JSON message
274 // {"copies_deleted":d,"copies_failed":f}
276 // where d and f are integers representing the number of copies of the
277 // block that were successfully and unsuccessfully deleted.
279 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
280 hash := mux.Vars(req)["hash"]
282 // Confirm that this user is an admin and has a token with unlimited scope.
283 var tok = GetApiToken(req)
284 if tok == "" || !CanDelete(tok) {
285 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
290 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
294 // Delete copies of this block from all available volumes.
295 // Report how many blocks were successfully deleted, and how
296 // many were found on writable volumes but not deleted.
298 Deleted int `json:"copies_deleted"`
299 Failed int `json:"copies_failed"`
301 for _, vol := range KeepVM.AllWritable() {
302 if err := vol.Trash(hash); err == nil {
304 } else if os.IsNotExist(err) {
308 log.Println("DeleteHandler:", err)
314 if result.Deleted == 0 && result.Failed == 0 {
315 st = http.StatusNotFound
322 if st == http.StatusOK {
323 if body, err := json.Marshal(result); err == nil {
326 log.Printf("json.Marshal: %s (result = %v)", err, result)
327 http.Error(resp, err.Error(), 500)
332 /* PullHandler processes "PUT /pull" requests for the data manager.
333 The request body is a JSON message containing a list of pull
334 requests in the following format:
338 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
340 "keep0.qr1hi.arvadosapi.com:25107",
341 "keep1.qr1hi.arvadosapi.com:25108"
345 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
355 Each pull request in the list consists of a block locator string
356 and an ordered list of servers. Keepstore should try to fetch the
357 block from each server in turn.
359 If the request has not been sent by the Data Manager, return 401
362 If the JSON unmarshalling fails, return 400 Bad Request.
365 // PullRequest consists of a block locator and an ordered list of servers.
366 type PullRequest struct {
367 Locator string `json:"locator"`
368 Servers []string `json:"servers"`
371 // PullHandler processes "PUT /pull" requests for the data manager.
372 func PullHandler(resp http.ResponseWriter, req *http.Request) {
373 // Reject unauthorized requests.
374 if !IsDataManagerToken(GetApiToken(req)) {
375 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
379 // Parse the request body.
381 r := json.NewDecoder(req.Body)
382 if err := r.Decode(&pr); err != nil {
383 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
387 // We have a properly formatted pull list sent from the data
388 // manager. Report success and send the list to the pull list
389 // manager for further handling.
390 resp.WriteHeader(http.StatusOK)
392 fmt.Sprintf("Received %d pull requests\n", len(pr))))
395 for _, p := range pr {
398 pullq.ReplaceQueue(plist)
401 // TrashRequest consists of a block locator and its Mtime.
402 type TrashRequest struct {
403 Locator string `json:"locator"`
404 BlockMtime int64 `json:"block_mtime"`
407 // TrashHandler processes /trash requests.
408 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
409 // Reject unauthorized requests.
410 if !IsDataManagerToken(GetApiToken(req)) {
411 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
415 // Parse the request body.
416 var trash []TrashRequest
417 r := json.NewDecoder(req.Body)
418 if err := r.Decode(&trash); err != nil {
419 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
423 // We have a properly formatted trash list sent from the data
424 // manager. Report success and send the list to the trash work
425 // queue for further handling.
426 resp.WriteHeader(http.StatusOK)
428 fmt.Sprintf("Received %d trash requests\n", len(trash))))
431 for _, t := range trash {
434 trashq.ReplaceQueue(tlist)
437 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
438 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
439 // Reject unauthorized requests.
440 if !IsDataManagerToken(GetApiToken(req)) {
441 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
445 hash := mux.Vars(req)["hash"]
447 if len(KeepVM.AllWritable()) == 0 {
448 http.Error(resp, "No writable volumes", http.StatusNotFound)
452 var untrashedOn, failedOn []string
454 for _, vol := range KeepVM.AllWritable() {
455 err := vol.Untrash(hash)
456 if err == nil || err == ErrNotImplemented {
457 log.Printf("Untrashed %v on volume %v", hash, vol.String())
458 untrashedOn = append(untrashedOn, vol.String())
460 if os.IsNotExist(err) {
463 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
464 failedOn = append(failedOn, vol.String())
469 if numNotFound == len(KeepVM.AllWritable()) {
470 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
474 if len(failedOn) == len(KeepVM.AllWritable()) {
475 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
477 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
478 if len(failedOn) > 0 {
479 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
481 resp.Write([]byte(respBody))
485 // ==============================
486 // GetBlock and PutBlock implement lower-level code for handling
487 // blocks by rooting through volumes connected to the local machine.
488 // Once the handler has determined that system policy permits the
489 // request, it calls these methods to perform the actual operation.
491 // TODO(twp): this code would probably be better located in the
492 // VolumeManager interface. As an abstraction, the VolumeManager
493 // should be the only part of the code that cares about which volume a
494 // block is stored on, so it should be responsible for figuring out
495 // which volume to check for fetching blocks, storing blocks, etc.
496 // ==============================
498 // GetBlock fetches and returns the block identified by "hash".
500 // On success, GetBlock returns a byte slice with the block data, and
503 // If the block cannot be found on any volume, returns NotFoundError.
505 // If the block found does not have the correct MD5 hash, returns
508 func GetBlock(hash string) ([]byte, error) {
509 // Attempt to read the requested hash from a keep volume.
510 errorToCaller := NotFoundError
512 for _, vol := range KeepVM.AllReadable() {
513 buf, err := vol.Get(hash)
515 // IsNotExist is an expected error and may be
516 // ignored. All other errors are logged. In
517 // any case we continue trying to read other
518 // volumes. If all volumes report IsNotExist,
519 // we return a NotFoundError.
520 if !os.IsNotExist(err) {
521 log.Printf("%s: Get(%s): %s", vol, hash, err)
525 // Check the file checksum.
527 filehash := fmt.Sprintf("%x", md5.Sum(buf))
528 if filehash != hash {
529 // TODO: Try harder to tell a sysadmin about
531 log.Printf("%s: checksum mismatch for request %s (actual %s)",
533 errorToCaller = DiskHashError
537 if errorToCaller == DiskHashError {
538 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
543 return nil, errorToCaller
546 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
548 // PutBlock(block, hash)
549 // Stores the BLOCK (identified by the content id HASH) in Keep.
551 // The MD5 checksum of the block must be identical to the content id HASH.
552 // If not, an error is returned.
554 // PutBlock stores the BLOCK on the first Keep volume with free space.
555 // A failure code is returned to the user only if all volumes fail.
557 // On success, PutBlock returns the replication level of the volume that stored (or already held) the block, and a nil error.
558 // On failure, it returns a KeepError with one of the following codes:
561 // A different block with the same hash already exists on this
564 // The MD5 hash of the BLOCK does not match the argument HASH.
566 // There was not enough space left in any Keep volume to store
569 // The object could not be stored for some other reason (e.g.
570 // all writes failed). The text of the error message should
571 // provide as much detail as possible.
573 func PutBlock(block []byte, hash string) (int, error) {
574 // Check that BLOCK's checksum matches HASH.
575 blockhash := fmt.Sprintf("%x", md5.Sum(block))
576 if blockhash != hash {
577 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
578 return 0, RequestHashError
581 // If we already have this data, it's intact on disk, and we
582 // can update its timestamp, return success. If we have
583 // different data with the same hash, return failure.
584 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
588 // Choose a Keep volume to write to.
589 // If this volume fails, try all of the volumes in order.
590 if vol := KeepVM.NextWritable(); vol != nil {
591 if err := vol.Put(hash, block); err == nil {
592 return vol.Replication(), nil // success!
596 writables := KeepVM.AllWritable()
597 if len(writables) == 0 {
598 log.Print("No writable volumes.")
603 for _, vol := range writables {
604 err := vol.Put(hash, block)
606 return vol.Replication(), nil // success!
608 if err != FullError {
609 // The volume is not full but the
610 // write did not succeed. Report the
611 // error and continue trying.
613 log.Printf("%s: Write(%s): %s", vol, hash, err)
618 log.Print("All volumes are full.")
621 // Already logged the non-full errors.
622 return 0, GenericError
625 // CompareAndTouch returns the current replication level if one of the
626 // volumes already has the given content and it successfully updates
627 // the relevant block's modification time in order to protect it from
628 // premature garbage collection. Otherwise, it returns a non-nil
630 func CompareAndTouch(hash string, buf []byte) (int, error) {
631 var bestErr error = NotFoundError
632 for _, vol := range KeepVM.AllWritable() {
633 if err := vol.Compare(hash, buf); err == CollisionError {
634 // Stop if we have a block with same hash but
635 // different content. (It will be impossible
636 // to tell which one is wanted if we have
637 // both, so there's no point writing it even
638 // on a different volume.)
639 log.Printf("%s: Compare(%s): %s", vol, hash, err)
641 } else if os.IsNotExist(err) {
642 // Block does not exist. This is the only
643 // "normal" error: we don't log anything.
645 } else if err != nil {
646 // Couldn't open file, data is corrupt on
647 // disk, etc.: log this abnormal condition,
648 // and try the next volume.
649 log.Printf("%s: Compare(%s): %s", vol, hash, err)
652 if err := vol.Touch(hash); err != nil {
653 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
657 // Compare and Touch both worked --> done.
658 return vol.Replication(), nil
663 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
665 // IsValidLocator returns true if the specified string is a bare Keep locator hash: exactly 32 lowercase hexadecimal digits, with no size or permission hints.
666 // When Keep is extended to support hash types other than MD5,
667 // this should be updated to cover those as well.
669 func IsValidLocator(loc string) bool {
670 return validLocatorRe.MatchString(loc)
673 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
675 // GetApiToken returns the OAuth2 token from the Authorization
676 // header of an HTTP request, or an empty string if no matching
678 func GetApiToken(req *http.Request) string {
679 if auth, ok := req.Header["Authorization"]; ok {
680 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
687 // IsExpired returns true if the given Unix timestamp (expressed as a
688 // hexadecimal string) is in the past, or if timestampHex cannot be
689 // parsed as a hexadecimal string.
690 func IsExpired(timestampHex string) bool {
691 ts, err := strconv.ParseInt(timestampHex, 16, 0)
693 log.Printf("IsExpired: %s", err)
696 return time.Unix(ts, 0).Before(time.Now())
699 // CanDelete returns true if the user identified by apiToken is
700 // allowed to delete blocks.
701 func CanDelete(apiToken string) bool {
705 // Blocks may be deleted only when Keep has been configured with a
707 if IsDataManagerToken(apiToken) {
710 // TODO(twp): look up apiToken with the API server
711 // return true if is_admin is true and if the token
712 // has unlimited scope
716 // IsDataManagerToken returns true if apiToken represents the data
718 func IsDataManagerToken(apiToken string) bool {
719 return dataManagerToken != "" && apiToken == dataManagerToken