3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
//
// Routes registered below ({hash} is exactly 32 lowercase hex digits,
// {prefix} is 0-32 lowercase hex digits):
//
//	GET, HEAD  /{hash}, /{hash}+{hints}   GetBlockHandler
//	PUT        /{hash}                    PutBlockHandler
//	DELETE     /{hash}                    DeleteHandler
//	GET, HEAD  /index, /index/{prefix}    IndexHandler
//	GET, HEAD  /status.json               StatusHandler
//	PUT        /pull                      PullHandler
//	PUT        /trash                     TrashHandler
//	PUT        /untrash/{hash}            UntrashHandler
//
// Requests matching none of these routes are handled by BadRequestHandler.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
// Read a block, addressed either by bare hash or by hash plus "+hints".
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
// Store (PUT) or delete (DELETE) a block addressed by bare hash.
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43 // List all blocks stored here. Privileged client only.
44 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45 // List blocks stored here whose hash has the given prefix.
46 // Privileged client only.
47 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49 // Report node status: volumes, buffer pool, pull/trash queues, memory stats.
50 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
52 // Replace the current pull queue.
53 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
55 // Replace the current trash queue.
56 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
58 // Untrash moves blocks from trash back into the store.
59 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
61 // Any request which does not match any of these routes gets
// handled by BadRequestHandler.
63 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
73 // GetBlockHandler is a HandleFunc to address Get block requests.
//
// It serves GET and HEAD for /{hash} and /{hash}+{hints}. When
// theConfig.RequireSignatures is set, the whole locator taken from the URL
// path (hash plus any hints) must pass VerifySignature for the request's
// API token. The block is read by GetBlock into a pool buffer of BlockSize
// bytes and sent as application/octet-stream with an explicit
// Content-Length.
//
// Error responses: the KeepError status for a failed signature check;
// 503 if no buffer could be obtained before the request context ended;
// for a failed GetBlock, 500 by default, presumably replaced by the
// KeepError's HTTPCode when err is a *KeepError (that assignment is not
// visible in this listing).
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
// ctx is cancelled when the client disconnects (see contextForResponse);
// the matching call to cancel is not visible in this listing.
75 ctx, cancel := contextForResponse(context.TODO(), resp)
78 if theConfig.RequireSignatures {
79 locator := req.URL.Path[1:] // strip leading slash
// NOTE(review): err.(*KeepError) is an unchecked type assertion; it
// panics unless VerifySignature only ever returns *KeepError — confirm.
80 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
81 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
86 // TODO: Probe volumes to check whether the block _might_
87 // exist. Some volumes/types could support a quick existence
88 // check without causing other operations to suffer. If all
89 // volumes support that, and assure us the block definitely
90 // isn't here, we can return 404 now instead of waiting for a
// A BlockSize buffer is requested because the block's actual size is only
// learned from GetBlock's return value.
93 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
95 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
100 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
102 code := http.StatusInternalServerError
103 if err, ok := err.(*KeepError); ok {
106 http.Error(resp, err.Error(), code)
// Only the first size bytes of buf hold block data.
110 resp.Header().Set("Content-Length", strconv.Itoa(size))
111 resp.Header().Set("Content-Type", "application/octet-stream")
112 resp.Write(buf[:size])
115 // contextForResponse returns a new context that gets cancelled by resp's CloseNotifier.
//
// The returned context is derived from parent with context.WithCancel, so
// it also ends when parent does or when the returned CancelFunc is called.
// Callers should call the CancelFunc once they are done, to release the
// context's resources. If resp does not implement http.CloseNotifier, no
// watcher goroutine is started here (any else-branch is not visible in
// this listing).
//
// NOTE(review): http.CloseNotifier is deprecated in net/http; the
// request's own Context (req.Context()) is the documented replacement.
116 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
117 ctx, cancel := context.WithCancel(parent)
118 if cn, ok := resp.(http.CloseNotifier); ok {
// The watcher goroutine receives the close-notification channel as c and
// logs "cancel context" (via debugLogf) on its cancellation path.
119 go func(c <-chan bool) {
122 theConfig.debugLogf("cancel context")
131 // getBufferWithContext gets a buffer from the pool -- but gives up and returns a non-nil
132 // error if ctx ends before we get a buffer.
//
// bufSize is passed through to bufs.Get. When ctx ends first, the error is
// ErrClientDisconnect, and (per the comment inside) the buffer is still
// awaited in the background so that it can be returned to the pool.
133 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
// bufReady is unbuffered, so the send below must happen in another
// goroutine (its launch line is not visible in this listing); that
// goroutine hands the pool's buffer over to this function.
134 bufReady := make(chan []byte)
136 bufReady <- bufs.Get(bufSize)
139 case buf := <-bufReady:
143 // Even if closeNotifier happened first, we
144 // need to keep waiting for our buf so we can
145 // return it to the pool.
148 return nil, ErrClientDisconnect
152 // PutBlockHandler is a HandleFunc to address Put block requests.
//
// It serves PUT /{hash}. Before the body is read, it rejects requests with
// an unknown Content-Length (SizeRequiredError), a Content-Length above
// BlockSize (TooLongError), or no writable volumes (FullError). The body
// is then read into a pool buffer of exactly Content-Length bytes and
// handed to PutBlock, which verifies the MD5 hash against the URL.
//
// On success the response body is the locator "<hash>+<size>" followed by
// a newline. The locator is signed with SignLocator when a blob signing key
// is configured and the request carries an API token. The
// X-Keep-Replicas-Stored header reports the replication count returned by
// PutBlock.
153 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
154 ctx, cancel := contextForResponse(context.TODO(), resp)
157 hash := mux.Vars(req)["hash"]
159 // Detect as many error conditions as possible before reading
160 // the body: avoid transmitting data that will not end up
161 // being written anyway.
// net/http reports ContentLength == -1 when the length is unknown.
163 if req.ContentLength == -1 {
164 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168 if req.ContentLength > BlockSize {
169 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173 if len(KeepVM.AllWritable()) == 0 {
174 http.Error(resp, FullError.Error(), FullError.HTTPCode)
178 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
180 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// io.ReadFull returns an error (io.EOF or io.ErrUnexpectedEOF) if the body
// is shorter than the declared Content-Length.
184 _, err = io.ReadFull(req.Body, buf)
186 http.Error(resp, err.Error(), 500)
191 replication, err := PutBlock(ctx, buf, hash)
195 code := http.StatusInternalServerError
196 if err, ok := err.(*KeepError); ok {
199 http.Error(resp, err.Error(), code)
203 // Success; add a size hint, sign the locator if possible, and
204 // return it to the client.
205 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
206 apiToken := GetAPIToken(req)
207 if theConfig.blobSigningKey != nil && apiToken != "" {
// The signature expires BlobSignatureTTL from now.
208 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
209 returnHash = SignLocator(returnHash, apiToken, expiry)
211 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
212 resp.Write([]byte(returnHash + "\n"))
215 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Only requests bearing the system auth token (IsSystemAuth) are served;
// others get UnauthorizedError. Each readable volume streams its index
// entries for hashes beginning with prefix directly to resp. prefix is ""
// for plain /index, because that route has no "prefix" variable. A final
// empty line terminates the listing.
216 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
217 // Reject unauthorized requests.
218 if !IsSystemAuth(GetAPIToken(req)) {
219 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
223 prefix := mux.Vars(req)["prefix"]
225 for _, vol := range KeepVM.AllReadable() {
226 if err := vol.IndexTo(prefix, resp); err != nil {
227 // The only errors returned by IndexTo are
228 // write errors returned by resp.Write(),
229 // which probably means the client has
230 // disconnected and this error will never be
231 // reported to the client -- but it will
232 // appear in our own error log.
// Index data may already have been written to resp, in which case the
// status code passed here can no longer take effect.
233 http.Error(resp, err.Error(), http.StatusInternalServerError)
237 // An empty line at EOF is the only way the client can be
238 // assured the entire index was received.
239 resp.Write([]byte{'\n'})
243 // Responds to /status.json requests with the current node status,
244 // described in a JSON structure.
246 // The data given in a status.json response includes:
247 // volumes - a list of Keep volumes currently in use by this server
248 // each volume is an object with the following fields:
250 // * device_num (an integer identifying the underlying filesystem)
// PoolStatus summarizes the block buffer pool for the status report.
// Alloc, Cap and Len are serialized under the JSON keys BytesAllocated,
// BuffersMax and BuffersInUse respectively.
type PoolStatus struct {
	Alloc uint64 `json:"BytesAllocated"`
	Cap   int    `json:"BuffersMax"`
	Len   int    `json:"BuffersInUse"`
}
262 type NodeStatus struct {
263 Volumes []*VolumeStatus `json:"volumes"`
264 BufferPool PoolStatus
265 PullQueue WorkQueueStatus
266 TrashQueue WorkQueueStatus
267 Memory runtime.MemStats
// stLock is a package-level mutex. It presumably serializes access to the
// shared status value that StatusHandler marshals; the Lock/Unlock calls
// are not visible in this listing — TODO confirm.
271 var stLock sync.Mutex
273 // StatusHandler addresses /status.json requests.
//
// It marshals the shared status value st to JSON. The lines that refresh
// st and write the successful response are not visible in this listing
// (presumably readNodeStatus and resp.Write). If marshalling fails, the
// error and the status value are logged and a 500 is returned.
274 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
277 jstat, err := json.Marshal(&st)
282 log.Printf("json.Marshal: %s", err)
// NOTE(review): if stLock guards st, confirm it is still held here;
// formatting &st after releasing it would race with concurrent requests.
283 log.Printf("NodeStatus = %v", &st)
284 http.Error(resp, err.Error(), 500)
288 // populate the given NodeStatus struct with current values.
289 func readNodeStatus(st *NodeStatus) {
290 vols := KeepVM.AllReadable()
291 if cap(st.Volumes) < len(vols) {
292 st.Volumes = make([]*VolumeStatus, len(vols))
294 st.Volumes = st.Volumes[:0]
295 for _, vol := range vols {
296 if s := vol.Status(); s != nil {
297 st.Volumes = append(st.Volumes, s)
300 st.BufferPool.Alloc = bufs.Alloc()
301 st.BufferPool.Cap = bufs.Cap()
302 st.BufferPool.Len = bufs.Len()
303 st.PullQueue = getWorkQueueStatus(pullq)
304 st.TrashQueue = getWorkQueueStatus(trashq)
305 runtime.ReadMemStats(&st.Memory)
308 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
309 // should never happen except in test suites), it returns a zero status
310 // value instead of crashing.
// The nil check and the non-nil return path are not visible in this listing.
311 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
313 // This should only happen during tests.
314 return WorkQueueStatus{}
319 // DeleteHandler processes DELETE requests.
321 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
322 // from all writable volumes (using each volume's Trash method).
324 // Only the Data Manager, or an Arvados admin with scope "all", is
325 // allowed to issue DELETE requests. If a DELETE request is not
326 // authenticated or is issued by a non-admin user, the server returns
327 // a PermissionError.
//
// NOTE(review): CanDelete appears to accept only the system auth token at
// present; the admin-token lookup is still a TODO there.
329 // Upon receiving a valid request from an authorized user,
330 // DeleteHandler deletes all copies of the specified block on local
// writable volumes. If theConfig.EnableDelete is false, the request is
// refused with MethodDisabledError instead.
335 // If the requested block was not found on any volume, the response
336 // code is HTTP 404 Not Found.
338 // Otherwise, the response code is 200 OK, with a response body
339 // consisting of the JSON message
341 // {"copies_deleted":d,"copies_failed":f}
343 // where d and f are integers representing the number of copies that
344 // were successfully and unsuccessfully deleted.
346 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
347 hash := mux.Vars(req)["hash"]
349 // Confirm that this user is an admin and has a token with unlimited scope.
350 var tok = GetAPIToken(req)
351 if tok == "" || !CanDelete(tok) {
352 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
356 if !theConfig.EnableDelete {
357 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
361 // Delete (via vol.Trash) copies of this block from all writable volumes.
362 // Report how many blocks were successfully deleted, and how
363 // many were found on writable volumes but not deleted.
365 Deleted int `json:"copies_deleted"`
366 Failed int `json:"copies_failed"`
// Volumes reporting os.IsNotExist are presumably not counted at all; other
// Trash errors are logged. The counting lines are not visible in this
// listing.
368 for _, vol := range KeepVM.AllWritable() {
369 if err := vol.Trash(hash); err == nil {
371 } else if os.IsNotExist(err) {
375 log.Println("DeleteHandler:", err)
// st is an HTTP status code here (it is compared with http.StatusOK
// below); its declaration is not visible in this listing.
381 if result.Deleted == 0 && result.Failed == 0 {
382 st = http.StatusNotFound
389 if st == http.StatusOK {
390 if body, err := json.Marshal(result); err == nil {
393 log.Printf("json.Marshal: %s (result = %v)", err, result)
394 http.Error(resp, err.Error(), 500)
399 /* PullHandler processes "PUT /pull" requests for the data manager.
400 The request body is a JSON message containing a list of pull
401 requests in the following format:
405 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
407 "keep0.qr1hi.arvadosapi.com:25107",
408 "keep1.qr1hi.arvadosapi.com:25108"
412 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
422 Each pull request in the list consists of a block locator string
423 and an ordered list of servers. Keepstore should try to fetch the
424 block from each server in turn.
426 If the request has not been sent by the Data Manager, return 401
429 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of a pull list. Locator names the block to
// fetch, and Servers lists, in order of preference, the servers to try
// fetching it from.
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`
}
438 // PullHandler processes "PUT /pull" requests for the data manager.
//
// Only the system auth token is accepted (UnauthorizedError otherwise).
// The body is JSON-decoded into the pull list pr; pr's declaration is not
// visible in this listing, but it is presumably a list of PullRequest. A
// decode error is answered with BadRequestError's status code. On success
// the handler replies 200 with "Received N pull requests", and only then
// replaces the whole pull queue (pullq.ReplaceQueue) with the new list.
439 func PullHandler(resp http.ResponseWriter, req *http.Request) {
440 // Reject unauthorized requests.
441 if !IsSystemAuth(GetAPIToken(req)) {
442 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
446 // Parse the request body.
448 r := json.NewDecoder(req.Body)
449 if err := r.Decode(&pr); err != nil {
450 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
454 // We have a properly formatted pull list sent from the data
455 // manager. Report success and send the list to the pull list
456 // manager for further handling.
457 resp.WriteHeader(http.StatusOK)
459 fmt.Sprintf("Received %d pull requests\n", len(pr))))
462 for _, p := range pr {
465 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of a trash list: the Locator of a block and
// its modification time, BlockMtime, as an int64. The units are defined
// by the sender — TODO confirm.
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
}
474 // TrashHandler processes /trash requests.
//
// It serves PUT /trash for the data manager. Only the system auth token is
// accepted (UnauthorizedError otherwise). The body must decode as a JSON
// array of TrashRequest; a decode error is answered with BadRequestError's
// status code. On success the handler replies 200 with "Received N trash
// requests", and only then replaces the whole trash queue
// (trashq.ReplaceQueue) with the new list.
475 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
476 // Reject unauthorized requests.
477 if !IsSystemAuth(GetAPIToken(req)) {
478 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
482 // Parse the request body.
483 var trash []TrashRequest
484 r := json.NewDecoder(req.Body)
485 if err := r.Decode(&trash); err != nil {
486 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
490 // We have a properly formatted trash list sent from the data
491 // manager. Report success and send the list to the trash work
492 // queue for further handling.
493 resp.WriteHeader(http.StatusOK)
495 fmt.Sprintf("Received %d trash requests\n", len(trash))))
498 for _, t := range trash {
501 trashq.ReplaceQueue(tlist)
504 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// Only the system auth token is accepted. Untrash is attempted on every
// writable volume. Responses:
//   - 404 "No writable volumes" if there are none;
//   - 404 if every writable volume reports the block as not existing
//     (numNotFound is presumably incremented in the IsNotExist branch;
//     that line is not visible in this listing);
//   - 500 if every writable volume failed with some other error;
//   - otherwise a body listing the volumes where untrash succeeded and,
//     if any, those where it failed.
//
// NOTE(review): a mix of "not found" and failures with no successes
// matches neither error condition below. It therefore reaches the
// "Successfully untrashed on: " message with an empty list. Also,
// KeepVM.AllWritable() is re-evaluated for each comparison; if the set of
// writable volumes can change concurrently, the counts may not line up.
505 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
506 // Reject unauthorized requests.
507 if !IsSystemAuth(GetAPIToken(req)) {
508 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
512 hash := mux.Vars(req)["hash"]
514 if len(KeepVM.AllWritable()) == 0 {
515 http.Error(resp, "No writable volumes", http.StatusNotFound)
519 var untrashedOn, failedOn []string
521 for _, vol := range KeepVM.AllWritable() {
522 err := vol.Untrash(hash)
524 if os.IsNotExist(err) {
526 } else if err != nil {
// NOTE(review): this log line omits err itself, which would aid diagnosis.
527 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
528 failedOn = append(failedOn, vol.String())
530 log.Printf("Untrashed %v on volume %v", hash, vol.String())
531 untrashedOn = append(untrashedOn, vol.String())
535 if numNotFound == len(KeepVM.AllWritable()) {
536 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
540 if len(failedOn) == len(KeepVM.AllWritable()) {
541 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
543 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
544 if len(failedOn) > 0 {
545 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
547 resp.Write([]byte(respBody))
551 // GetBlock and PutBlock implement lower-level code for handling
552 // blocks by rooting through volumes connected to the local machine.
553 // Once the handler has determined that system policy permits the
554 // request, it calls these methods to perform the actual operation.
556 // TODO(twp): this code would probably be better located in the
557 // VolumeManager interface. As an abstraction, the VolumeManager
558 // should be the only part of the code that cares about which volume a
559 // block is stored on, so it should be responsible for figuring out
560 // which volume to check for fetching blocks, storing blocks, etc.
562 // GetBlock fetches the block identified by "hash" into the provided
563 // buf, and returns the data size.
//
// Readable volumes are tried in order. The first copy whose MD5 matches
// hash is presumably returned; that return is not visible in this listing.
565 // If the block cannot be found on any volume, returns NotFoundError.
567 // If the block found does not have the correct MD5 hash, returns
// DiskHashError. This happens only if no volume yields a good copy.
// ErrClientDisconnect is returned when the search is abandoned, presumably
// because ctx ended (the guarding condition is not visible here).
//
// NOTE(review): resp is not referenced in any visible line of this function.
570 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
571 // Attempt to read the requested hash from a keep volume.
572 errorToCaller := NotFoundError
574 for _, vol := range KeepVM.AllReadable() {
575 size, err := vol.Get(ctx, hash, buf)
578 return 0, ErrClientDisconnect
582 // IsNotExist is an expected error and may be
583 // ignored. All other errors are logged. In
584 // any case we continue trying to read other
585 // volumes. If all volumes report IsNotExist,
586 // we return a NotFoundError.
587 if !os.IsNotExist(err) {
588 log.Printf("%s: Get(%s): %s", vol, hash, err)
592 // Check the file checksum.
594 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
595 if filehash != hash {
596 // TODO: Try harder to tell a sysadmin about
598 log.Printf("%s: checksum mismatch for request %s (actual %s)",
600 errorToCaller = DiskHashError
// Here a good copy was found after an earlier volume returned corrupt data.
603 if errorToCaller == DiskHashError {
604 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
609 return 0, errorToCaller
612 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
614 // PutBlock(ctx, block, hash)
615 // If ctx ends while volumes are being checked or written, PutBlock returns ErrClientDisconnect.
617 // The MD5 checksum of the block must be identical to the content id HASH.
618 // If not, an error is returned.
620 // PutBlock first tries the volume returned by KeepVM.NextWritable(); if
// that write fails, it tries every writable volume in order.
621 // A failure code is returned to the user only if all volumes fail.
623 // On success, PutBlock returns the replication level of the volume that
// holds the block, and a nil error. This includes an existing intact copy
// found by CompareAndTouch, in which case nothing is written.
624 // On failure, it returns a KeepError with one of the following codes:
627 // A different block with the same hash already exists on this
// Keep server (CollisionError, as reported by CompareAndTouch).
630 // The MD5 hash of the BLOCK does not match the argument HASH.
// (RequestHashError)
632 // There was not enough space left in any Keep volume to store
// the object (presumably FullError; that return is not visible here).
635 // The object could not be stored for some other reason (e.g.
636 // all writes failed). The text of the error message should
637 // provide as much detail as possible.
// (GenericError)
639 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
640 // Check that BLOCK's checksum matches HASH.
641 blockhash := fmt.Sprintf("%x", md5.Sum(block))
642 if blockhash != hash {
643 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
644 return 0, RequestHashError
647 // If we already have this data, it's intact on disk, and we
648 // can update its timestamp, return success. If we have
649 // different data with the same hash, return failure.
// CompareAndTouch's (n, err) is presumably returned as-is in this case
// (the return line is not visible in this listing).
650 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
652 } else if ctx.Err() != nil {
653 return 0, ErrClientDisconnect
656 // Choose a Keep volume to write to.
657 // If this volume fails, try all of the volumes in order.
658 if vol := KeepVM.NextWritable(); vol != nil {
659 if err := vol.Put(ctx, hash, block); err == nil {
660 return vol.Replication(), nil // success!
662 if ctx.Err() != nil {
663 return 0, ErrClientDisconnect
// NOTE(review): AllWritable presumably includes the volume already tried
// above, so that volume may be written to a second time.
667 writables := KeepVM.AllWritable()
668 if len(writables) == 0 {
669 log.Print("No writable volumes.")
674 for _, vol := range writables {
675 err := vol.Put(ctx, hash, block)
676 if ctx.Err() != nil {
677 return 0, ErrClientDisconnect
680 return vol.Replication(), nil // success!
682 if err != FullError {
683 // The volume is not full but the
684 // write did not succeed. Report the
685 // error and continue trying.
687 log.Printf("%s: Write(%s): %s", vol, hash, err)
692 log.Print("All volumes are full.")
695 // Already logged the non-full errors.
696 return 0, GenericError
699 // CompareAndTouch returns the current replication level if one of the
700 // volumes already has the given content and it successfully updates
701 // the relevant block's modification time in order to protect it from
702 // premature garbage collection. Otherwise, it returns a non-nil
// error. That error is NotFoundError (bestErr's initial value) if no
// writable volume has the block; otherwise it is presumably the most
// relevant error seen. Only writable volumes are examined. CollisionError
// means a volume holds different data under the same hash.
704 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
705 var bestErr error = NotFoundError
706 for _, vol := range KeepVM.AllWritable() {
707 err := vol.Compare(ctx, hash, buf)
// (The early exit taken when ctx has ended is not visible in this listing.)
708 if ctx.Err() != nil {
710 } else if err == CollisionError {
711 // Stop if we have a block with same hash but
712 // different content. (It will be impossible
713 // to tell which one is wanted if we have
714 // both, so there's no point writing it even
715 // on a different volume.)
716 log.Printf("%s: Compare(%s): %s", vol, hash, err)
718 } else if os.IsNotExist(err) {
719 // Block does not exist. This is the only
720 // "normal" error: we don't log anything.
722 } else if err != nil {
723 // Couldn't open file, data is corrupt on
724 // disk, etc.: log this abnormal condition,
725 // and try the next volume.
726 log.Printf("%s: Compare(%s): %s", vol, hash, err)
// The stored copy matches buf: refresh its modification time.
729 if err := vol.Touch(hash); err != nil {
730 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
734 // Compare and Touch both worked --> done.
735 return vol.Replication(), nil
// validLocatorRe accepts exactly 32 lowercase hexadecimal digits, i.e. a
// bare MD5 digest with no "+size" hint or other "+..." suffix.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator: a bare hash
// matching validLocatorRe. Size hints and signatures are not accepted.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	matched := validLocatorRe.MatchString(loc)
	return matched
}
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" and captures everything after the whitespace.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// header is present. Only the first Authorization value is examined,
// and a header using any scheme other than "OAuth2" yields "".
func GetAPIToken(req *http.Request) string {
	// Header.Get returns "" both when the header is absent and when its
	// value list is empty. Indexing the header map directly (auth[0])
	// would panic in the latter case.
	if match := authRe.FindStringSubmatch(req.Header.Get("Authorization")); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired reports whether the given Unix timestamp, written as a
// hexadecimal string, lies in the past. A string that cannot be parsed as
// hexadecimal is logged and treated as expired.
func IsExpired(timestampHex string) bool {
	secs, parseErr := strconv.ParseInt(timestampHex, 16, 0)
	if parseErr != nil {
		log.Printf("IsExpired: %s", parseErr)
		return true
	}
	expiry := time.Unix(secs, 0)
	return expiry.Before(time.Now())
}
776 // CanDelete returns true if the user identified by apiToken is
777 // allowed to delete blocks.
//
// In the visible code, the system auth token is accepted (IsSystemAuth).
// Checking other tokens for admin rights and unlimited scope is still a
// TODO, and the remaining branches are not visible in this listing.
778 func CanDelete(apiToken string) bool {
782 // Blocks may be deleted only when Keep has been configured with a
// system auth token and the request presents that token.
784 if IsSystemAuth(apiToken) {
787 // TODO(twp): look up apiToken with the API server
788 // return true if is_admin is true and if the token
789 // has unlimited scope
793 // IsSystemAuth returns true if the given token is allowed to perform
794 // system level actions like deleting data.
795 func IsSystemAuth(token string) bool {
796 return token != "" && token == theConfig.systemAuthToken