3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
//
// Routes registered below:
//   GET/HEAD  /{hash}, /{hash}+{hints}  -> GetBlockHandler
//   PUT       /{hash}                   -> PutBlockHandler
//   DELETE    /{hash}                   -> DeleteHandler
//   GET/HEAD  /index, /index/{prefix}   -> IndexHandler
//   GET/HEAD  /debug.json               -> DebugHandler
//   GET/HEAD  /status.json              -> StatusHandler
//   PUT       /pull                     -> PullHandler
//   PUT       /trash                    -> TrashHandler
//   PUT       /untrash/{hash}           -> UntrashHandler
// {hash} is exactly 32 lowercase hex digits; {prefix} is 0 to 32 of them.
// Requests matching none of these are answered by BadRequestHandler.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
// Same path pattern as GET above; gorilla/mux dispatches on the method.
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43 // List all blocks stored here. Privileged client only.
44 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45 // List blocks stored here whose hash has the given prefix.
46 // Privileged client only.
47 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49 // Internals/debugging info (runtime.MemStats)
50 rest.HandleFunc(`/debug.json`, DebugHandler).Methods("GET", "HEAD")
52 // List volumes: path, device number, bytes used/avail.
53 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
55 // Replace the current pull queue.
56 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
58 // Replace the current trash queue.
59 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
61 // Untrash moves blocks from trash back into store
62 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
64 // Any request which does not match any of these routes gets
// BadRequestHandler's response (BadRequestError and its HTTP code).
66 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
71 // BadRequestHandler is a HandleFunc to address bad requests.
72 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
73 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
76 // GetBlockHandler is a HandleFunc to address Get block requests.
//
// It serves GET/HEAD /{hash} and /{hash}+{hints}. When
// theConfig.RequireSignatures is set, it first verifies the request's
// signature. It then reads the block into a pooled buffer of BlockSize
// bytes and writes the data back with Content-Length and
// Content-Type: application/octet-stream.
//
// Error responses:
//   - signature verification failure: the KeepError's HTTPCode;
//   - no buffer obtained before ctx ends: 503 Service Unavailable;
//   - GetBlock failure: the KeepError's HTTPCode if it is one, else 500.
77 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
// ctx is tied to the client connection (see contextForResponse), so the
// buffer wait and the volume reads below can stop early on disconnect.
78 ctx, cancel := contextForResponse(context.TODO(), resp)
81 if theConfig.RequireSignatures {
// The signed locator is the whole path, including any +hints.
82 locator := req.URL.Path[1:] // strip leading slash
83 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
// NOTE(review): this unchecked type assertion panics if VerifySignature
// ever returns an error that is not a *KeepError; the checked form used
// after GetBlock below would be safer.
84 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
89 // TODO: Probe volumes to check whether the block _might_
90 // exist. Some volumes/types could support a quick existence
91 // check without causing other operations to suffer. If all
92 // volumes support that, and assure us the block definitely
93 // isn't here, we can return 404 now instead of waiting for a
96 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
98 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// Only the bare hash route variable is passed on; any +hints are not
// used beyond the signature check above.
103 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
105 code := http.StatusInternalServerError
106 if err, ok := err.(*KeepError); ok {
109 http.Error(resp, err.Error(), code)
113 resp.Header().Set("Content-Length", strconv.Itoa(size))
114 resp.Header().Set("Content-Type", "application/octet-stream")
// NOTE(review): the error from Write is ignored.
115 resp.Write(buf[:size])
118 // Return a new context that gets cancelled by resp's CloseNotifier.
//
// The returned CancelFunc comes from context.WithCancel, so callers should
// call it when they are done, to release the context's resources. If resp
// does not implement http.CloseNotifier, no watcher is started. The
// context then ends only via that CancelFunc or via parent, unless the
// elided lines do something else (confirm).
//
// NOTE(review): http.CloseNotifier is deprecated in net/http in favour of
// Request.Context(); consider deriving the context from the request.
119 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
120 ctx, cancel := context.WithCancel(parent)
121 if cn, ok := resp.(http.CloseNotifier); ok {
// Watcher goroutine fed the close-notification channel. Most of its body
// is not visible here; it presumably cancels ctx when the client goes away
// and exits once ctx is done (confirm).
122 go func(c <-chan bool) {
125 theConfig.debugLogf("cancel context")
134 // Get a buffer from the pool -- but give up and return a non-nil
135 // error if ctx ends before we get a buffer.
//
// On success the caller receives buf; presumably the caller must hand it
// back to bufs when finished (confirm with callers). If ctx ends first, it
// returns (nil, ErrClientDisconnect). The pending bufs.Get is still
// allowed to finish, so that the buffer can be returned to the pool
// rather than lost.
136 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
// Unbuffered: the send below blocks until someone receives, which is why
// the cancellation path must still receive from bufReady.
137 bufReady := make(chan []byte)
139 bufReady <- bufs.Get(bufSize)
142 case buf := <-bufReady:
146 // Even if closeNotifier happened first, we
147 // need to keep waiting for our buf so we can
148 // return it to the pool.
151 return nil, ErrClientDisconnect
155 // PutBlockHandler is a HandleFunc to address Put block requests.
//
// It serves PUT /{hash}. Before reading the body, it rejects the request
// when:
//   - Content-Length is unknown (SizeRequiredError);
//   - the length exceeds BlockSize (TooLongError);
//   - no volume is writable (FullError).
// The body is then read into a pooled buffer sized to Content-Length and
// stored with PutBlock.
//
// On success the response body is "<hash>+<size>" followed by "\n". The
// locator is signed when a blob signing key is configured and the request
// carries an API token. The X-Keep-Replicas-Stored header reports the
// replication count returned by PutBlock.
156 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
157 ctx, cancel := contextForResponse(context.TODO(), resp)
160 hash := mux.Vars(req)["hash"]
162 // Detect as many error conditions as possible before reading
163 // the body: avoid transmitting data that will not end up
164 // being written anyway.
// net/http sets ContentLength to -1 when the body length is unknown
// (e.g. chunked transfer encoding).
166 if req.ContentLength == -1 {
167 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
171 if req.ContentLength > BlockSize {
172 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
176 if len(KeepVM.AllWritable()) == 0 {
177 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// The buffer holds exactly the declared number of bytes, not BlockSize.
181 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
183 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// NOTE(review): if the client sends fewer bytes than it declared, ReadFull
// returns io.EOF or io.ErrUnexpectedEOF. That is reported as a 500 below,
// although it is a client-side problem.
187 _, err = io.ReadFull(req.Body, buf)
189 http.Error(resp, err.Error(), 500)
194 replication, err := PutBlock(ctx, buf, hash)
198 code := http.StatusInternalServerError
199 if err, ok := err.(*KeepError); ok {
202 http.Error(resp, err.Error(), code)
206 // Success; add a size hint, sign the locator if possible, and
207 // return it to the client.
208 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
209 apiToken := GetAPIToken(req)
210 if theConfig.blobSigningKey != nil && apiToken != "" {
// The signature is valid for BlobSignatureTTL from now.
211 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
212 returnHash = SignLocator(returnHash, apiToken, expiry)
214 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
215 resp.Write([]byte(returnHash + "\n"))
218 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Only the system auth token is accepted; other requests get
// UnauthorizedError. Each readable volume streams its index for the given
// hash prefix straight to resp (an empty prefix, as on /index, lists all
// blocks). The response ends with an extra newline, so clients can tell
// that the index was not truncated.
219 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
220 // Reject unauthorized requests.
221 if !IsSystemAuth(GetAPIToken(req)) {
222 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The bare /index route has no {prefix} variable, so this is "" there.
226 prefix := mux.Vars(req)["prefix"]
228 for _, vol := range KeepVM.AllReadable() {
229 if err := vol.IndexTo(prefix, resp); err != nil {
230 // The only errors returned by IndexTo are
231 // write errors returned by resp.Write(),
232 // which probably means the client has
233 // disconnected and this error will never be
234 // reported to the client -- but it will
235 // appear in our own error log.
// Index data may already have been written, so the status code sent to
// the client cannot change at this point.
236 http.Error(resp, err.Error(), http.StatusInternalServerError)
240 // An empty line at EOF is the only way the client can be
241 // assured the entire index was received.
242 resp.Write([]byte{'\n'})
// PoolStatus reports buffer-pool usage as part of NodeStatus. The JSON
// field names come from the struct tags.
246 type PoolStatus struct {
// Bytes allocated by the pool (filled from bufs.Alloc()).
247 Alloc uint64 `json:"BytesAllocated"`
// Maximum number of buffers (filled from bufs.Cap()).
248 Cap int `json:"BuffersMax"`
// Buffers currently in use (filled from bufs.Len()).
249 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one entry in NodeStatus.Volumes. Fields tagged
// omitempty are omitted from the JSON output when nil.
252 type volumeStatusEnt struct {
254 Status *VolumeStatus `json:",omitempty"`
// NOTE(review): readNodeStatus does not set this field (its assignment is
// commented out there), so it is currently always omitted.
255 VolumeStats *ioStats `json:",omitempty"`
// Set only for volumes that implement InternalStatser.
256 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the node-wide status snapshot that readNodeStatus fills in:
// per-volume entries, buffer-pool usage, and pull/trash queue status.
260 type NodeStatus struct {
261 Volumes []*volumeStatusEnt
262 BufferPool PoolStatus
263 PullQueue WorkQueueStatus
264 TrashQueue WorkQueueStatus
// stLock presumably guards the shared status value that StatusHandler
// marshals. The Lock/Unlock calls are not visible here (confirm).
268 var stLock sync.Mutex
270 // DebugHandler addresses /debug.json requests.
//
// It responds with a JSON object whose MemStats field holds a fresh
// runtime.MemStats snapshot. If encoding fails, it responds 500 with the
// error text.
271 func DebugHandler(resp http.ResponseWriter, req *http.Request) {
272 type debugStats struct {
273 MemStats runtime.MemStats
276 runtime.ReadMemStats(&ds.MemStats)
277 err := json.NewEncoder(resp).Encode(&ds)
279 http.Error(resp, err.Error(), 500)
283 // StatusHandler addresses /status.json requests.
//
// It marshals the shared status value st to JSON. If marshalling fails, it
// logs the error and the status value and responds 500. The lines that
// lock stLock, refresh st (presumably via readNodeStatus) and write jstat
// on success are not visible here (confirm).
284 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
287 jstat, err := json.Marshal(&st)
292 log.Printf("json.Marshal: %s", err)
293 log.Printf("NodeStatus = %v", &st)
294 http.Error(resp, err.Error(), 500)
298 // populate the given NodeStatus struct with current values.
//
// st.Volumes is rebuilt with one entry per readable volume. Its backing
// array is reused when it is large enough. The buffer-pool counters and
// the pull/trash queue status are overwritten in place.
299 func readNodeStatus(st *NodeStatus) {
300 vols := KeepVM.AllReadable()
// Grow the backing array only when it is too small. The slice is then
// truncated to length 0 and refilled with append below.
301 if cap(st.Volumes) < len(vols) {
302 st.Volumes = make([]*volumeStatusEnt, len(vols))
304 st.Volumes = st.Volumes[:0]
305 for _, vol := range vols {
306 var internalStats interface{}
// Only volumes implementing InternalStatser contribute InternalStats;
// for the rest the field stays nil and is omitted from JSON.
307 if vol, ok := vol.(InternalStatser); ok {
308 internalStats = vol.InternalStats()
310 st.Volumes = append(st.Volumes, &volumeStatusEnt{
312 Status: vol.Status(),
313 InternalStats: internalStats,
314 //VolumeStats: KeepVM.VolumeStats(vol),
317 st.BufferPool.Alloc = bufs.Alloc()
318 st.BufferPool.Cap = bufs.Cap()
319 st.BufferPool.Len = bufs.Len()
320 st.PullQueue = getWorkQueueStatus(pullq)
321 st.TrashQueue = getWorkQueueStatus(trashq)
324 // return a WorkQueueStatus for the given queue. If q is nil (which
325 // should never happen except in test suites), return a zero status
326 // value instead of crashing.
327 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
329 // This should only happen during tests.
330 return WorkQueueStatus{}
// For a non-nil q, the status presumably comes from q itself; those lines
// are not visible here (confirm).
335 // DeleteHandler processes DELETE requests.
337 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
338 // from all connected volumes.
340 // Only the Data Manager, or an Arvados admin with scope "all", are
341 // allowed to issue DELETE requests. If a DELETE request is not
342 // authenticated or is issued by a non-admin user, the server returns
343 // a PermissionError.
//
// NOTE(review): permission is decided by CanDelete, which currently admits
// only the system auth token; the admin/scope lookup there is still a TODO.
// Permission is checked before theConfig.EnableDelete, so an unauthorized
// request gets PermissionError even when deletes are disabled. An
// authorized request gets MethodDisabledError in that case.
345 // Upon receiving a valid request from an authorized user,
346 // DeleteHandler deletes all copies of the specified block on local
//
// In the code below, "delete" means calling vol.Trash(hash) on every
// writable volume. Volumes reporting os.IsNotExist are not counted.
351 // If the requested block was not found on any volume, the response
352 // code is HTTP 404 Not Found.
354 // Otherwise, the response code is 200 OK, with a response body
355 // consisting of the JSON message
357 // {"copies_deleted":d,"copies_failed":f}
359 // where d and f are integers representing the number of copies that
360 // were successfully and unsuccessfully deleted.
362 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
363 hash := mux.Vars(req)["hash"]
365 // Confirm that this user is an admin and has a token with unlimited scope.
366 var tok = GetAPIToken(req)
367 if tok == "" || !CanDelete(tok) {
368 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
372 if !theConfig.EnableDelete {
373 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
377 // Delete copies of this block from all available volumes.
378 // Report how many blocks were successfully deleted, and how
379 // many were found on writable volumes but not deleted.
381 Deleted int `json:"copies_deleted"`
382 Failed int `json:"copies_failed"`
384 for _, vol := range KeepVM.AllWritable() {
385 if err := vol.Trash(hash); err == nil {
387 } else if os.IsNotExist(err) {
391 log.Println("DeleteHandler:", err)
// No copy was trashed and no attempt failed: the block is absent from
// every writable volume.
397 if result.Deleted == 0 && result.Failed == 0 {
398 st = http.StatusNotFound
405 if st == http.StatusOK {
406 if body, err := json.Marshal(result); err == nil {
409 log.Printf("json.Marshal: %s (result = %v)", err, result)
410 http.Error(resp, err.Error(), 500)
415 /* PullHandler processes "PUT /pull" requests for the data manager.
416 The request body is a JSON message containing a list of pull
417 requests in the following format:
421 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
423 "keep0.qr1hi.arvadosapi.com:25107",
424 "keep1.qr1hi.arvadosapi.com:25108"
428 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
438 Each pull request in the list consists of a block locator string
439 and an ordered list of servers. Keepstore should try to fetch the
440 block from each server in turn.
442 If the request has not been sent by the Data Manager, return 401
445 If the JSON unmarshalling fails, return 400 Bad Request.
448 // PullRequest consists of a block locator and an ordered list of servers
// (decoded from the "locator" and "servers" JSON keys). Servers are
// listed in the order they should be tried.
449 type PullRequest struct {
450 Locator string `json:"locator"`
451 Servers []string `json:"servers"`
454 // PullHandler processes "PUT /pull" requests for the data manager.
//
// Only the system auth token is accepted; other requests get
// UnauthorizedError. The JSON body is decoded into pr, presumably a
// []PullRequest (its declaration is not visible here; confirm). A decode
// failure is reported with BadRequestError's status code. On success the
// reply is 200 with "Received N pull requests", and the decoded list then
// replaces the pull queue.
455 func PullHandler(resp http.ResponseWriter, req *http.Request) {
456 // Reject unauthorized requests.
457 if !IsSystemAuth(GetAPIToken(req)) {
458 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
462 // Parse the request body.
464 r := json.NewDecoder(req.Body)
465 if err := r.Decode(&pr); err != nil {
466 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
470 // We have a properly formatted pull list sent from the data
471 // manager. Report success and send the list to the pull list
472 // manager for further handling.
// The reply is written before the queue is actually replaced below.
473 resp.WriteHeader(http.StatusOK)
475 fmt.Sprintf("Received %d pull requests\n", len(pr))))
478 for _, p := range pr {
481 pullq.ReplaceQueue(plist)
484 // TrashRequest consists of a block locator and its Mtime.
// BlockMtime is decoded from the "block_mtime" JSON key; presumably a Unix
// timestamp (confirm the units with the data manager).
485 type TrashRequest struct {
486 Locator string `json:"locator"`
487 BlockMtime int64 `json:"block_mtime"`
490 // TrashHandler processes /trash requests.
//
// Only the system auth token is accepted; other requests get
// UnauthorizedError. The JSON body is decoded into a []TrashRequest, and a
// decode failure is reported with BadRequestError's status code. On
// success the reply is 200 with "Received N trash requests", and the list
// then replaces the trash work queue.
491 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
492 // Reject unauthorized requests.
493 if !IsSystemAuth(GetAPIToken(req)) {
494 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
498 // Parse the request body.
499 var trash []TrashRequest
500 r := json.NewDecoder(req.Body)
501 if err := r.Decode(&trash); err != nil {
502 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
506 // We have a properly formatted trash list sent from the data
507 // manager. Report success and send the list to the trash work
508 // queue for further handling.
// The reply is written before the queue is actually replaced below.
509 resp.WriteHeader(http.StatusOK)
511 fmt.Sprintf("Received %d trash requests\n", len(trash))))
514 for _, t := range trash {
517 trashq.ReplaceQueue(tlist)
520 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// Only the system auth token is accepted. Untrash is attempted on every
// writable volume. Responses:
//   - 404 if there are no writable volumes;
//   - 404 if every writable volume reports os.IsNotExist for the block;
//   - 500 if untrash failed on every writable volume;
//   - otherwise (presumably, as the control flow is partly elided) 200,
//     with a body listing the volumes where untrash succeeded and, if
//     any, where it failed.
//
// NOTE(review): when some volumes report not-found and all the others
// fail, neither count equals the total. The reply then says "Successfully
// untrashed on: " with an empty list. Also, KeepVM.AllWritable() is called
// several times; if the writable set changes between calls, these counts
// can disagree.
521 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
522 // Reject unauthorized requests.
523 if !IsSystemAuth(GetAPIToken(req)) {
524 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
528 hash := mux.Vars(req)["hash"]
530 if len(KeepVM.AllWritable()) == 0 {
531 http.Error(resp, "No writable volumes", http.StatusNotFound)
535 var untrashedOn, failedOn []string
537 for _, vol := range KeepVM.AllWritable() {
538 err := vol.Untrash(hash)
540 if os.IsNotExist(err) {
542 } else if err != nil {
// NOTE(review): err itself is not logged, which makes the failure hard to
// diagnose; consider adding it to this message.
543 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
544 failedOn = append(failedOn, vol.String())
546 log.Printf("Untrashed %v on volume %v", hash, vol.String())
547 untrashedOn = append(untrashedOn, vol.String())
551 if numNotFound == len(KeepVM.AllWritable()) {
552 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
556 if len(failedOn) == len(KeepVM.AllWritable()) {
557 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
559 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
560 if len(failedOn) > 0 {
561 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
563 resp.Write([]byte(respBody))
567 // GetBlock and PutBlock implement lower-level code for handling
568 // blocks by rooting through volumes connected to the local machine.
569 // Once the handler has determined that system policy permits the
570 // request, it calls these methods to perform the actual operation.
572 // TODO(twp): this code would probably be better located in the
573 // VolumeManager interface. As an abstraction, the VolumeManager
574 // should be the only part of the code that cares about which volume a
575 // block is stored on, so it should be responsible for figuring out
576 // which volume to check for fetching blocks, storing blocks, etc.
578 // GetBlock fetches the block identified by "hash" into the provided
579 // buf, and returns the data size.
581 // If the block cannot be found on any volume, returns NotFoundError.
583 // If the block found does not have the correct MD5 hash, returns
//
// Volumes are tried in KeepVM.AllReadable() order, and the first copy
// whose MD5 matches hash is returned. A corrupt copy on an earlier volume
// is therefore masked by a good copy on a later one (which is logged). Once
// a mismatch has been seen, DiskHashError, rather than NotFoundError, is
// returned if no good copy turns up. If ctx ends during the scan,
// ErrClientDisconnect is returned.
//
// NOTE(review): resp is not referenced in the visible lines.
586 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
587 // Attempt to read the requested hash from a keep volume.
588 errorToCaller := NotFoundError
590 for _, vol := range KeepVM.AllReadable() {
591 size, err := vol.Get(ctx, hash, buf)
594 return 0, ErrClientDisconnect
598 // IsNotExist is an expected error and may be
599 // ignored. All other errors are logged. In
600 // any case we continue trying to read other
601 // volumes. If all volumes report IsNotExist,
602 // we return a NotFoundError.
603 if !os.IsNotExist(err) {
604 log.Printf("%s: Get(%s): %s", vol, hash, err)
608 // Check the file checksum.
610 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
611 if filehash != hash {
612 // TODO: Try harder to tell a sysadmin about
614 log.Printf("%s: checksum mismatch for request %s (actual %s)",
616 errorToCaller = DiskHashError
619 if errorToCaller == DiskHashError {
620 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
625 return 0, errorToCaller
628 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
630 // PutBlock(ctx, block, hash)
631 // Stores the BLOCK (identified by the content id HASH) in Keep.
633 // The MD5 checksum of the block must be identical to the content id HASH.
634 // If not, an error is returned.
636 // PutBlock stores the BLOCK on the first Keep volume with free space.
637 // A failure code is returned to the user only if all volumes fail.
//
// Order of attempts: first, an existing identical copy is looked for with
// CompareAndTouch. Next, the volume from KeepVM.NextWritable() is tried.
// Finally, every volume in KeepVM.AllWritable() is tried in turn; this list
// may include the volume that already failed.
639 // On success, PutBlock returns the replication level reported by the
// volume holding the block, together with a nil error.
640 // On failure, it returns a KeepError with one of the following codes:
643 // A different block with the same hash already exists on this
646 // The MD5 hash of the BLOCK does not match the argument HASH.
648 // There was not enough space left in any Keep volume to store
651 // The object could not be stored for some other reason (e.g.
652 // all writes failed). The text of the error message should
653 // provide as much detail as possible.
//
// If ctx ends during any of these steps, ErrClientDisconnect is returned.
655 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
656 // Check that BLOCK's checksum matches HASH.
657 blockhash := fmt.Sprintf("%x", md5.Sum(block))
658 if blockhash != hash {
659 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
660 return 0, RequestHashError
663 // If we already have this data, it's intact on disk, and we
664 // can update its timestamp, return success. If we have
665 // different data with the same hash, return failure.
666 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
668 } else if ctx.Err() != nil {
669 return 0, ErrClientDisconnect
672 // Choose a Keep volume to write to.
673 // If this volume fails, try all of the volumes in order.
674 if vol := KeepVM.NextWritable(); vol != nil {
675 if err := vol.Put(ctx, hash, block); err == nil {
676 return vol.Replication(), nil // success!
678 if ctx.Err() != nil {
679 return 0, ErrClientDisconnect
683 writables := KeepVM.AllWritable()
684 if len(writables) == 0 {
685 log.Print("No writable volumes.")
690 for _, vol := range writables {
691 err := vol.Put(ctx, hash, block)
692 if ctx.Err() != nil {
693 return 0, ErrClientDisconnect
696 return vol.Replication(), nil // success!
698 if err != FullError {
699 // The volume is not full but the
700 // write did not succeed. Report the
701 // error and continue trying.
703 log.Printf("%s: Write(%s): %s", vol, hash, err)
708 log.Print("All volumes are full.")
711 // Already logged the non-full errors.
712 return 0, GenericError
715 // CompareAndTouch returns the current replication level if one of the
716 // volumes already has the given content and it successfully updates
717 // the relevant block's modification time in order to protect it from
718 // premature garbage collection. Otherwise, it returns a non-nil
//
// Only writable volumes are consulted, presumably because Touch must
// modify the stored block. bestErr starts as NotFoundError. The elided
// lines presumably replace it with more specific errors seen during the
// scan and return it at the end (confirm).
720 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
721 var bestErr error = NotFoundError
722 for _, vol := range KeepVM.AllWritable() {
723 err := vol.Compare(ctx, hash, buf)
724 if ctx.Err() != nil {
726 } else if err == CollisionError {
727 // Stop if we have a block with same hash but
728 // different content. (It will be impossible
729 // to tell which one is wanted if we have
730 // both, so there's no point writing it even
731 // on a different volume.)
732 log.Printf("%s: Compare(%s): %s", vol, hash, err)
734 } else if os.IsNotExist(err) {
735 // Block does not exist. This is the only
736 // "normal" error: we don't log anything.
738 } else if err != nil {
739 // Couldn't open file, data is corrupt on
740 // disk, etc.: log this abnormal condition,
741 // and try the next volume.
742 log.Printf("%s: Compare(%s): %s", vol, hash, err)
745 if err := vol.Touch(hash); err != nil {
746 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
750 // Compare and Touch both worked --> done.
751 return vol.Replication(), nil
// validLocatorRe accepts a bare MD5 content hash only: exactly 32 lowercase
// hexadecimal digits, with no "+size" or other hints.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e. a bare
// 32-digit lowercase hex MD5 hash.
//
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	matched := validLocatorRe.MatchString(loc)
	return matched
}
// authRe extracts the token from an Authorization header value of the
// form "OAuth2 <token>".
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// header is found.
//
// Only the first Authorization value is examined.
func GetAPIToken(req *http.Request) string {
	// Header.Get returns "" both when the header is absent and when it is
	// present with an empty value list. Indexing the raw map (auth[0])
	// would panic in the second case.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[1]
}
// IsExpired reports whether timestampHex, a Unix time in seconds written
// as a hexadecimal string, lies in the past. A value that cannot be
// parsed as hexadecimal is logged and treated as expired.
func IsExpired(timestampHex string) bool {
	secs, parseErr := strconv.ParseInt(timestampHex, 16, 0)
	if parseErr == nil {
		return time.Now().After(time.Unix(secs, 0))
	}
	log.Printf("IsExpired: %s", parseErr)
	return true
}
792 // CanDelete returns true if the user identified by apiToken is
793 // allowed to delete blocks.
//
// In the visible code, only the system auth token (IsSystemAuth) is
// granted delete permission. Checking admin status and token scope with
// the API server is still a TODO. The lines not shown presumably return
// false in all other cases (confirm).
794 func CanDelete(apiToken string) bool {
798 // Blocks may be deleted only when Keep has been configured with a
800 if IsSystemAuth(apiToken) {
803 // TODO(twp): look up apiToken with the API server
804 // return true if is_admin is true and if the token
805 // has unlimited scope
809 // IsSystemAuth returns true if the given token is allowed to perform
810 // system level actions like deleting data.
811 func IsSystemAuth(token string) bool {
812 return token != "" && token == theConfig.systemAuthToken