3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
15 "github.com/gorilla/mux"
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
//
// Routes registered here: block reads (GET/HEAD), block writes (PUT)
// and deletes (DELETE) keyed by a 32-hex-digit hash; the index
// listings; /status.json; and the PUT-only /pull, /trash and
// /untrash/{hash} endpoints. Requests that match no route are answered
// by BadRequestHandler through the router's NotFoundHandler.
31 func MakeRESTRouter() *mux.Router {
32 rest := mux.NewRouter()
// A block may be requested by bare hash, or by hash followed by
// "+hints"; both forms go to GetBlockHandler.
35 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37 `/{hash:[0-9a-f]{32}}+{hints}`,
38 GetBlockHandler).Methods("GET", "HEAD")
// Writes and deletes accept only the bare 32-hex-digit hash.
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42 // List all blocks stored here. Privileged client only.
43 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44 // List blocks stored here whose hash has the given prefix.
45 // Privileged client only.
46 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48 // List volumes: path, device number, bytes used/avail.
49 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51 // Replace the current pull queue.
52 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54 // Replace the current trash queue.
55 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57 // Untrash moves blocks from trash back into store
58 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60 // Any request which does not match any of these routes gets
62 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
67 // BadRequestHandler is a HandleFunc to address bad requests.
// It replies with BadRequestError's message and HTTP status code.
// MakeRESTRouter installs it as the router's NotFoundHandler.
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
72 // GetBlockHandler is a HandleFunc to address Get block requests.
//
// When enforcePermissions is set, the locator taken from the URL path
// must pass VerifySignature for the request's API token before the
// block is looked up. The block is fetched with GetBlock using the
// "hash" route variable. Failures are reported to the client with the
// HTTP code carried by the returned *KeepError.
73 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
74 if enforcePermissions {
75 locator := req.URL.Path[1:] // strip leading slash
76 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
// NOTE(review): unchecked type assertion -- this panics if
// VerifySignature ever returns an error that is not a *KeepError.
77 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82 block, err := GetBlock(mux.Vars(req)["hash"])
84 // This type assertion is safe because the only errors
85 // GetBlock can return are DiskHashError or NotFoundError.
86 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// Advertise the exact payload size and an opaque binary content type.
91 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
92 resp.Header().Set("Content-Type", "application/octet-stream")
// errClientDisconnected is returned by getBufferForResponseWriter when
// the client goes away before a buffer becomes available.
96 var errClientDisconnected = fmt.Errorf("client disconnected")
98 // Get a buffer from the pool -- but give up and return a non-nil
99 // error if resp implements http.CloseNotifier and tells us that the
100 // client has disconnected before we get a buffer.
//
// bufSize is passed unchanged to bufs.Get.
101 func getBufferForResponseWriter(resp http.ResponseWriter, bufSize int) ([]byte, error) {
// closeNotifier stays nil when resp does not implement
// http.CloseNotifier. Receiving from a nil channel blocks forever, so
// in that case the disconnect branch of the select is never taken.
102 var closeNotifier <-chan bool
103 if resp, ok := resp.(http.CloseNotifier); ok {
104 closeNotifier = resp.CloseNotify()
// bufReady is unbuffered: the send below blocks until someone receives.
107 bufReady := make(chan []byte)
109 bufReady <- bufs.Get(bufSize)
113 case buf = <-bufReady:
115 case <-closeNotifier:
117 // Even if closeNotifier happened first, we
118 // need to keep waiting for our buf so we can
119 // return it to the pool.
122 return nil, errClientDisconnected
126 // PutBlockHandler is a HandleFunc to address Put block requests.
//
// The request must declare a Content-Length no larger than BlockSize,
// and at least one writable volume must exist. The body is read into a
// buffer obtained from getBufferForResponseWriter and stored with
// PutBlock under the "hash" route variable. On success the handler
// builds a "hash+size" locator (signed when PermissionSecret is set and
// the request carries an API token), reports the replication level in
// the X-Keep-Replicas-Stored header, and writes the locator followed by
// a newline.
127 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
128 hash := mux.Vars(req)["hash"]
130 // Detect as many error conditions as possible before reading
131 // the body: avoid transmitting data that will not end up
132 // being written anyway.
// net/http reports an unknown body length as ContentLength == -1.
134 if req.ContentLength == -1 {
135 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
139 if req.ContentLength > BlockSize {
140 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
144 if len(KeepVM.AllWritable()) == 0 {
145 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Ask for a buffer sized to the declared body length. Failure to obtain
// one is reported as 503 Service Unavailable.
149 buf, err := getBufferForResponseWriter(resp, int(req.ContentLength))
151 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// io.ReadFull returns an error if the body ends before buf is filled.
155 _, err = io.ReadFull(req.Body, buf)
157 http.Error(resp, err.Error(), 500)
162 replication, err := PutBlock(buf, hash)
// NOTE(review): unchecked type assertion -- relies on PutBlock only
// ever returning *KeepError values.
166 ke := err.(*KeepError)
167 http.Error(resp, ke.Error(), ke.HTTPCode)
171 // Success; add a size hint, sign the locator if possible, and
172 // return it to the client.
173 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
174 apiToken := GetApiToken(req)
175 if PermissionSecret != nil && apiToken != "" {
176 expiry := time.Now().Add(blobSignatureTTL)
177 returnHash = SignLocator(returnHash, apiToken, expiry)
179 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
180 resp.Write([]byte(returnHash + "\n"))
183 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Only requests bearing the data manager token are served; all others
// receive UnauthorizedError. Each readable volume writes its index for
// the requested prefix directly to the response. The plain /index
// route defines no "prefix" variable, so prefix is then the empty
// string.
184 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
185 // Reject unauthorized requests.
186 if !IsDataManagerToken(GetApiToken(req)) {
187 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
191 prefix := mux.Vars(req)["prefix"]
193 for _, vol := range KeepVM.AllReadable() {
194 if err := vol.IndexTo(prefix, resp); err != nil {
195 // The only errors returned by IndexTo are
196 // write errors returned by resp.Write(),
197 // which probably means the client has
198 // disconnected and this error will never be
199 // reported to the client -- but it will
200 // appear in our own error log.
201 http.Error(resp, err.Error(), http.StatusInternalServerError)
205 // An empty line at EOF is the only way the client can be
206 // assured the entire index was received.
207 resp.Write([]byte{'\n'})
211 // Responds to /status.json requests with the current node status,
212 // described in a JSON structure.
214 // The data given in a status.json response includes:
215 // volumes - a list of Keep volumes currently in use by this server
216 // each volume is an object with the following fields:
218 // * device_num (an integer identifying the underlying filesystem)
// PoolStatus describes the buffer pool as reported in a NodeStatus.
// readNodeStatus fills it from bufs.Alloc(), bufs.Cap() and bufs.Len();
// the JSON names are BytesAllocated, BuffersMax and BuffersInUse.
223 type PoolStatus struct {
224 Alloc uint64 `json:"BytesAllocated"`
225 Cap int `json:"BuffersMax"`
226 Len int `json:"BuffersInUse"`
// NodeStatus is the structure marshalled to JSON by StatusHandler and
// populated by readNodeStatus: per-volume status, buffer pool usage,
// pull and trash queue status, and Go runtime memory statistics.
// Only Volumes has an explicit JSON name; the other fields are encoded
// under their Go field names.
230 type NodeStatus struct {
231 Volumes []*VolumeStatus `json:"volumes"`
232 BufferPool PoolStatus
233 PullQueue WorkQueueStatus
234 TrashQueue WorkQueueStatus
235 Memory runtime.MemStats
// stLock is a package-level mutex.
// NOTE(review): presumably it serializes use of the shared status value
// st that StatusHandler marshals -- the Lock/Unlock calls are not
// visible in this excerpt; confirm.
239 var stLock sync.Mutex
241 // StatusHandler addresses /status.json requests.
//
// It marshals the node status st to JSON. If marshalling fails, the
// error and the status value are logged and the client receives a 500.
242 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
245 jstat, err := json.Marshal(&st)
250 log.Printf("json.Marshal: %s", err)
251 log.Printf("NodeStatus = %v", &st)
252 http.Error(resp, err.Error(), 500)
256 // readNodeStatus populates the given NodeStatus struct with current values:
// volume status, buffer pool counters, pull/trash queue status and Go
// runtime memory statistics.
257 func readNodeStatus(st *NodeStatus) {
258 vols := KeepVM.AllReadable()
// Allocate a new slice only when the existing backing array is too
// small to hold one entry per readable volume.
259 if cap(st.Volumes) < len(vols) {
260 st.Volumes = make([]*VolumeStatus, len(vols))
// Truncate to length zero (keeping capacity) and refill below.
262 st.Volumes = st.Volumes[:0]
263 for _, vol := range vols {
// Volumes whose Status() is nil are left out of the report.
264 if s := vol.Status(); s != nil {
265 st.Volumes = append(st.Volumes, s)
268 st.BufferPool.Alloc = bufs.Alloc()
269 st.BufferPool.Cap = bufs.Cap()
270 st.BufferPool.Len = bufs.Len()
271 st.PullQueue = getWorkQueueStatus(pullq)
272 st.TrashQueue = getWorkQueueStatus(trashq)
273 runtime.ReadMemStats(&st.Memory)
276 // getWorkQueueStatus returns a WorkQueueStatus for the given queue.
277 // If q is nil (which should never happen except in test suites), it
278 // returns a zero status value instead of crashing.
279 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
281 // This should only happen during tests.
282 return WorkQueueStatus{}
287 // DeleteHandler processes DELETE requests.
289 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
290 // from all connected volumes.
292 // Only the Data Manager, or an Arvados admin with scope "all", are
293 // allowed to issue DELETE requests. If a DELETE request is not
294 // authenticated or is issued by a non-admin user, the server returns
295 // a PermissionError.
297 // Upon receiving a valid request from an authorized user,
298 // DeleteHandler deletes all copies of the specified block on local
303 // If the requested block was not found on any volume, the response
304 // code is HTTP 404 Not Found.
306 // Otherwise, the response code is 200 OK, with a response body
307 // consisting of the JSON message
309 // {"copies_deleted":d,"copies_failed":f}
311 // where d and f are integers representing the number of blocks that
312 // were successfully and unsuccessfully deleted.
314 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
315 hash := mux.Vars(req)["hash"]
317 // Confirm that this user is an admin and has a token with unlimited scope.
// A request with no token at all is rejected without consulting CanDelete.
318 var tok = GetApiToken(req)
319 if tok == "" || !CanDelete(tok) {
320 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
325 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
329 // Delete copies of this block from all available volumes.
330 // Report how many blocks were successfully deleted, and how
331 // many were found on writable volumes but not deleted.
333 Deleted int `json:"copies_deleted"`
334 Failed int `json:"copies_failed"`
// Each writable volume is asked to Trash the block.
336 for _, vol := range KeepVM.AllWritable() {
337 if err := vol.Trash(hash); err == nil {
339 } else if os.IsNotExist(err) {
343 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed: report 404 Not Found.
349 if result.Deleted == 0 && result.Failed == 0 {
350 st = http.StatusNotFound
357 if st == http.StatusOK {
358 if body, err := json.Marshal(result); err == nil {
361 log.Printf("json.Marshal: %s (result = %v)", err, result)
362 http.Error(resp, err.Error(), 500)
367 /* PullHandler processes "PUT /pull" requests for the data manager.
368 The request body is a JSON message containing a list of pull
369 requests in the following format:
373 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
375 "keep0.qr1hi.arvadosapi.com:25107",
376 "keep1.qr1hi.arvadosapi.com:25108"
380 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
390 Each pull request in the list consists of a block locator string
391 and an ordered list of servers. Keepstore should try to fetch the
392 block from each server in turn.
394 If the request has not been sent by the Data Manager, return 401
397 If the JSON unmarshalling fails, return 400 Bad Request.
400 // PullRequest consists of a block locator and an ordered list of servers
// to try, in the order given, when fetching that block. It is the
// element type of the JSON list accepted by "PUT /pull".
401 type PullRequest struct {
402 Locator string `json:"locator"`
403 Servers []string `json:"servers"`
406 // PullHandler processes "PUT /pull" requests for the data manager.
//
// Requests without the data manager token are rejected with
// UnauthorizedError. A body that fails JSON decoding is rejected with
// BadRequestError's HTTP code and the decoder's error text. Otherwise
// the handler acknowledges the request and replaces the contents of
// the pull queue.
407 func PullHandler(resp http.ResponseWriter, req *http.Request) {
408 // Reject unauthorized requests.
409 if !IsDataManagerToken(GetApiToken(req)) {
410 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
414 // Parse the request body.
416 r := json.NewDecoder(req.Body)
417 if err := r.Decode(&pr); err != nil {
418 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
422 // We have a properly formatted pull list sent from the data
423 // manager. Report success and send the list to the pull list
424 // manager for further handling.
// Note that the 200 status is written before the queue is replaced.
425 resp.WriteHeader(http.StatusOK)
427 fmt.Sprintf("Received %d pull requests\n", len(pr))))
430 for _, p := range pr {
433 pullq.ReplaceQueue(plist)
436 // TrashRequest consists of a block locator and its Mtime
// (JSON fields "locator" and "block_mtime"). It is the element type of
// the JSON list decoded by TrashHandler.
437 type TrashRequest struct {
438 Locator string `json:"locator"`
439 BlockMtime int64 `json:"block_mtime"`
442 // TrashHandler processes /trash requests.
//
// Requests without the data manager token are rejected with
// UnauthorizedError. A body that fails to decode as a JSON list of
// TrashRequest is rejected with BadRequestError's HTTP code. Otherwise
// the handler acknowledges the request and replaces the contents of
// the trash queue.
443 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
444 // Reject unauthorized requests.
445 if !IsDataManagerToken(GetApiToken(req)) {
446 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
450 // Parse the request body.
451 var trash []TrashRequest
452 r := json.NewDecoder(req.Body)
453 if err := r.Decode(&trash); err != nil {
454 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
458 // We have a properly formatted trash list sent from the data
459 // manager. Report success and send the list to the trash work
460 // queue for further handling.
// Note that the 200 status is written before the queue is replaced.
461 resp.WriteHeader(http.StatusOK)
463 fmt.Sprintf("Received %d trash requests\n", len(trash))))
466 for _, t := range trash {
469 trashq.ReplaceQueue(tlist)
472 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// Only requests bearing the data manager token are served. Untrash is
// attempted on every writable volume. The response is 404 when there
// are no writable volumes or when the block was not found on any of
// them, and 500 when the attempt failed on every writable volume.
// Otherwise the body lists the volumes where the untrash succeeded
// and, if any, those where it failed.
473 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
474 // Reject unauthorized requests.
475 if !IsDataManagerToken(GetApiToken(req)) {
476 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
480 hash := mux.Vars(req)["hash"]
482 if len(KeepVM.AllWritable()) == 0 {
483 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Volume names are collected per outcome for the response body.
487 var untrashedOn, failedOn []string
489 for _, vol := range KeepVM.AllWritable() {
490 err := vol.Untrash(hash)
// "Not found on this volume" is tracked separately from real failures.
492 if os.IsNotExist(err) {
494 } else if err != nil {
495 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
496 failedOn = append(failedOn, vol.String())
498 log.Printf("Untrashed %v on volume %v", hash, vol.String())
499 untrashedOn = append(untrashedOn, vol.String())
503 if numNotFound == len(KeepVM.AllWritable()) {
504 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
508 if len(failedOn) == len(KeepVM.AllWritable()) {
509 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
// At least one volume succeeded: report successes, then any failures.
511 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
512 if len(failedOn) > 0 {
513 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
515 resp.Write([]byte(respBody))
519 // ==============================
520 // GetBlock and PutBlock implement lower-level code for handling
521 // blocks by rooting through volumes connected to the local machine.
522 // Once the handler has determined that system policy permits the
523 // request, it calls these methods to perform the actual operation.
525 // TODO(twp): this code would probably be better located in the
526 // VolumeManager interface. As an abstraction, the VolumeManager
527 // should be the only part of the code that cares about which volume a
528 // block is stored on, so it should be responsible for figuring out
529 // which volume to check for fetching blocks, storing blocks, etc.
530 // ==============================
532 // GetBlock fetches and returns the block identified by "hash".
534 // On success, GetBlock returns a byte slice with the block data, and
537 // If the block cannot be found on any volume, returns NotFoundError.
539 // If the block found does not have the correct MD5 hash, returns
542 func GetBlock(hash string) ([]byte, error) {
543 // Attempt to read the requested hash from a keep volume.
// errorToCaller is returned when no volume yields a good copy. It
// starts as NotFoundError and becomes DiskHashError once a copy with a
// bad checksum has been seen.
544 errorToCaller := NotFoundError
546 for _, vol := range KeepVM.AllReadable() {
547 buf, err := vol.Get(hash)
549 // IsNotExist is an expected error and may be
550 // ignored. All other errors are logged. In
551 // any case we continue trying to read other
552 // volumes. If all volumes report IsNotExist,
553 // we return a NotFoundError.
554 if !os.IsNotExist(err) {
555 log.Printf("%s: Get(%s): %s", vol, hash, err)
559 // Check the file checksum.
// %x renders lowercase hex, so hash must be lowercase to match.
561 filehash := fmt.Sprintf("%x", md5.Sum(buf))
562 if filehash != hash {
563 // TODO: Try harder to tell a sysadmin about
565 log.Printf("%s: checksum mismatch for request %s (actual %s)",
567 errorToCaller = DiskHashError
// A corrupt copy was seen on an earlier volume; log that a good copy
// was nevertheless found.
571 if errorToCaller == DiskHashError {
572 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
577 return nil, errorToCaller
580 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
582 // PutBlock(block, hash)
583 // Stores the BLOCK (identified by the content id HASH) in Keep.
585 // The MD5 checksum of the block must be identical to the content id HASH.
586 // If not, an error is returned.
588 // PutBlock stores the BLOCK on the first Keep volume with free space.
589 // A failure code is returned to the user only if all volumes fail.
591 // On success, PutBlock returns a replication level and a nil error.
592 // On failure, it returns a KeepError with one of the following codes:
595 // A different block with the same hash already exists on this
598 // The MD5 hash of the BLOCK does not match the argument HASH.
600 // There was not enough space left in any Keep volume to store
603 // The object could not be stored for some other reason (e.g.
604 // all writes failed). The text of the error message should
605 // provide as much detail as possible.
607 func PutBlock(block []byte, hash string) (int, error) {
608 // Check that BLOCK's checksum matches HASH.
// %x renders lowercase hex, so hash must be lowercase to match.
609 blockhash := fmt.Sprintf("%x", md5.Sum(block))
610 if blockhash != hash {
611 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
612 return 0, RequestHashError
615 // If we already have this data, it's intact on disk, and we
616 // can update its timestamp, return success. If we have
617 // different data with the same hash, return failure.
618 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
622 // Choose a Keep volume to write to.
623 // If this volume fails, try all of the volumes in order.
624 if vol := KeepVM.NextWritable(); vol != nil {
625 if err := vol.Put(hash, block); err == nil {
626 return vol.Replication(), nil // success!
// The preferred volume was unavailable or its Put failed: fall back to
// trying every writable volume.
630 writables := KeepVM.AllWritable()
631 if len(writables) == 0 {
632 log.Print("No writable volumes.")
637 for _, vol := range writables {
638 err := vol.Put(hash, block)
640 return vol.Replication(), nil // success!
642 if err != FullError {
643 // The volume is not full but the
644 // write did not succeed. Report the
645 // error and continue trying.
647 log.Printf("%s: Write(%s): %s", vol, hash, err)
652 log.Print("All volumes are full.")
655 // Already logged the non-full errors.
656 return 0, GenericError
659 // CompareAndTouch returns the current replication level if one of the
660 // volumes already has the given content and it successfully updates
661 // the relevant block's modification time in order to protect it from
662 // premature garbage collection. Otherwise, it returns a non-nil
//
// Only writable volumes are consulted. For each one, Compare must
// succeed and then Touch must succeed. The first volume where both
// succeed determines the returned replication level.
664 func CompareAndTouch(hash string, buf []byte) (int, error) {
// bestErr defaults to NotFoundError, covering the case where no volume
// has the block.
665 var bestErr error = NotFoundError
666 for _, vol := range KeepVM.AllWritable() {
667 if err := vol.Compare(hash, buf); err == CollisionError {
668 // Stop if we have a block with same hash but
669 // different content. (It will be impossible
670 // to tell which one is wanted if we have
671 // both, so there's no point writing it even
672 // on a different volume.)
673 log.Printf("%s: Compare(%s): %s", vol, hash, err)
675 } else if os.IsNotExist(err) {
676 // Block does not exist. This is the only
677 // "normal" error: we don't log anything.
679 } else if err != nil {
680 // Couldn't open file, data is corrupt on
681 // disk, etc.: log this abnormal condition,
682 // and try the next volume.
683 log.Printf("%s: Compare(%s): %s", vol, hash, err)
// Compare succeeded; now refresh the block's timestamp.
686 if err := vol.Touch(hash); err != nil {
687 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
691 // Compare and Touch both worked --> done.
692 return vol.Replication(), nil
// validLocatorRe matches a string consisting of exactly 32 lowercase
// hexadecimal digits, i.e. a bare MD5 hex digest with no size or hints.
697 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
699 // IsValidLocator returns true if the specified string is a valid Keep locator.
700 // When Keep is extended to support hash types other than MD5,
701 // this should be updated to cover those as well.
703 func IsValidLocator(loc string) bool {
704 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization header value that starts with
// "OAuth2" followed by whitespace, capturing the remainder of the value.
707 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
709 // GetApiToken returns the OAuth2 token from the Authorization
710 // header of a HTTP request, or an empty string if no matching
712 func GetApiToken(req *http.Request) string {
// Only the first Authorization header value is examined.
713 if auth, ok := req.Header["Authorization"]; ok {
714 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
721 // IsExpired returns true if the given Unix timestamp (expressed as a
722 // hexadecimal string) is in the past, or if timestampHex cannot be
723 // parsed as a hexadecimal string.
724 func IsExpired(timestampHex string) bool {
// Base 16; bitSize 0 makes ParseInt range-check against the platform's
// int size, so a value that overflows int is also a parse error.
725 ts, err := strconv.ParseInt(timestampHex, 16, 0)
727 log.Printf("IsExpired: %s", err)
// Compare the timestamp (whole seconds) against the current time.
730 return time.Unix(ts, 0).Before(time.Now())
733 // CanDelete returns true if the user identified by apiToken is
734 // allowed to delete blocks.
//
// Until the TODO below is implemented, the data manager token is the
// only token this function singles out.
735 func CanDelete(apiToken string) bool {
739 // Blocks may be deleted only when Keep has been configured with a
741 if IsDataManagerToken(apiToken) {
744 // TODO(twp): look up apiToken with the API server
745 // return true if is_admin is true and if the token
746 // has unlimited scope
750 // IsDataManagerToken returns true if apiToken represents the data
// manager's token, i.e. it equals the configured dataManagerToken. An
// empty dataManagerToken never matches, so an empty apiToken cannot
// pass when no token has been configured.
752 func IsDataManagerToken(apiToken string) bool {
753 return dataManagerToken != "" && apiToken == dataManagerToken