3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
26 "github.com/gorilla/mux"
28 "git.curoverse.com/arvados.git/sdk/go/httpserver"
29 log "github.com/Sirupsen/logrus"
34 limiter httpserver.RequestCounter
37 // MakeRESTRouter returns a new router that forwards all Keep requests
38 // to the appropriate handlers.
//
// The routes are registered on a fresh gorilla/mux router (rest), which is
// stored in the Router field of the returned *router. DebugHandler,
// StatusHandler and Mounts are registered as methods of rtr; the remaining
// handlers are package-level functions. Block reads accept either a bare
// 32-hex-digit hash or a hash followed by "+{hints}".
39 func MakeRESTRouter() *router {
40 rest := mux.NewRouter()
41 rtr := &router{Router: rest}
44 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
46 `/{hash:[0-9a-f]{32}}+{hints}`,
47 GetBlockHandler).Methods("GET", "HEAD")

// Block writes and deletes are addressed by the bare hash only.
49 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
50 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
51 // List all blocks stored here. Privileged client only.
52 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
53 // List blocks stored here whose hash has the given prefix.
54 // Privileged client only.
55 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
57 // Internals/debugging info (runtime.MemStats)
58 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
60 // List volumes: path, device number, bytes used/avail.
61 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
63 // List mounts: UUID, readonly, tier, device ID, ...
64 rest.HandleFunc(`/mounts`, rtr.Mounts).Methods("GET")
66 // Replace the current pull queue.
67 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
69 // Replace the current trash queue.
70 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
72 // Untrash moves blocks from trash back into store
73 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
75 // Any request which does not match any of these routes gets
// handled by BadRequestHandler, installed as mux's NotFoundHandler.
77 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
82 // BadRequestHandler is a HandleFunc to address bad requests.
//
// It replies with BadRequestError's message and HTTP status code. The
// request r is not inspected.
83 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
84 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
87 // GetBlockHandler is a HandleFunc to address Get block requests.
//
// It optionally verifies the locator signature, obtains a buffer from the
// pool (bufs), reads the block into it with GetBlock, and writes the first
// size bytes of the buffer to resp as application/octet-stream.
88 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
// ctx comes from contextForResponse, which ties it to resp's CloseNotifier.
89 ctx, cancel := contextForResponse(context.TODO(), resp)

