3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43 // List all blocks stored here. Privileged client only.
44 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45 // List blocks stored here whose hash has the given prefix.
46 // Privileged client only.
47 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49 // List volumes: path, device number, bytes used/avail.
50 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
52 // Replace the current pull queue.
53 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
55 // Replace the current trash queue.
56 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
58 // Untrash moves blocks from trash back into store
59 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
61 // Any request which does not match any of these routes gets
63 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75 ctx, cancel := contextForResponse(context.TODO(), resp)
78 if theConfig.RequireSignatures {
79 locator := req.URL.Path[1:] // strip leading slash
80 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
81 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
86 // TODO: Probe volumes to check whether the block _might_
87 // exist. Some volumes/types could support a quick existence
88 // check without causing other operations to suffer. If all
89 // volumes support that, and assure us the block definitely
90 // isn't here, we can return 404 now instead of waiting for a
93 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
95 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
100 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
102 code := http.StatusInternalServerError
103 if err, ok := err.(*KeepError); ok {
106 http.Error(resp, err.Error(), code)
110 resp.Header().Set("Content-Length", strconv.Itoa(size))
111 resp.Header().Set("Content-Type", "application/octet-stream")
112 resp.Write(buf[:size])
115 // Return a new context that gets cancelled by resp's CloseNotifier.
116 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
117 ctx, cancel := context.WithCancel(parent)
118 if cn, ok := resp.(http.CloseNotifier); ok {
119 go func(c <-chan bool) {
122 theConfig.debugLogf("cancel context")
131 // Get a buffer from the pool -- but give up and return a non-nil
132 // error if ctx ends before we get a buffer.
133 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
134 bufReady := make(chan []byte)
136 bufReady <- bufs.Get(bufSize)
139 case buf := <-bufReady:
143 // Even if closeNotifier happened first, we
144 // need to keep waiting for our buf so we can
145 // return it to the pool.
148 return nil, ErrClientDisconnect
152 // PutBlockHandler is a HandleFunc to address Put block requests.
153 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
154 ctx, cancel := contextForResponse(context.TODO(), resp)
157 hash := mux.Vars(req)["hash"]
159 // Detect as many error conditions as possible before reading
160 // the body: avoid transmitting data that will not end up
161 // being written anyway.
163 if req.ContentLength == -1 {
164 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168 if req.ContentLength > BlockSize {
169 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173 if len(KeepVM.AllWritable()) == 0 {
174 http.Error(resp, FullError.Error(), FullError.HTTPCode)
178 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
180 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
184 _, err = io.ReadFull(req.Body, buf)
186 http.Error(resp, err.Error(), 500)
191 replication, err := PutBlock(ctx, buf, hash)
195 ke := err.(*KeepError)
196 http.Error(resp, ke.Error(), ke.HTTPCode)
200 // Success; add a size hint, sign the locator if possible, and
201 // return it to the client.
202 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
203 apiToken := GetAPIToken(req)
204 if theConfig.blobSigningKey != nil && apiToken != "" {
205 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
206 returnHash = SignLocator(returnHash, apiToken, expiry)
208 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
209 resp.Write([]byte(returnHash + "\n"))
212 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
213 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
214 // Reject unauthorized requests.
215 if !IsSystemAuth(GetAPIToken(req)) {
216 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
220 prefix := mux.Vars(req)["prefix"]
222 for _, vol := range KeepVM.AllReadable() {
223 if err := vol.IndexTo(prefix, resp); err != nil {
224 // The only errors returned by IndexTo are
225 // write errors returned by resp.Write(),
226 // which probably means the client has
227 // disconnected and this error will never be
228 // reported to the client -- but it will
229 // appear in our own error log.
230 http.Error(resp, err.Error(), http.StatusInternalServerError)
234 // An empty line at EOF is the only way the client can be
235 // assured the entire index was received.
236 resp.Write([]byte{'\n'})
240 // Responds to /status.json requests with the current node status,
241 // described in a JSON structure.
243 // The data given in a status.json response includes:
244 // volumes - a list of Keep volumes currently in use by this server
245 // each volume is an object with the following fields:
247 // * device_num (an integer identifying the underlying filesystem)
252 type PoolStatus struct {
253 Alloc uint64 `json:"BytesAllocated"`
254 Cap int `json:"BuffersMax"`
255 Len int `json:"BuffersInUse"`
259 type NodeStatus struct {
260 Volumes []*VolumeStatus `json:"volumes"`
261 BufferPool PoolStatus
262 PullQueue WorkQueueStatus
263 TrashQueue WorkQueueStatus
264 Memory runtime.MemStats
268 var stLock sync.Mutex
270 // StatusHandler addresses /status.json requests.
271 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
274 jstat, err := json.Marshal(&st)
279 log.Printf("json.Marshal: %s", err)
280 log.Printf("NodeStatus = %v", &st)
281 http.Error(resp, err.Error(), 500)
285 // readNodeStatus populates the given NodeStatus struct with current values.
286 func readNodeStatus(st *NodeStatus) {
287 vols := KeepVM.AllReadable()
288 if cap(st.Volumes) < len(vols) {
289 st.Volumes = make([]*VolumeStatus, len(vols))
291 st.Volumes = st.Volumes[:0]
292 for _, vol := range vols {
293 if s := vol.Status(); s != nil {
294 st.Volumes = append(st.Volumes, s)
297 st.BufferPool.Alloc = bufs.Alloc()
298 st.BufferPool.Cap = bufs.Cap()
299 st.BufferPool.Len = bufs.Len()
300 st.PullQueue = getWorkQueueStatus(pullq)
301 st.TrashQueue = getWorkQueueStatus(trashq)
302 runtime.ReadMemStats(&st.Memory)
305 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is
306 // nil (which should never happen except in test suites), it returns a zero
307 // status value instead of crashing.
308 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
310 // This should only happen during tests.
311 return WorkQueueStatus{}
316 // DeleteHandler processes DELETE requests.
318 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
319 // from all connected volumes.
321 // Only the Data Manager, or an Arvados admin with scope "all", are
322 // allowed to issue DELETE requests. If a DELETE request is not
323 // authenticated or is issued by a non-admin user, the server returns
324 // a PermissionError.
326 // Upon receiving a valid request from an authorized user,
327 // DeleteHandler deletes all copies of the specified block on local
332 // If the requested block was not found on any volume, the response
333 // code is HTTP 404 Not Found.
335 // Otherwise, the response code is 200 OK, with a response body
336 // consisting of the JSON message
338 // {"copies_deleted":d,"copies_failed":f}
340 // where d and f are integers representing the number of blocks that
341 // were successfully and unsuccessfully deleted.
343 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
344 hash := mux.Vars(req)["hash"]
346 // Confirm that this user is an admin and has a token with unlimited scope.
347 var tok = GetAPIToken(req)
348 if tok == "" || !CanDelete(tok) {
349 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
353 if !theConfig.EnableDelete {
354 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
358 // Delete copies of this block from all available volumes.
359 // Report how many blocks were successfully deleted, and how
360 // many were found on writable volumes but not deleted.
362 Deleted int `json:"copies_deleted"`
363 Failed int `json:"copies_failed"`
365 for _, vol := range KeepVM.AllWritable() {
366 if err := vol.Trash(hash); err == nil {
368 } else if os.IsNotExist(err) {
372 log.Println("DeleteHandler:", err)
378 if result.Deleted == 0 && result.Failed == 0 {
379 st = http.StatusNotFound
386 if st == http.StatusOK {
387 if body, err := json.Marshal(result); err == nil {
390 log.Printf("json.Marshal: %s (result = %v)", err, result)
391 http.Error(resp, err.Error(), 500)
396 /* PullHandler processes "PUT /pull" requests for the data manager.
397 The request body is a JSON message containing a list of pull
398 requests in the following format:
402 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
404 "keep0.qr1hi.arvadosapi.com:25107",
405 "keep1.qr1hi.arvadosapi.com:25108"
409 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
419 Each pull request in the list consists of a block locator string
420 and an ordered list of servers. Keepstore should try to fetch the
421 block from each server in turn.
423 If the request has not been sent by the Data Manager, return 401
426 If the JSON unmarshalling fails, return 400 Bad Request.
429 // PullRequest consists of a block locator and an ordered list of servers
430 type PullRequest struct {
431 Locator string `json:"locator"`
432 Servers []string `json:"servers"`
435 // PullHandler processes "PUT /pull" requests for the data manager.
436 func PullHandler(resp http.ResponseWriter, req *http.Request) {
437 // Reject unauthorized requests.
438 if !IsSystemAuth(GetAPIToken(req)) {
439 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
443 // Parse the request body.
445 r := json.NewDecoder(req.Body)
446 if err := r.Decode(&pr); err != nil {
447 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
451 // We have a properly formatted pull list sent from the data
452 // manager. Report success and send the list to the pull list
453 // manager for further handling.
454 resp.WriteHeader(http.StatusOK)
456 fmt.Sprintf("Received %d pull requests\n", len(pr))))
459 for _, p := range pr {
462 pullq.ReplaceQueue(plist)
465 // TrashRequest consists of a block locator and its Mtime
466 type TrashRequest struct {
467 Locator string `json:"locator"`
468 BlockMtime int64 `json:"block_mtime"`
471 // TrashHandler processes /trash requests.
472 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
473 // Reject unauthorized requests.
474 if !IsSystemAuth(GetAPIToken(req)) {
475 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
479 // Parse the request body.
480 var trash []TrashRequest
481 r := json.NewDecoder(req.Body)
482 if err := r.Decode(&trash); err != nil {
483 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
487 // We have a properly formatted trash list sent from the data
488 // manager. Report success and send the list to the trash work
489 // queue for further handling.
490 resp.WriteHeader(http.StatusOK)
492 fmt.Sprintf("Received %d trash requests\n", len(trash))))
495 for _, t := range trash {
498 trashq.ReplaceQueue(tlist)
501 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
502 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
503 // Reject unauthorized requests.
504 if !IsSystemAuth(GetAPIToken(req)) {
505 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
509 hash := mux.Vars(req)["hash"]
511 if len(KeepVM.AllWritable()) == 0 {
512 http.Error(resp, "No writable volumes", http.StatusNotFound)
516 var untrashedOn, failedOn []string
518 for _, vol := range KeepVM.AllWritable() {
519 err := vol.Untrash(hash)
521 if os.IsNotExist(err) {
523 } else if err != nil {
524 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
525 failedOn = append(failedOn, vol.String())
527 log.Printf("Untrashed %v on volume %v", hash, vol.String())
528 untrashedOn = append(untrashedOn, vol.String())
532 if numNotFound == len(KeepVM.AllWritable()) {
533 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
537 if len(failedOn) == len(KeepVM.AllWritable()) {
538 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
540 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
541 if len(failedOn) > 0 {
542 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
544 resp.Write([]byte(respBody))
548 // GetBlock and PutBlock implement lower-level code for handling
549 // blocks by rooting through volumes connected to the local machine.
550 // Once the handler has determined that system policy permits the
551 // request, it calls these methods to perform the actual operation.
553 // TODO(twp): this code would probably be better located in the
554 // VolumeManager interface. As an abstraction, the VolumeManager
555 // should be the only part of the code that cares about which volume a
556 // block is stored on, so it should be responsible for figuring out
557 // which volume to check for fetching blocks, storing blocks, etc.
559 // GetBlock fetches the block identified by "hash" into the provided
560 // buf, and returns the data size.
562 // If the block cannot be found on any volume, returns NotFoundError.
564 // If the block found does not have the correct MD5 hash, returns
567 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
568 // Attempt to read the requested hash from a keep volume.
569 errorToCaller := NotFoundError
571 for _, vol := range KeepVM.AllReadable() {
572 size, err := vol.Get(ctx, hash, buf)
579 // IsNotExist is an expected error and may be
580 // ignored. All other errors are logged. In
581 // any case we continue trying to read other
582 // volumes. If all volumes report IsNotExist,
583 // we return a NotFoundError.
584 if !os.IsNotExist(err) {
585 log.Printf("%s: Get(%s): %s", vol, hash, err)
589 // Check the file checksum.
591 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
592 if filehash != hash {
593 // TODO: Try harder to tell a sysadmin about
595 log.Printf("%s: checksum mismatch for request %s (actual %s)",
597 errorToCaller = DiskHashError
600 if errorToCaller == DiskHashError {
601 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
606 return 0, errorToCaller
609 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
611 // PutBlock(ctx, block, hash)
612 // Stores the BLOCK (identified by the content id HASH) in Keep.
614 // The MD5 checksum of the block must be identical to the content id HASH.
615 // If not, an error is returned.
617 // PutBlock stores the BLOCK on the first Keep volume with free space.
618 // A failure code is returned to the user only if all volumes fail.
620 // On success, PutBlock returns the volume's replication level and a nil error.
621 // On failure, it returns a KeepError with one of the following codes:
624 // A different block with the same hash already exists on this
627 // The MD5 hash of the BLOCK does not match the argument HASH.
629 // There was not enough space left in any Keep volume to store
632 // The object could not be stored for some other reason (e.g.
633 // all writes failed). The text of the error message should
634 // provide as much detail as possible.
636 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
637 // Check that BLOCK's checksum matches HASH.
638 blockhash := fmt.Sprintf("%x", md5.Sum(block))
639 if blockhash != hash {
640 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
641 return 0, RequestHashError
644 // If we already have this data, it's intact on disk, and we
645 // can update its timestamp, return success. If we have
646 // different data with the same hash, return failure.
647 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
651 // Choose a Keep volume to write to.
652 // If this volume fails, try all of the volumes in order.
653 if vol := KeepVM.NextWritable(); vol != nil {
654 if err := vol.Put(context.TODO(), hash, block); err == nil {
655 return vol.Replication(), nil // success!
659 writables := KeepVM.AllWritable()
660 if len(writables) == 0 {
661 log.Print("No writable volumes.")
666 for _, vol := range writables {
667 err := vol.Put(ctx, hash, block)
674 return vol.Replication(), nil // success!
676 if err != FullError {
677 // The volume is not full but the
678 // write did not succeed. Report the
679 // error and continue trying.
681 log.Printf("%s: Write(%s): %s", vol, hash, err)
686 log.Print("All volumes are full.")
689 // Already logged the non-full errors.
690 return 0, GenericError
693 // CompareAndTouch returns the current replication level if one of the
694 // volumes already has the given content and it successfully updates
695 // the relevant block's modification time in order to protect it from
696 // premature garbage collection. Otherwise, it returns a non-nil
698 func CompareAndTouch(hash string, buf []byte) (int, error) {
699 var bestErr error = NotFoundError
700 for _, vol := range KeepVM.AllWritable() {
701 if err := vol.Compare(hash, buf); err == CollisionError {
702 // Stop if we have a block with same hash but
703 // different content. (It will be impossible
704 // to tell which one is wanted if we have
705 // both, so there's no point writing it even
706 // on a different volume.)
707 log.Printf("%s: Compare(%s): %s", vol, hash, err)
709 } else if os.IsNotExist(err) {
710 // Block does not exist. This is the only
711 // "normal" error: we don't log anything.
713 } else if err != nil {
714 // Couldn't open file, data is corrupt on
715 // disk, etc.: log this abnormal condition,
716 // and try the next volume.
717 log.Printf("%s: Compare(%s): %s", vol, hash, err)
720 if err := vol.Touch(hash); err != nil {
721 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
725 // Compare and Touch both worked --> done.
726 return vol.Replication(), nil
731 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
733 // IsValidLocator returns true if the specified string is a valid Keep locator.
734 // When Keep is extended to support hash types other than MD5,
735 // this should be updated to cover those as well.
737 func IsValidLocator(loc string) bool {
738 return validLocatorRe.MatchString(loc)
741 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
743 // GetAPIToken returns the OAuth2 token from the Authorization
744 // header of an HTTP request, or an empty string if no matching
746 func GetAPIToken(req *http.Request) string {
747 if auth, ok := req.Header["Authorization"]; ok {
748 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
755 // IsExpired returns true if the given Unix timestamp (expressed as a
756 // hexadecimal string) is in the past, or if timestampHex cannot be
757 // parsed as a hexadecimal string.
758 func IsExpired(timestampHex string) bool {
759 ts, err := strconv.ParseInt(timestampHex, 16, 0)
761 log.Printf("IsExpired: %s", err)
764 return time.Unix(ts, 0).Before(time.Now())
767 // CanDelete returns true if the user identified by apiToken is
768 // allowed to delete blocks.
769 func CanDelete(apiToken string) bool {
773 // Blocks may be deleted only when Keep has been configured with a
775 if IsSystemAuth(apiToken) {
778 // TODO(twp): look up apiToken with the API server
779 // return true if is_admin is true and if the token
780 // has unlimited scope
784 // IsSystemAuth returns true if the given token is allowed to perform
785 // system level actions like deleting data.
786 func IsSystemAuth(token string) bool {
787 return token != "" && token == theConfig.systemAuthToken