3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
27 log "github.com/Sirupsen/logrus"
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
33 func MakeRESTRouter() *mux.Router {
34 rest := mux.NewRouter()
37 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
39 `/{hash:[0-9a-f]{32}}+{hints}`,
40 GetBlockHandler).Methods("GET", "HEAD")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44 // List all blocks stored here. Privileged client only.
45 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
46 // List blocks stored here whose hash has the given prefix.
47 // Privileged client only.
48 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
50 // Internals/debugging info (runtime.MemStats)
51 rest.HandleFunc(`/debug.json`, DebugHandler).Methods("GET", "HEAD")
53 // List volumes: path, device number, bytes used/avail.
54 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
56 // Replace the current pull queue.
57 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
59 // Replace the current trash queue.
60 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
62 // Untrash moves blocks from trash back into store
63 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
65 // Any request which does not match any of these routes gets
67 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
72 // BadRequestHandler is a HandleFunc to address bad requests.
73 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
74 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
77 // GetBlockHandler is a HandleFunc to address Get block requests.
78 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
79 ctx, cancel := contextForResponse(context.TODO(), resp)
82 if theConfig.RequireSignatures {
83 locator := req.URL.Path[1:] // strip leading slash
84 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
85 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
90 // TODO: Probe volumes to check whether the block _might_
91 // exist. Some volumes/types could support a quick existence
92 // check without causing other operations to suffer. If all
93 // volumes support that, and assure us the block definitely
94 // isn't here, we can return 404 now instead of waiting for a
97 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
99 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
104 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
106 code := http.StatusInternalServerError
107 if err, ok := err.(*KeepError); ok {
110 http.Error(resp, err.Error(), code)
114 resp.Header().Set("Content-Length", strconv.Itoa(size))
115 resp.Header().Set("Content-Type", "application/octet-stream")
116 resp.Write(buf[:size])
119 // Return a new context that gets cancelled by resp's CloseNotifier.
120 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
121 ctx, cancel := context.WithCancel(parent)
122 if cn, ok := resp.(http.CloseNotifier); ok {
123 go func(c <-chan bool) {
126 theConfig.debugLogf("cancel context")
135 // Get a buffer from the pool -- but give up and return a non-nil
136 // error if ctx ends before we get a buffer.
137 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
138 bufReady := make(chan []byte)
140 bufReady <- bufs.Get(bufSize)
143 case buf := <-bufReady:
147 // Even if closeNotifier happened first, we
148 // need to keep waiting for our buf so we can
149 // return it to the pool.
152 return nil, ErrClientDisconnect
156 // PutBlockHandler is a HandleFunc to address Put block requests.
157 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
158 ctx, cancel := contextForResponse(context.TODO(), resp)
161 hash := mux.Vars(req)["hash"]
163 // Detect as many error conditions as possible before reading
164 // the body: avoid transmitting data that will not end up
165 // being written anyway.
167 if req.ContentLength == -1 {
168 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
172 if req.ContentLength > BlockSize {
173 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
177 if len(KeepVM.AllWritable()) == 0 {
178 http.Error(resp, FullError.Error(), FullError.HTTPCode)
182 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
184 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
188 _, err = io.ReadFull(req.Body, buf)
190 http.Error(resp, err.Error(), 500)
195 replication, err := PutBlock(ctx, buf, hash)
199 code := http.StatusInternalServerError
200 if err, ok := err.(*KeepError); ok {
203 http.Error(resp, err.Error(), code)
207 // Success; add a size hint, sign the locator if possible, and
208 // return it to the client.
209 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
210 apiToken := GetAPIToken(req)
211 if theConfig.blobSigningKey != nil && apiToken != "" {
212 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
213 returnHash = SignLocator(returnHash, apiToken, expiry)
215 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
216 resp.Write([]byte(returnHash + "\n"))
219 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
220 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
221 // Reject unauthorized requests.
222 if !IsSystemAuth(GetAPIToken(req)) {
223 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
227 prefix := mux.Vars(req)["prefix"]
229 for _, vol := range KeepVM.AllReadable() {
230 if err := vol.IndexTo(prefix, resp); err != nil {
231 // The only errors returned by IndexTo are
232 // write errors returned by resp.Write(),
233 // which probably means the client has
234 // disconnected and this error will never be
235 // reported to the client -- but it will
236 // appear in our own error log.
237 http.Error(resp, err.Error(), http.StatusInternalServerError)
241 // An empty line at EOF is the only way the client can be
242 // assured the entire index was received.
243 resp.Write([]byte{'\n'})
247 type PoolStatus struct {
248 Alloc uint64 `json:"BytesAllocated"`
249 Cap int `json:"BuffersMax"`
250 Len int `json:"BuffersInUse"`
253 type volumeStatusEnt struct {
255 Status *VolumeStatus `json:",omitempty"`
256 VolumeStats *ioStats `json:",omitempty"`
257 InternalStats interface{} `json:",omitempty"`
261 type NodeStatus struct {
262 Volumes []*volumeStatusEnt
263 BufferPool PoolStatus
264 PullQueue WorkQueueStatus
265 TrashQueue WorkQueueStatus
269 var stLock sync.Mutex
271 // DebugHandler addresses /debug.json requests.
272 func DebugHandler(resp http.ResponseWriter, req *http.Request) {
273 type debugStats struct {
274 MemStats runtime.MemStats
277 runtime.ReadMemStats(&ds.MemStats)
278 err := json.NewEncoder(resp).Encode(&ds)
280 http.Error(resp, err.Error(), 500)
284 // StatusHandler addresses /status.json requests.
285 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
288 jstat, err := json.Marshal(&st)
293 log.Printf("json.Marshal: %s", err)
294 log.Printf("NodeStatus = %v", &st)
295 http.Error(resp, err.Error(), 500)
299 // populate the given NodeStatus struct with current values.
300 func readNodeStatus(st *NodeStatus) {
301 vols := KeepVM.AllReadable()
302 if cap(st.Volumes) < len(vols) {
303 st.Volumes = make([]*volumeStatusEnt, len(vols))
305 st.Volumes = st.Volumes[:0]
306 for _, vol := range vols {
307 var internalStats interface{}
308 if vol, ok := vol.(InternalStatser); ok {
309 internalStats = vol.InternalStats()
311 st.Volumes = append(st.Volumes, &volumeStatusEnt{
313 Status: vol.Status(),
314 InternalStats: internalStats,
315 //VolumeStats: KeepVM.VolumeStats(vol),
318 st.BufferPool.Alloc = bufs.Alloc()
319 st.BufferPool.Cap = bufs.Cap()
320 st.BufferPool.Len = bufs.Len()
321 st.PullQueue = getWorkQueueStatus(pullq)
322 st.TrashQueue = getWorkQueueStatus(trashq)
325 // return a WorkQueueStatus for the given queue. If q is nil (which
326 // should never happen except in test suites), return a zero status
327 // value instead of crashing.
328 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
330 // This should only happen during tests.
331 return WorkQueueStatus{}
336 // DeleteHandler processes DELETE requests.
338 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
339 // from all connected volumes.
341 // Only the Data Manager, or an Arvados admin with scope "all", are
342 // allowed to issue DELETE requests. If a DELETE request is not
343 // authenticated or is issued by a non-admin user, the server returns
344 // a PermissionError.
346 // Upon receiving a valid request from an authorized user,
347 // DeleteHandler deletes all copies of the specified block on local
352 // If the requested block was not found on any volume, the response
353 // code is HTTP 404 Not Found.
355 // Otherwise, the response code is 200 OK, with a response body
356 // consisting of the JSON message
358 // {"copies_deleted":d,"copies_failed":f}
360 // where d and f are integers representing the number of blocks that
361 // were successfully and unsuccessfully deleted.
363 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
364 hash := mux.Vars(req)["hash"]
366 // Confirm that this user is an admin and has a token with unlimited scope.
367 var tok = GetAPIToken(req)
368 if tok == "" || !CanDelete(tok) {
369 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
373 if !theConfig.EnableDelete {
374 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
378 // Delete copies of this block from all available volumes.
379 // Report how many blocks were successfully deleted, and how
380 // many were found on writable volumes but not deleted.
382 Deleted int `json:"copies_deleted"`
383 Failed int `json:"copies_failed"`
385 for _, vol := range KeepVM.AllWritable() {
386 if err := vol.Trash(hash); err == nil {
388 } else if os.IsNotExist(err) {
392 log.Println("DeleteHandler:", err)
398 if result.Deleted == 0 && result.Failed == 0 {
399 st = http.StatusNotFound
406 if st == http.StatusOK {
407 if body, err := json.Marshal(result); err == nil {
410 log.Printf("json.Marshal: %s (result = %v)", err, result)
411 http.Error(resp, err.Error(), 500)
416 /* PullHandler processes "PUT /pull" requests for the data manager.
417 The request body is a JSON message containing a list of pull
418 requests in the following format:
422 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
424 "keep0.qr1hi.arvadosapi.com:25107",
425 "keep1.qr1hi.arvadosapi.com:25108"
429 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
439 Each pull request in the list consists of a block locator string
440 and an ordered list of servers. Keepstore should try to fetch the
441 block from each server in turn.
443 If the request has not been sent by the Data Manager, return 401
446 If the JSON unmarshalling fails, return 400 Bad Request.
449 // PullRequest consists of a block locator and an ordered list of servers
450 type PullRequest struct {
451 Locator string `json:"locator"`
452 Servers []string `json:"servers"`
455 // PullHandler processes "PUT /pull" requests for the data manager.
456 func PullHandler(resp http.ResponseWriter, req *http.Request) {
457 // Reject unauthorized requests.
458 if !IsSystemAuth(GetAPIToken(req)) {
459 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
463 // Parse the request body.
465 r := json.NewDecoder(req.Body)
466 if err := r.Decode(&pr); err != nil {
467 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
471 // We have a properly formatted pull list sent from the data
472 // manager. Report success and send the list to the pull list
473 // manager for further handling.
474 resp.WriteHeader(http.StatusOK)
476 fmt.Sprintf("Received %d pull requests\n", len(pr))))
479 for _, p := range pr {
482 pullq.ReplaceQueue(plist)
485 // TrashRequest consists of a block locator and its Mtime
486 type TrashRequest struct {
487 Locator string `json:"locator"`
488 BlockMtime int64 `json:"block_mtime"`
491 // TrashHandler processes /trash requests.
492 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
493 // Reject unauthorized requests.
494 if !IsSystemAuth(GetAPIToken(req)) {
495 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
499 // Parse the request body.
500 var trash []TrashRequest
501 r := json.NewDecoder(req.Body)
502 if err := r.Decode(&trash); err != nil {
503 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
507 // We have a properly formatted trash list sent from the data
508 // manager. Report success and send the list to the trash work
509 // queue for further handling.
510 resp.WriteHeader(http.StatusOK)
512 fmt.Sprintf("Received %d trash requests\n", len(trash))))
515 for _, t := range trash {
518 trashq.ReplaceQueue(tlist)
521 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
522 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
523 // Reject unauthorized requests.
524 if !IsSystemAuth(GetAPIToken(req)) {
525 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
529 hash := mux.Vars(req)["hash"]
531 if len(KeepVM.AllWritable()) == 0 {
532 http.Error(resp, "No writable volumes", http.StatusNotFound)
536 var untrashedOn, failedOn []string
538 for _, vol := range KeepVM.AllWritable() {
539 err := vol.Untrash(hash)
541 if os.IsNotExist(err) {
543 } else if err != nil {
544 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
545 failedOn = append(failedOn, vol.String())
547 log.Printf("Untrashed %v on volume %v", hash, vol.String())
548 untrashedOn = append(untrashedOn, vol.String())
552 if numNotFound == len(KeepVM.AllWritable()) {
553 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
557 if len(failedOn) == len(KeepVM.AllWritable()) {
558 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
560 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
561 if len(failedOn) > 0 {
562 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
564 resp.Write([]byte(respBody))
568 // GetBlock and PutBlock implement lower-level code for handling
569 // blocks by rooting through volumes connected to the local machine.
570 // Once the handler has determined that system policy permits the
571 // request, it calls these methods to perform the actual operation.
573 // TODO(twp): this code would probably be better located in the
574 // VolumeManager interface. As an abstraction, the VolumeManager
575 // should be the only part of the code that cares about which volume a
576 // block is stored on, so it should be responsible for figuring out
577 // which volume to check for fetching blocks, storing blocks, etc.
579 // GetBlock fetches the block identified by "hash" into the provided
580 // buf, and returns the data size.
582 // If the block cannot be found on any volume, returns NotFoundError.
584 // If the block found does not have the correct MD5 hash, returns
587 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
588 // Attempt to read the requested hash from a keep volume.
589 errorToCaller := NotFoundError
591 for _, vol := range KeepVM.AllReadable() {
592 size, err := vol.Get(ctx, hash, buf)
595 return 0, ErrClientDisconnect
599 // IsNotExist is an expected error and may be
600 // ignored. All other errors are logged. In
601 // any case we continue trying to read other
602 // volumes. If all volumes report IsNotExist,
603 // we return a NotFoundError.
604 if !os.IsNotExist(err) {
605 log.Printf("%s: Get(%s): %s", vol, hash, err)
609 // Check the file checksum.
611 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
612 if filehash != hash {
613 // TODO: Try harder to tell a sysadmin about
615 log.Printf("%s: checksum mismatch for request %s (actual %s)",
617 errorToCaller = DiskHashError
620 if errorToCaller == DiskHashError {
621 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
626 return 0, errorToCaller
629 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
631 // PutBlock(ctx, block, hash)
632 // Stores the BLOCK (identified by the content id HASH) in Keep.
634 // The MD5 checksum of the block must be identical to the content id HASH.
635 // If not, an error is returned.
637 // PutBlock stores the BLOCK on the first Keep volume with free space.
638 // A failure code is returned to the user only if all volumes fail.
640 // On success, PutBlock returns nil.
641 // On failure, it returns a KeepError with one of the following codes:
644 // A different block with the same hash already exists on this
647 // The MD5 hash of the BLOCK does not match the argument HASH.
649 // There was not enough space left in any Keep volume to store
652 // The object could not be stored for some other reason (e.g.
653 // all writes failed). The text of the error message should
654 // provide as much detail as possible.
656 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
657 // Check that BLOCK's checksum matches HASH.
658 blockhash := fmt.Sprintf("%x", md5.Sum(block))
659 if blockhash != hash {
660 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
661 return 0, RequestHashError
664 // If we already have this data, it's intact on disk, and we
665 // can update its timestamp, return success. If we have
666 // different data with the same hash, return failure.
667 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
669 } else if ctx.Err() != nil {
670 return 0, ErrClientDisconnect
673 // Choose a Keep volume to write to.
674 // If this volume fails, try all of the volumes in order.
675 if vol := KeepVM.NextWritable(); vol != nil {
676 if err := vol.Put(ctx, hash, block); err == nil {
677 return vol.Replication(), nil // success!
679 if ctx.Err() != nil {
680 return 0, ErrClientDisconnect
684 writables := KeepVM.AllWritable()
685 if len(writables) == 0 {
686 log.Print("No writable volumes.")
691 for _, vol := range writables {
692 err := vol.Put(ctx, hash, block)
693 if ctx.Err() != nil {
694 return 0, ErrClientDisconnect
697 return vol.Replication(), nil // success!
699 if err != FullError {
700 // The volume is not full but the
701 // write did not succeed. Report the
702 // error and continue trying.
704 log.Printf("%s: Write(%s): %s", vol, hash, err)
709 log.Print("All volumes are full.")
712 // Already logged the non-full errors.
713 return 0, GenericError
716 // CompareAndTouch returns the current replication level if one of the
717 // volumes already has the given content and it successfully updates
718 // the relevant block's modification time in order to protect it from
719 // premature garbage collection. Otherwise, it returns a non-nil
721 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
722 var bestErr error = NotFoundError
723 for _, vol := range KeepVM.AllWritable() {
724 err := vol.Compare(ctx, hash, buf)
725 if ctx.Err() != nil {
727 } else if err == CollisionError {
728 // Stop if we have a block with same hash but
729 // different content. (It will be impossible
730 // to tell which one is wanted if we have
731 // both, so there's no point writing it even
732 // on a different volume.)
733 log.Printf("%s: Compare(%s): %s", vol, hash, err)
735 } else if os.IsNotExist(err) {
736 // Block does not exist. This is the only
737 // "normal" error: we don't log anything.
739 } else if err != nil {
740 // Couldn't open file, data is corrupt on
741 // disk, etc.: log this abnormal condition,
742 // and try the next volume.
743 log.Printf("%s: Compare(%s): %s", vol, hash, err)
746 if err := vol.Touch(hash); err != nil {
747 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
751 // Compare and Touch both worked --> done.
752 return vol.Replication(), nil
// validLocatorRe matches a bare locator: exactly 32 lowercase
// hexadecimal digits and nothing else.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator.
//
// Only MD5-style hashes are recognized. When Keep is extended to
// support other hash types, this check should be updated to cover
// those as well.
func IsValidLocator(loc string) bool {
	matched := validLocatorRe.MatchString(loc)
	return matched
}
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" and captures the token in its only group.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of an HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is examined.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" both when the key is absent and when it is
	// present with an empty value list, so there is no slice to index
	// and nothing to panic on.
	auth := req.Header.Get("Authorization")
	if match := authRe.FindStringSubmatch(auth); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string.
func IsExpired(timestampHex string) bool {
	// Parse with an explicit 64-bit size. A bit size of 0 means the
	// platform's int, which on 32-bit builds rejects every timestamp
	// above 0x7fffffff and would make such signatures look expired.
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
793 // CanDelete returns true if the user identified by apiToken is
794 // allowed to delete blocks.
795 func CanDelete(apiToken string) bool {
799 // Blocks may be deleted only when Keep has been configured with a
801 if IsSystemAuth(apiToken) {
804 // TODO(twp): look up apiToken with the API server
805 // return true if is_admin is true and if the token
806 // has unlimited scope
810 // IsSystemAuth returns true if the given token is allowed to perform
811 // system level actions like deleting data.
812 func IsSystemAuth(token string) bool {
813 return token != "" && token == theConfig.systemAuthToken