3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
32 func MakeRESTRouter() *mux.Router {
33 rest := mux.NewRouter()
36 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38 `/{hash:[0-9a-f]{32}}+{hints}`,
39 GetBlockHandler).Methods("GET", "HEAD")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43 // List all blocks stored here. Privileged client only.
44 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45 // List blocks stored here whose hash has the given prefix.
46 // Privileged client only.
47 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49 // List volumes: path, device number, bytes used/avail.
50 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
52 // Replace the current pull queue.
53 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
55 // Replace the current trash queue.
56 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
58 // Untrash moves blocks from trash back into store
59 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
61 // Any request which does not match any of these routes gets
63 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
68 // BadRequestHandler is a HandleFunc to address bad requests.
69 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
70 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
73 // GetBlockHandler is a HandleFunc to address Get block requests.
74 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
75 if theConfig.RequireSignatures {
76 locator := req.URL.Path[1:] // strip leading slash
77 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
78 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
83 // TODO: Probe volumes to check whether the block _might_
84 // exist. Some volumes/types could support a quick existence
85 // check without causing other operations to suffer. If all
86 // volumes support that, and assure us the block definitely
87 // isn't here, we can return 404 now instead of waiting for a
90 buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
92 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
97 ctx, cancel := context.WithCancel(context.TODO())
98 if resp, ok := resp.(http.CloseNotifier); ok {
104 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
106 code := http.StatusInternalServerError
107 if err, ok := err.(*KeepError); ok {
110 http.Error(resp, err.Error(), code)
114 resp.Header().Set("Content-Length", strconv.Itoa(size))
115 resp.Header().Set("Content-Type", "application/octet-stream")
116 resp.Write(buf[:size])
119 // Get a buffer from the pool -- but give up and return a non-nil
120 // error if resp implements http.CloseNotifier and tells us that the
121 // client has disconnected before we get a buffer.
122 func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
123 var closeNotifier <-chan bool
124 if resp, ok := resp.(http.CloseNotifier); ok {
125 closeNotifier = resp.CloseNotify()
128 bufReady := make(chan []byte)
130 bufReady <- bufs.Get(bufSize)
134 case buf = <-bufReady:
136 case <-closeNotifier:
138 // Even if closeNotifier happened first, we
139 // need to keep waiting for our buf so we can
140 // return it to the pool.
143 return nil, ErrClientDisconnect
147 // PutBlockHandler is a HandleFunc to address Put block requests.
148 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
149 hash := mux.Vars(req)["hash"]
151 // Detect as many error conditions as possible before reading
152 // the body: avoid transmitting data that will not end up
153 // being written anyway.
155 if req.ContentLength == -1 {
156 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
160 if req.ContentLength > BlockSize {
161 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
165 if len(KeepVM.AllWritable()) == 0 {
166 http.Error(resp, FullError.Error(), FullError.HTTPCode)
170 buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
172 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
176 _, err = io.ReadFull(req.Body, buf)
178 http.Error(resp, err.Error(), 500)
183 replication, err := PutBlock(buf, hash)
187 ke := err.(*KeepError)
188 http.Error(resp, ke.Error(), ke.HTTPCode)
192 // Success; add a size hint, sign the locator if possible, and
193 // return it to the client.
194 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
195 apiToken := GetAPIToken(req)
196 if theConfig.blobSigningKey != nil && apiToken != "" {
197 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
198 returnHash = SignLocator(returnHash, apiToken, expiry)
200 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
201 resp.Write([]byte(returnHash + "\n"))
204 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
205 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
206 // Reject unauthorized requests.
207 if !IsSystemAuth(GetAPIToken(req)) {
208 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
212 prefix := mux.Vars(req)["prefix"]
214 for _, vol := range KeepVM.AllReadable() {
215 if err := vol.IndexTo(prefix, resp); err != nil {
216 // The only errors returned by IndexTo are
217 // write errors returned by resp.Write(),
218 // which probably means the client has
219 // disconnected and this error will never be
220 // reported to the client -- but it will
221 // appear in our own error log.
222 http.Error(resp, err.Error(), http.StatusInternalServerError)
226 // An empty line at EOF is the only way the client can be
227 // assured the entire index was received.
228 resp.Write([]byte{'\n'})
232 // Responds to /status.json requests with the current node status,
233 // described in a JSON structure.
235 // The data given in a status.json response includes:
236 // volumes - a list of Keep volumes currently in use by this server
237 // each volume is an object with the following fields:
239 // * device_num (an integer identifying the underlying filesystem)
244 type PoolStatus struct {
245 Alloc uint64 `json:"BytesAllocated"`
246 Cap int `json:"BuffersMax"`
247 Len int `json:"BuffersInUse"`
251 type NodeStatus struct {
252 Volumes []*VolumeStatus `json:"volumes"`
253 BufferPool PoolStatus
254 PullQueue WorkQueueStatus
255 TrashQueue WorkQueueStatus
256 Memory runtime.MemStats
260 var stLock sync.Mutex
262 // StatusHandler addresses /status.json requests.
263 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
266 jstat, err := json.Marshal(&st)
271 log.Printf("json.Marshal: %s", err)
272 log.Printf("NodeStatus = %v", &st)
273 http.Error(resp, err.Error(), 500)
277 // populate the given NodeStatus struct with current values.
278 func readNodeStatus(st *NodeStatus) {
279 vols := KeepVM.AllReadable()
280 if cap(st.Volumes) < len(vols) {
281 st.Volumes = make([]*VolumeStatus, len(vols))
283 st.Volumes = st.Volumes[:0]
284 for _, vol := range vols {
285 if s := vol.Status(); s != nil {
286 st.Volumes = append(st.Volumes, s)
289 st.BufferPool.Alloc = bufs.Alloc()
290 st.BufferPool.Cap = bufs.Cap()
291 st.BufferPool.Len = bufs.Len()
292 st.PullQueue = getWorkQueueStatus(pullq)
293 st.TrashQueue = getWorkQueueStatus(trashq)
294 runtime.ReadMemStats(&st.Memory)
297 // return a WorkQueueStatus for the given queue. If q is nil (which
298 // should never happen except in test suites), return a zero status
299 // value instead of crashing.
300 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
302 // This should only happen during tests.
303 return WorkQueueStatus{}
308 // DeleteHandler processes DELETE requests.
310 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
311 // from all connected volumes.
313 // Only the Data Manager, or an Arvados admin with scope "all", are
314 // allowed to issue DELETE requests. If a DELETE request is not
315 // authenticated or is issued by a non-admin user, the server returns
316 // a PermissionError.
318 // Upon receiving a valid request from an authorized user,
319 // DeleteHandler deletes all copies of the specified block on local
324 // If the requested blocks was not found on any volume, the response
325 // code is HTTP 404 Not Found.
327 // Otherwise, the response code is 200 OK, with a response body
328 // consisting of the JSON message
330 // {"copies_deleted":d,"copies_failed":f}
332 // where d and f are integers representing the number of blocks that
333 // were successfully and unsuccessfully deleted.
335 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
336 hash := mux.Vars(req)["hash"]
338 // Confirm that this user is an admin and has a token with unlimited scope.
339 var tok = GetAPIToken(req)
340 if tok == "" || !CanDelete(tok) {
341 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
345 if !theConfig.EnableDelete {
346 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
350 // Delete copies of this block from all available volumes.
351 // Report how many blocks were successfully deleted, and how
352 // many were found on writable volumes but not deleted.
354 Deleted int `json:"copies_deleted"`
355 Failed int `json:"copies_failed"`
357 for _, vol := range KeepVM.AllWritable() {
358 if err := vol.Trash(hash); err == nil {
360 } else if os.IsNotExist(err) {
364 log.Println("DeleteHandler:", err)
370 if result.Deleted == 0 && result.Failed == 0 {
371 st = http.StatusNotFound
378 if st == http.StatusOK {
379 if body, err := json.Marshal(result); err == nil {
382 log.Printf("json.Marshal: %s (result = %v)", err, result)
383 http.Error(resp, err.Error(), 500)
388 /* PullHandler processes "PUT /pull" requests for the data manager.
389 The request body is a JSON message containing a list of pull
390 requests in the following format:
394 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
396 "keep0.qr1hi.arvadosapi.com:25107",
397 "keep1.qr1hi.arvadosapi.com:25108"
401 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
411 Each pull request in the list consists of a block locator string
412 and an ordered list of servers. Keepstore should try to fetch the
413 block from each server in turn.
415 If the request has not been sent by the Data Manager, return 401
418 If the JSON unmarshalling fails, return 400 Bad Request.
421 // PullRequest consists of a block locator and an ordered list of servers
422 type PullRequest struct {
423 Locator string `json:"locator"`
424 Servers []string `json:"servers"`
427 // PullHandler processes "PUT /pull" requests for the data manager.
428 func PullHandler(resp http.ResponseWriter, req *http.Request) {
429 // Reject unauthorized requests.
430 if !IsSystemAuth(GetAPIToken(req)) {
431 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
435 // Parse the request body.
437 r := json.NewDecoder(req.Body)
438 if err := r.Decode(&pr); err != nil {
439 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
443 // We have a properly formatted pull list sent from the data
444 // manager. Report success and send the list to the pull list
445 // manager for further handling.
446 resp.WriteHeader(http.StatusOK)
448 fmt.Sprintf("Received %d pull requests\n", len(pr))))
451 for _, p := range pr {
454 pullq.ReplaceQueue(plist)
457 // TrashRequest consists of a block locator and it's Mtime
458 type TrashRequest struct {
459 Locator string `json:"locator"`
460 BlockMtime int64 `json:"block_mtime"`
463 // TrashHandler processes /trash requests.
464 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
465 // Reject unauthorized requests.
466 if !IsSystemAuth(GetAPIToken(req)) {
467 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
471 // Parse the request body.
472 var trash []TrashRequest
473 r := json.NewDecoder(req.Body)
474 if err := r.Decode(&trash); err != nil {
475 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
479 // We have a properly formatted trash list sent from the data
480 // manager. Report success and send the list to the trash work
481 // queue for further handling.
482 resp.WriteHeader(http.StatusOK)
484 fmt.Sprintf("Received %d trash requests\n", len(trash))))
487 for _, t := range trash {
490 trashq.ReplaceQueue(tlist)
493 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
494 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
495 // Reject unauthorized requests.
496 if !IsSystemAuth(GetAPIToken(req)) {
497 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
501 hash := mux.Vars(req)["hash"]
503 if len(KeepVM.AllWritable()) == 0 {
504 http.Error(resp, "No writable volumes", http.StatusNotFound)
508 var untrashedOn, failedOn []string
510 for _, vol := range KeepVM.AllWritable() {
511 err := vol.Untrash(hash)
513 if os.IsNotExist(err) {
515 } else if err != nil {
516 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
517 failedOn = append(failedOn, vol.String())
519 log.Printf("Untrashed %v on volume %v", hash, vol.String())
520 untrashedOn = append(untrashedOn, vol.String())
524 if numNotFound == len(KeepVM.AllWritable()) {
525 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
529 if len(failedOn) == len(KeepVM.AllWritable()) {
530 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
532 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
533 if len(failedOn) > 0 {
534 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
536 resp.Write([]byte(respBody))
540 // GetBlock and PutBlock implement lower-level code for handling
541 // blocks by rooting through volumes connected to the local machine.
542 // Once the handler has determined that system policy permits the
543 // request, it calls these methods to perform the actual operation.
545 // TODO(twp): this code would probably be better located in the
546 // VolumeManager interface. As an abstraction, the VolumeManager
547 // should be the only part of the code that cares about which volume a
548 // block is stored on, so it should be responsible for figuring out
549 // which volume to check for fetching blocks, storing blocks, etc.
551 // GetBlock fetches the block identified by "hash" into the provided
552 // buf, and returns the data size.
554 // If the block cannot be found on any volume, returns NotFoundError.
556 // If the block found does not have the correct MD5 hash, returns
559 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
560 // Attempt to read the requested hash from a keep volume.
561 errorToCaller := NotFoundError
563 for _, vol := range KeepVM.AllReadable() {
564 size, err := vol.Get(ctx, hash, buf)
571 // IsNotExist is an expected error and may be
572 // ignored. All other errors are logged. In
573 // any case we continue trying to read other
574 // volumes. If all volumes report IsNotExist,
575 // we return a NotFoundError.
576 if !os.IsNotExist(err) {
577 log.Printf("%s: Get(%s): %s", vol, hash, err)
581 // Check the file checksum.
583 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
584 if filehash != hash {
585 // TODO: Try harder to tell a sysadmin about
587 log.Printf("%s: checksum mismatch for request %s (actual %s)",
589 errorToCaller = DiskHashError
592 if errorToCaller == DiskHashError {
593 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
598 return 0, errorToCaller
601 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
603 // PutBlock(block, hash)
604 // Stores the BLOCK (identified by the content id HASH) in Keep.
606 // The MD5 checksum of the block must be identical to the content id HASH.
607 // If not, an error is returned.
609 // PutBlock stores the BLOCK on the first Keep volume with free space.
610 // A failure code is returned to the user only if all volumes fail.
612 // On success, PutBlock returns nil.
613 // On failure, it returns a KeepError with one of the following codes:
616 // A different block with the same hash already exists on this
619 // The MD5 hash of the BLOCK does not match the argument HASH.
621 // There was not enough space left in any Keep volume to store
624 // The object could not be stored for some other reason (e.g.
625 // all writes failed). The text of the error message should
626 // provide as much detail as possible.
628 func PutBlock(block []byte, hash string) (int, error) {
629 // Check that BLOCK's checksum matches HASH.
630 blockhash := fmt.Sprintf("%x", md5.Sum(block))
631 if blockhash != hash {
632 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
633 return 0, RequestHashError
636 // If we already have this data, it's intact on disk, and we
637 // can update its timestamp, return success. If we have
638 // different data with the same hash, return failure.
639 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
643 // Choose a Keep volume to write to.
644 // If this volume fails, try all of the volumes in order.
645 if vol := KeepVM.NextWritable(); vol != nil {
646 if err := vol.Put(hash, block); err == nil {
647 return vol.Replication(), nil // success!
651 writables := KeepVM.AllWritable()
652 if len(writables) == 0 {
653 log.Print("No writable volumes.")
658 for _, vol := range writables {
659 err := vol.Put(hash, block)
661 return vol.Replication(), nil // success!
663 if err != FullError {
664 // The volume is not full but the
665 // write did not succeed. Report the
666 // error and continue trying.
668 log.Printf("%s: Write(%s): %s", vol, hash, err)
673 log.Print("All volumes are full.")
676 // Already logged the non-full errors.
677 return 0, GenericError
680 // CompareAndTouch returns the current replication level if one of the
681 // volumes already has the given content and it successfully updates
682 // the relevant block's modification time in order to protect it from
683 // premature garbage collection. Otherwise, it returns a non-nil
685 func CompareAndTouch(hash string, buf []byte) (int, error) {
686 var bestErr error = NotFoundError
687 for _, vol := range KeepVM.AllWritable() {
688 if err := vol.Compare(hash, buf); err == CollisionError {
689 // Stop if we have a block with same hash but
690 // different content. (It will be impossible
691 // to tell which one is wanted if we have
692 // both, so there's no point writing it even
693 // on a different volume.)
694 log.Printf("%s: Compare(%s): %s", vol, hash, err)
696 } else if os.IsNotExist(err) {
697 // Block does not exist. This is the only
698 // "normal" error: we don't log anything.
700 } else if err != nil {
701 // Couldn't open file, data is corrupt on
702 // disk, etc.: log this abnormal condition,
703 // and try the next volume.
704 log.Printf("%s: Compare(%s): %s", vol, hash, err)
707 if err := vol.Touch(hash); err != nil {
708 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
712 // Compare and Touch both worked --> done.
713 return vol.Replication(), nil
718 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
720 // IsValidLocator returns true if the specified string is a valid Keep locator.
721 // When Keep is extended to support hash types other than MD5,
722 // this should be updated to cover those as well.
724 func IsValidLocator(loc string) bool {
725 return validLocatorRe.MatchString(loc)
728 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
730 // GetAPIToken returns the OAuth2 token from the Authorization
731 // header of a HTTP request, or an empty string if no matching
733 func GetAPIToken(req *http.Request) string {
734 if auth, ok := req.Header["Authorization"]; ok {
735 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
742 // IsExpired returns true if the given Unix timestamp (expressed as a
743 // hexadecimal string) is in the past, or if timestampHex cannot be
744 // parsed as a hexadecimal string.
745 func IsExpired(timestampHex string) bool {
746 ts, err := strconv.ParseInt(timestampHex, 16, 0)
748 log.Printf("IsExpired: %s", err)
751 return time.Unix(ts, 0).Before(time.Now())
754 // CanDelete returns true if the user identified by apiToken is
755 // allowed to delete blocks.
756 func CanDelete(apiToken string) bool {
760 // Blocks may be deleted only when Keep has been configured with a
762 if IsSystemAuth(apiToken) {
765 // TODO(twp): look up apiToken with the API server
766 // return true if is_admin is true and if the token
767 // has unlimited scope
771 // IsSystemAuth returns true if the given token is allowed to perform
772 // system level actions like deleting data.
773 func IsSystemAuth(token string) bool {
774 return token != "" && token == theConfig.systemAuthToken