// When signatures are required, everything after the leading slash of the
// URL path is treated as the locator and checked against the request's
// API token.
92 if theConfig.RequireSignatures {
93 locator := req.URL.Path[1:] // strip leading slash
94 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
// NOTE(review): unchecked type assertion -- this panics if
// VerifySignature ever returns an error that is not a *KeepError.
95 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
100 // TODO: Probe volumes to check whether the block _might_
101 // exist. Some volumes/types could support a quick existence
102 // check without causing other operations to suffer. If all
103 // volumes support that, and assure us the block definitely
104 // isn't here, we can return 404 now instead of waiting for a

// A BlockSize buffer is requested, independent of the block's actual size.
107 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
// Failing to obtain a buffer is reported as 503 Service Unavailable.
109 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
114 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
// Errors default to 500; *KeepError values are recognized separately.
116 code := http.StatusInternalServerError
117 if err, ok := err.(*KeepError); ok {
120 http.Error(resp, err.Error(), code)
// Success: send exactly size bytes of buf.
124 resp.Header().Set("Content-Length", strconv.Itoa(size))
125 resp.Header().Set("Content-Type", "application/octet-stream")
126 resp.Write(buf[:size])
129 // Return a new context that gets cancelled by resp's CloseNotifier.
//
// contextForResponse derives a cancellable child of parent and returns it
// together with its cancel func. A watcher goroutine is started only when
// resp implements http.CloseNotifier; that goroutine logs "cancel context"
// through theConfig.debugLogf.
//
// NOTE(review): http.CloseNotifier is deprecated in net/http; the request's
// own Context() reports client disconnects -- consider switching.
130 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
131 ctx, cancel := context.WithCancel(parent)
132 if cn, ok := resp.(http.CloseNotifier); ok {
133 go func(c <-chan bool) {
136 theConfig.debugLogf("cancel context")
145 // Get a buffer from the pool -- but give up and return a non-nil
146 // error if ctx ends before we get a buffer.
//
// The buffer is requested with bufs.Get(bufSize) and handed over on the
// unbuffered channel bufReady. On the give-up path the function returns
// (nil, ErrClientDisconnect).
147 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
148 bufReady := make(chan []byte)
150 bufReady <- bufs.Get(bufSize)
153 case buf := <-bufReady:
157 // Even if closeNotifier happened first, we
158 // need to keep waiting for our buf so we can
159 // return it to the pool.
162 return nil, ErrClientDisconnect
166 // PutBlockHandler is a HandleFunc to address Put block requests.
//
// It validates the request, reads the body into a pool buffer sized from
// Content-Length, stores it with PutBlock, and replies with the locator
// "hash+size" (signed when possible) followed by a newline. The replication
// level reported by PutBlock is returned in the X-Keep-Replicas-Stored
// header.
167 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
168 ctx, cancel := contextForResponse(context.TODO(), resp)
171 hash := mux.Vars(req)["hash"]
173 // Detect as many error conditions as possible before reading
174 // the body: avoid transmitting data that will not end up
175 // being written anyway.

// net/http sets ContentLength to -1 when the length is unknown; a known
// size is required because the buffer below is sized from it.
177 if req.ContentLength == -1 {
178 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
// Bodies larger than BlockSize are rejected.
182 if req.ContentLength > BlockSize {
183 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
// With no writable volume the request is answered with FullError.
187 if len(KeepVM.AllWritable()) == 0 {
188 http.Error(resp, FullError.Error(), FullError.HTTPCode)
192 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
194 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// io.ReadFull fails unless exactly len(buf) bytes can be read, so a body
// shorter than the declared Content-Length is an error here (sent as 500).
198 _, err = io.ReadFull(req.Body, buf)
200 http.Error(resp, err.Error(), 500)
205 replication, err := PutBlock(ctx, buf, hash)
// Errors default to 500; *KeepError values are recognized separately.
209 code := http.StatusInternalServerError
210 if err, ok := err.(*KeepError); ok {
213 http.Error(resp, err.Error(), code)
217 // Success; add a size hint, sign the locator if possible, and
218 // return it to the client.
219 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
220 apiToken := GetAPIToken(req)
// Signing needs both a configured signing key and a client token.
221 if theConfig.blobSigningKey != nil && apiToken != "" {
222 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
223 returnHash = SignLocator(returnHash, apiToken, expiry)
225 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
226 resp.Write([]byte(returnHash + "\n"))
229 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Only callers presenting the system auth token are served. Each readable
// volume writes its index for the requested prefix straight to resp, and a
// final empty line marks the end of the listing.
230 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
231 // Reject unauthorized requests.
232 if !IsSystemAuth(GetAPIToken(req)) {
233 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The plain /index route defines no {prefix} variable, so prefix is "" there.
237 prefix := mux.Vars(req)["prefix"]
239 for _, vol := range KeepVM.AllReadable() {
240 if err := vol.IndexTo(prefix, resp); err != nil {
241 // The only errors returned by IndexTo are
242 // write errors returned by resp.Write(),
243 // which probably means the client has
244 // disconnected and this error will never be
245 // reported to the client -- but it will
246 // appear in our own error log.
247 http.Error(resp, err.Error(), http.StatusInternalServerError)
251 // An empty line at EOF is the only way the client can be
252 // assured the entire index was received.
253 resp.Write([]byte{'\n'})
256 // Mounts responds to "GET /mounts" requests.
//
// It streams the value of KeepVM.Mounts() to resp as JSON. An encoding error
// is reported with http.Error as 500 Internal Server Error.
257 func (rtr *router) Mounts(resp http.ResponseWriter, req *http.Request) {
258 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
260 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage. readNodeStatus fills it in from
// bufs.Alloc(), bufs.Cap() and bufs.Len().
265 type PoolStatus struct {
// Alloc is serialized under the JSON key "BytesAllocated".
266 Alloc uint64 `json:"BytesAllocated"`
// Cap is serialized under the JSON key "BuffersMax".
267 Cap int `json:"BuffersMax"`
// Len is serialized under the JSON key "BuffersInUse".
268 Len int `json:"BuffersInUse"`
// volumeStatusEnt is the per-volume element of NodeStatus.Volumes. The
// fields below are omitted from the JSON output when empty.
271 type volumeStatusEnt struct {
// Status is set from vol.Status() by readNodeStatus.
273 Status *VolumeStatus `json:",omitempty"`
// VolumeStats is currently left unset: its assignment in readNodeStatus is
// commented out.
274 VolumeStats *ioStats `json:",omitempty"`
// InternalStats is set only for volumes that implement InternalStatser.
275 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the structure populated by readNodeStatus and marshalled to
// JSON by StatusHandler.
279 type NodeStatus struct {
// Volumes has one entry per readable volume.
280 Volumes []*volumeStatusEnt
281 BufferPool PoolStatus
// PullQueue and TrashQueue come from getWorkQueueStatus(pullq) and
// getWorkQueueStatus(trashq) respectively.
282 PullQueue WorkQueueStatus
283 TrashQueue WorkQueueStatus
// stLock is a package-level mutex.
// NOTE(review): presumably it serializes access to the shared node status
// used by StatusHandler -- confirm against its Lock/Unlock call sites.
289 var stLock sync.Mutex
291 // DebugHandler addresses /debug.json requests.
//
// It replies with a JSON object whose MemStats member is a snapshot taken
// with runtime.ReadMemStats. An encoding error is reported as HTTP 500.
292 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
293 type debugStats struct {
294 MemStats runtime.MemStats
297 runtime.ReadMemStats(&ds.MemStats)
298 err := json.NewEncoder(resp).Encode(&ds)
300 http.Error(resp, err.Error(), 500)
304 // StatusHandler addresses /status.json requests.
//
// It refreshes st through readNodeStatus and marshals it to JSON. If
// marshalling fails, both the error and the status value are logged and the
// client receives HTTP 500.
305 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
307 rtr.readNodeStatus(&st)
308 jstat, err := json.Marshal(&st)
313 log.Printf("json.Marshal: %s", err)
314 log.Printf("NodeStatus = %v", &st)
315 http.Error(resp, err.Error(), 500)
319 // populate the given NodeStatus struct with current values.
//
// readNodeStatus overwrites st.Volumes, st.BufferPool, st.PullQueue and
// st.TrashQueue; the request counters are updated only when rtr.limiter is
// set.
320 func (rtr *router) readNodeStatus(st *NodeStatus) {
321 vols := KeepVM.AllReadable()
// Reuse st.Volumes' backing array when it is large enough; otherwise
// allocate one with room for every readable volume. Either way the slice is
// truncated to length 0 and refilled by the appends below.
322 if cap(st.Volumes) < len(vols) {
323 st.Volumes = make([]*volumeStatusEnt, len(vols))
325 st.Volumes = st.Volumes[:0]
326 for _, vol := range vols {
// InternalStats is optional: it stays nil unless the volume implements
// InternalStatser.
327 var internalStats interface{}
328 if vol, ok := vol.(InternalStatser); ok {
329 internalStats = vol.InternalStats()
331 st.Volumes = append(st.Volumes, &volumeStatusEnt{
333 Status: vol.Status(),
334 InternalStats: internalStats,
335 //VolumeStats: KeepVM.VolumeStats(vol),
338 st.BufferPool.Alloc = bufs.Alloc()
339 st.BufferPool.Cap = bufs.Cap()
340 st.BufferPool.Len = bufs.Len()
341 st.PullQueue = getWorkQueueStatus(pullq)
342 st.TrashQueue = getWorkQueueStatus(trashq)
// Without a limiter, the request counters keep whatever values st already had.
343 if rtr.limiter != nil {
344 st.RequestsCurrent = rtr.limiter.Current()
345 st.RequestsMax = rtr.limiter.Max()
// getWorkQueueStatus reports the status of q.
//
349 // return a WorkQueueStatus for the given queue. If q is nil (which
350 // should never happen except in test suites), return a zero status
351 // value instead of crashing.
352 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
354 // This should only happen during tests.
355 return WorkQueueStatus{}
360 // DeleteHandler processes DELETE requests.
362 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
363 // from all connected volumes.
365 // Only the Data Manager, or an Arvados admin with scope "all", are
366 // allowed to issue DELETE requests. If a DELETE request is not
367 // authenticated or is issued by a non-admin user, the server returns
368 // a PermissionError.
370 // Upon receiving a valid request from an authorized user,
371 // DeleteHandler deletes all copies of the specified block on local
// writable volumes, by calling Trash(hash) on each of them.
//
// If deletion is disabled in the configuration (EnableDelete is false), the
// server returns a MethodDisabledError.
//
376 // If the requested block was not found on any volume, the response
377 // code is HTTP 404 Not Found.
379 // Otherwise, the response code is 200 OK, with a response body
380 // consisting of the JSON message
382 // {"copies_deleted":d,"copies_failed":f}
384 // where d and f are integers representing the number of blocks that
385 // were successfully and unsuccessfully deleted.
387 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
388 hash := mux.Vars(req)["hash"]
390 // Confirm that this user is an admin and has a token with unlimited scope.
391 var tok = GetAPIToken(req)
392 if tok == "" || !CanDelete(tok) {
393 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// The permission check above runs first, so an unauthorized caller gets
// PermissionError even when deletion is disabled.
397 if !theConfig.EnableDelete {
398 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
402 // Delete copies of this block from all available volumes.
403 // Report how many blocks were successfully deleted, and how
404 // many were found on writable volumes but not deleted.
406 Deleted int `json:"copies_deleted"`
407 Failed int `json:"copies_failed"`
// Only writable volumes are visited. A not-exist error is distinguished
// from other failures; the latter are logged.
409 for _, vol := range KeepVM.AllWritable() {
410 if err := vol.Trash(hash); err == nil {
412 } else if os.IsNotExist(err) {
416 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means no volume had the block: 404.
422 if result.Deleted == 0 && result.Failed == 0 {
423 st = http.StatusNotFound
430 if st == http.StatusOK {
431 if body, err := json.Marshal(result); err == nil {
434 log.Printf("json.Marshal: %s (result = %v)", err, result)
435 http.Error(resp, err.Error(), 500)
440 /* PullHandler processes "PUT /pull" requests for the data manager.
441 The request body is a JSON message containing a list of pull
442 requests in the following format:
446 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
448 "keep0.qr1hi.arvadosapi.com:25107",
449 "keep1.qr1hi.arvadosapi.com:25108"
453 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
463 Each pull request in the list consists of a block locator string
464 and an ordered list of servers. Keepstore should try to fetch the
465 block from each server in turn.
467 If the request has not been sent by the Data Manager, return 401
470 If the JSON unmarshalling fails, return 400 Bad Request.
473 // PullRequest consists of a block locator and an ordered list of servers
// from which the block may be fetched. It is the element type of the JSON
// list decoded by PullHandler.
474 type PullRequest struct {
// Locator is read from the JSON key "locator".
475 Locator string `json:"locator"`
// Servers is read from the JSON key "servers".
476 Servers []string `json:"servers"`
479 // PullHandler processes "PUT /pull" requests for the data manager.
//
// The caller must present the system auth token. The body is decoded as a
// JSON list into pr; a decoding error is answered with BadRequestError's
// HTTP code. On success the handler replies 200 with a short count message
// and replaces the contents of pullq.
480 func PullHandler(resp http.ResponseWriter, req *http.Request) {
481 // Reject unauthorized requests.
482 if !IsSystemAuth(GetAPIToken(req)) {
483 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
487 // Parse the request body.
489 r := json.NewDecoder(req.Body)
490 if err := r.Decode(&pr); err != nil {
491 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
495 // We have a properly formatted pull list sent from the data
496 // manager. Report success and send the list to the pull list
497 // manager for further handling.
// Note that the 200 response is written before the queue is replaced.
498 resp.WriteHeader(http.StatusOK)
500 fmt.Sprintf("Received %d pull requests\n", len(pr))))
503 for _, p := range pr {
506 pullq.ReplaceQueue(plist)
509 // TrashRequest consists of a block locator and its Mtime
// (modification time). It is the element type of the JSON list decoded by
// TrashHandler.
510 type TrashRequest struct {
// Locator is read from the JSON key "locator".
511 Locator string `json:"locator"`
// BlockMtime is read from the JSON key "block_mtime".
// NOTE(review): the time unit of this int64 is not stated here -- confirm.
512 BlockMtime int64 `json:"block_mtime"`
515 // TrashHandler processes /trash requests.
//
// The caller must present the system auth token. The body is decoded as a
// JSON list of TrashRequest; a decoding error is answered with
// BadRequestError's HTTP code. On success the handler replies 200 with a
// short count message and replaces the contents of trashq.
516 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
517 // Reject unauthorized requests.
518 if !IsSystemAuth(GetAPIToken(req)) {
519 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
523 // Parse the request body.
524 var trash []TrashRequest
525 r := json.NewDecoder(req.Body)
526 if err := r.Decode(&trash); err != nil {
527 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
531 // We have a properly formatted trash list sent from the data
532 // manager. Report success and send the list to the trash work
533 // queue for further handling.
// Note that the 200 response is written before the queue is replaced.
534 resp.WriteHeader(http.StatusOK)
536 fmt.Sprintf("Received %d trash requests\n", len(trash))))
539 for _, t := range trash {
542 trashq.ReplaceQueue(tlist)
545 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// The caller must present the system auth token. Untrash(hash) is attempted
// on every writable volume. The reply is 404 if there are no writable
// volumes or the block was not found on any of them, and 500 if the attempt
// failed on every writable volume. Otherwise the body lists the volumes on
// which the block was untrashed and those on which it failed.
546 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
547 // Reject unauthorized requests.
548 if !IsSystemAuth(GetAPIToken(req)) {
549 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
553 hash := mux.Vars(req)["hash"]
555 if len(KeepVM.AllWritable()) == 0 {
556 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Volume names (vol.String()) are collected per outcome for the response.
560 var untrashedOn, failedOn []string
562 for _, vol := range KeepVM.AllWritable() {
563 err := vol.Untrash(hash)
565 if os.IsNotExist(err) {
567 } else if err != nil {
568 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
569 failedOn = append(failedOn, vol.String())
571 log.Printf("Untrashed %v on volume %v", hash, vol.String())
572 untrashedOn = append(untrashedOn, vol.String())
// NOTE(review): KeepVM.AllWritable() is called again for the comparisons
// below rather than reusing the slice that was iterated; if the set of
// writable volumes can change between calls the counts may not line up --
// confirm.
576 if numNotFound == len(KeepVM.AllWritable()) {
577 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
581 if len(failedOn) == len(KeepVM.AllWritable()) {
582 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
584 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
585 if len(failedOn) > 0 {
586 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
588 resp.Write([]byte(respBody))
592 // GetBlock and PutBlock implement lower-level code for handling
593 // blocks by rooting through volumes connected to the local machine.
594 // Once the handler has determined that system policy permits the
595 // request, it calls these methods to perform the actual operation.
597 // TODO(twp): this code would probably be better located in the
598 // VolumeManager interface. As an abstraction, the VolumeManager
599 // should be the only part of the code that cares about which volume a
600 // block is stored on, so it should be responsible for figuring out
601 // which volume to check for fetching blocks, storing blocks, etc.
603 // GetBlock fetches the block identified by "hash" into the provided
604 // buf, and returns the data size.
606 // If the block cannot be found on any volume, returns NotFoundError.
608 // If the block found does not have the correct MD5 hash, returns
// DiskHashError (unless another volume supplies a good copy).
//
// ErrClientDisconnect is returned from inside the volume loop.
611 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
612 // Attempt to read the requested hash from a keep volume.
// errorToCaller starts as NotFoundError and is upgraded to DiskHashError
// when a volume returns data whose checksum does not match.
613 errorToCaller := NotFoundError
615 for _, vol := range KeepVM.AllReadable() {
616 size, err := vol.Get(ctx, hash, buf)
619 return 0, ErrClientDisconnect
623 // IsNotExist is an expected error and may be
624 // ignored. All other errors are logged. In
625 // any case we continue trying to read other
626 // volumes. If all volumes report IsNotExist,
627 // we return a NotFoundError.
628 if !os.IsNotExist(err) {
629 log.Printf("%s: Get(%s): %s", vol, hash, err)
633 // Check the file checksum.
// The MD5 of the first size bytes, in lowercase hex, must equal hash.
635 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
636 if filehash != hash {
637 // TODO: Try harder to tell a sysadmin about
639 log.Printf("%s: checksum mismatch for request %s (actual %s)",
641 errorToCaller = DiskHashError
// A good copy was found after an earlier volume returned corrupt data:
// log that fact so the bad copy is not forgotten.
644 if errorToCaller == DiskHashError {
645 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
650 return 0, errorToCaller
653 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
655 // PutBlock(ctx, block, hash)
656 // Stores the BLOCK (identified by the content id HASH) in Keep.
658 // The MD5 checksum of the block must be identical to the content id HASH.
659 // If not, an error is returned.
661 // PutBlock stores the BLOCK on the first Keep volume with free space.
662 // A failure code is returned to the user only if all volumes fail.
664 // On success, PutBlock returns the replication level of the volume that
// holds the block (vol.Replication()) and a nil error.
// If ctx ends before the block is stored, it returns ErrClientDisconnect.
665 // On failure, it returns a KeepError with one of the following codes:
668 // A different block with the same hash already exists on this
671 // The MD5 hash of the BLOCK does not match the argument HASH.
673 // There was not enough space left in any Keep volume to store
676 // The object could not be stored for some other reason (e.g.
677 // all writes failed). The text of the error message should
678 // provide as much detail as possible.
680 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
681 // Check that BLOCK's checksum matches HASH.
682 blockhash := fmt.Sprintf("%x", md5.Sum(block))
683 if blockhash != hash {
684 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
685 return 0, RequestHashError
688 // If we already have this data, it's intact on disk, and we
689 // can update its timestamp, return success. If we have
690 // different data with the same hash, return failure.
// Both outcomes (nil and CollisionError) end the function here; any other
// CompareAndTouch error falls through to the write path unless ctx is done.
691 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
693 } else if ctx.Err() != nil {
694 return 0, ErrClientDisconnect
697 // Choose a Keep volume to write to.
698 // If this volume fails, try all of the volumes in order.
699 if vol := KeepVM.NextWritable(); vol != nil {
700 if err := vol.Put(ctx, hash, block); err == nil {
701 return vol.Replication(), nil // success!
703 if ctx.Err() != nil {
704 return 0, ErrClientDisconnect
// Fallback: try every writable volume in order.
708 writables := KeepVM.AllWritable()
709 if len(writables) == 0 {
710 log.Print("No writable volumes.")
715 for _, vol := range writables {
716 err := vol.Put(ctx, hash, block)
// Cancellation is checked before the Put result is examined.
717 if ctx.Err() != nil {
718 return 0, ErrClientDisconnect
721 return vol.Replication(), nil // success!
723 if err != FullError {
724 // The volume is not full but the
725 // write did not succeed. Report the
726 // error and continue trying.
728 log.Printf("%s: Write(%s): %s", vol, hash, err)
733 log.Print("All volumes are full.")
736 // Already logged the non-full errors.
737 return 0, GenericError
740 // CompareAndTouch returns the current replication level if one of the
741 // volumes already has the given content and it successfully updates
742 // the relevant block's modification time in order to protect it from
743 // premature garbage collection. Otherwise, it returns a non-nil
// error.
//
// Only writable volumes (KeepVM.AllWritable()) are examined.
745 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
// bestErr starts out as NotFoundError.
746 var bestErr error = NotFoundError
747 for _, vol := range KeepVM.AllWritable() {
748 err := vol.Compare(ctx, hash, buf)
// Cancellation is checked first, before the Compare result is classified.
749 if ctx.Err() != nil {
751 } else if err == CollisionError {
752 // Stop if we have a block with same hash but
753 // different content. (It will be impossible
754 // to tell which one is wanted if we have
755 // both, so there's no point writing it even
756 // on a different volume.)
757 log.Printf("%s: Compare(%s): %s", vol, hash, err)
759 } else if os.IsNotExist(err) {
760 // Block does not exist. This is the only
761 // "normal" error: we don't log anything.
763 } else if err != nil {
764 // Couldn't open file, data is corrupt on
765 // disk, etc.: log this abnormal condition,
766 // and try the next volume.
767 log.Printf("%s: Compare(%s): %s", vol, hash, err)
// Compare succeeded; a failed Touch is logged.
770 if err := vol.Touch(hash); err != nil {
771 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
775 // Compare and Touch both worked --> done.
776 return vol.Replication(), nil
// validLocatorRe matches a bare Keep locator: exactly 32 lowercase
// hexadecimal digits (an MD5 digest), with nothing before or after.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
//
// Size hints and signatures ("+123", "+A...") are not accepted: loc must be
// the bare hash.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" and captures the token.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is considered. A header that is
// absent, present with no values, or using a scheme other than OAuth2 yields
// the empty string.
func GetAPIToken(req *http.Request) string {
	// Header.Get returns "" both when the key is missing and when it maps
	// to an empty slice, so there is no slice index that could panic.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[1]
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string.
//
// A parse failure is logged and treated as "expired", so malformed input
// can never extend a signature's lifetime.
func IsExpired(timestampHex string) bool {
	// Parse at 64 bits explicitly: with bitSize 0 the value is limited to
	// the platform's int, so on 32-bit builds any timestamp past 2038
	// would fail to parse and be reported as expired.
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
817 // CanDelete returns true if the user identified by apiToken is
818 // allowed to delete blocks.
//
// The only check visible here is IsSystemAuth(apiToken); looking the token
// up with the API server (admin flag, unlimited scope) is still a TODO.
819 func CanDelete(apiToken string) bool {
823 // Blocks may be deleted only when Keep has been configured with a
825 if IsSystemAuth(apiToken) {
828 // TODO(twp): look up apiToken with the API server
829 // return true if is_admin is true and if the token
830 // has unlimited scope
834 // IsSystemAuth returns true if the given token is allowed to perform
835 // system level actions like deleting data.
//
// The token must be non-empty and equal to theConfig.systemAuthToken, so an
// empty token never matches even when no system token is configured.
//
// NOTE(review): == on secret strings is not a constant-time comparison;
// consider crypto/subtle.ConstantTimeCompare if timing side channels matter
// for this endpoint.
836 func IsSystemAuth(token string) bool {
837 return token != "" && token == theConfig.systemAuthToken