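3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET/HEAD /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET/HEAD /index, GET/HEAD /index/{prefix})
8 // StatusHandler (GET/HEAD /status.json)
// DeleteHandler (DELETE /locator)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
// UntrashHandler (PUT /untrash/locator)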
15 "github.com/gorilla/mux"
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
//
// Block routes key on a 32-digit lowercase hex hash. GET/HEAD also
// accept a "+hints" suffix after the hash, while PUT, DELETE and
// PUT /untrash/ take the bare hash only. The router's NotFoundHandler is
// BadRequestHandler, so unmatched requests get BadRequestError instead of
// mux's default not-found reply.
31 func MakeRESTRouter() *mux.Router {
32 rest := mux.NewRouter()
// Fetch a block, addressed by bare hash or by hash plus "+hints".
35 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37 `/{hash:[0-9a-f]{32}}+{hints}`,
38 GetBlockHandler).Methods("GET", "HEAD")
// Store a block; trash the block on all writable volumes.
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42 // List all blocks stored here. Data manager token only.
43 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44 // List blocks stored here whose hash has the given prefix
45 // (0-32 hex digits). Data manager token only.
46 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48 // Report node status: per-volume status, buffer pool, pull/trash queues, memory stats.
49 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51 // Replace the current pull queue. Data manager token only.
52 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54 // Replace the current trash queue. Data manager token only.
55 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57 // Move a block from trash back into storage on the writable volumes. Data manager token only.
58 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
60 // Any request which does not match any of these routes gets a
// BadRequestError response from BadRequestHandler.
62 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
67 // BadRequestHandler is a HandleFunc to address bad requests.
// It replies with BadRequestError's message and HTTPCode. MakeRESTRouter
// installs it as the router's NotFoundHandler, so it also answers every
// request that matches no registered route. The request itself is not
// inspected.
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
72 // GetBlockHandler is a HandleFunc to address Get block requests.
//
// When enforcePermissions is set, the whole request path after the
// leading slash (the hash plus any "+hints") is checked with
// VerifySignature against the caller's API token. A failure is reported
// with that error's KeepError HTTPCode. The block is then read by
// GetBlock into a BlockSize buffer obtained from the pool. It is written
// back as application/octet-stream with an explicit Content-Length.
73 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
74 if enforcePermissions {
75 locator := req.URL.Path[1:] // strip leading slash
// NOTE(review): the type assertion below panics if VerifySignature ever
// returns an error that is not a *KeepError. Confirm its contract.
76 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
77 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82 // TODO: Probe volumes to check whether the block _might_
83 // exist. Some volumes/types could support a quick existence
84 // check without causing other operations to suffer. If all
85 // volumes support that, and assure us the block definitely
86 // isn't here, we can return 404 now instead of waiting for a
// buffer from the pool.
89 buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
// getBufferForResponseWriter fails when the client disconnects before
// a buffer is available (ErrClientDisconnect). Report that as 503.
91 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
96 size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
// Default to 500. *KeepError results are special-cased below.
98 code := http.StatusInternalServerError
99 if err, ok := err.(*KeepError); ok {
102 http.Error(resp, err.Error(), code)
106 resp.Header().Set("Content-Length", strconv.Itoa(size))
107 resp.Header().Set("Content-Type", "application/octet-stream")
108 resp.Write(buf[:size])
111 // Get a buffer from the pool -- but give up and return a non-nil
112 // error if resp implements http.CloseNotifier and tells us that the
113 // client has disconnected before we get a buffer.
//
// bufSize is passed unchanged to bufs.Get. On the disconnect path the
// returned error is ErrClientDisconnect.
//
// NOTE(review): http.CloseNotifier is deprecated (since Go 1.11) in favor
// of the request's Context().Done(). Migrating would require passing the
// *http.Request here.
114 func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
// Stays nil when resp cannot notify. A receive from a nil channel blocks
// forever, so in that case the select below can only take the buffer.
115 var closeNotifier <-chan bool
116 if resp, ok := resp.(http.CloseNotifier); ok {
117 closeNotifier = resp.CloseNotify()
// Unbuffered: the pooled buffer is handed over only when someone
// receives it, which is why the disconnect path below keeps waiting.
120 bufReady := make(chan []byte)
122 bufReady <- bufs.Get(bufSize)
126 case buf = <-bufReady:
128 case <-closeNotifier:
130 // Even if closeNotifier happened first, we
131 // need to keep waiting for our buf so we can
132 // return it to the pool.
135 return nil, ErrClientDisconnect
139 // PutBlockHandler is a HandleFunc to address Put block requests.
//
// Before the body is read, it writes an error response if:
//   - Content-Length is unknown (SizeRequiredError),
//   - Content-Length exceeds BlockSize (TooLongError), or
//   - there is no writable volume (FullError).
// The body is then read into a pool buffer requested with size
// Content-Length and handed to PutBlock. On success the response body is
// "hash+size" followed by a newline. The locator is signed with
// SignLocator when PermissionSecret is set and the request carries an API
// token. X-Keep-Replicas-Stored reports PutBlock's replication count.
140 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
141 hash := mux.Vars(req)["hash"]
143 // Detect as many error conditions as possible before reading
144 // the body: avoid transmitting data that will not end up
145 // being written anyway.
// net/http reports ContentLength == -1 when the body length is unknown
// (e.g. chunked encoding). The buffer size must be known up front.
147 if req.ContentLength == -1 {
148 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
152 if req.ContentLength > BlockSize {
153 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
157 if len(KeepVM.AllWritable()) == 0 {
158 http.Error(resp, FullError.Error(), FullError.HTTPCode)
162 buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
164 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// io.ReadFull fills len(buf) bytes. This presumes bufs.Get returned a
// slice of exactly Content-Length bytes; confirm. A body shorter than
// that makes ReadFull fail with io.ErrUnexpectedEOF (io.EOF if empty).
// NOTE(review): that is a client error but is reported as 500.
168 _, err = io.ReadFull(req.Body, buf)
170 http.Error(resp, err.Error(), 500)
175 replication, err := PutBlock(buf, hash)
// NOTE(review): this unchecked assertion panics if PutBlock ever returns
// an error that is not a *KeepError.
179 ke := err.(*KeepError)
180 http.Error(resp, ke.Error(), ke.HTTPCode)
184 // Success; add a size hint, sign the locator if possible, and
185 // return it to the client.
186 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
187 apiToken := GetAPIToken(req)
188 if PermissionSecret != nil && apiToken != "" {
189 expiry := time.Now().Add(blobSignatureTTL)
190 returnHash = SignLocator(returnHash, apiToken, expiry)
192 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
193 resp.Write([]byte(returnHash + "\n"))
196 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Only the data manager token is accepted. Each readable volume writes
// its index for the given prefix directly to resp. prefix is "" for plain
// /index, because that route defines no prefix variable. A final empty
// line marks the end of the listing.
197 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
198 // Reject unauthorized requests.
199 if !IsDataManagerToken(GetAPIToken(req)) {
200 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
204 prefix := mux.Vars(req)["prefix"]
206 for _, vol := range KeepVM.AllReadable() {
207 if err := vol.IndexTo(prefix, resp); err != nil {
208 // The only errors returned by IndexTo are
209 // write errors returned by resp.Write(),
210 // which probably means the client has
211 // disconnected and this error will never be
212 // reported to the client -- but it will
213 // appear in our own error log.
// NOTE(review): http.Error only writes to resp. Once IndexTo has written
// output, the status line has already been sent. Nothing in this
// function logs err, so confirm the claim above.
214 http.Error(resp, err.Error(), http.StatusInternalServerError)
218 // An empty line at EOF is the only way the client can be
219 // assured the entire index was received.
220 resp.Write([]byte{'\n'})
224 // Responds to /status.json requests with the current node status,
225 // described in a JSON structure.
227 // The data given in a status.json response includes:
228 // volumes - a list of Keep volumes currently in use by this server
229 // each volume is an object with the following fields:
231 // * device_num (an integer identifying the underlying filesystem)
//
// PoolStatus reports buffer-pool usage in the status document.
// readNodeStatus fills it from bufs.Alloc(), bufs.Cap() and bufs.Len().
// The JSON tags publish the fields as BytesAllocated, BuffersMax and
// BuffersInUse.
236 type PoolStatus struct {
237 Alloc uint64 `json:"BytesAllocated"`
238 Cap int `json:"BuffersMax"`
239 Len int `json:"BuffersInUse"`
// NodeStatus is the status document that readNodeStatus fills in. It
// holds status for each readable volume, buffer-pool usage, pull and
// trash work-queue status, and Go runtime memory statistics. Only
// Volumes has a JSON tag ("volumes"); the other fields are encoded under
// their Go field names.
243 type NodeStatus struct {
244 Volumes []*VolumeStatus `json:"volumes"`
245 BufferPool PoolStatus
246 PullQueue WorkQueueStatus
247 TrashQueue WorkQueueStatus
248 Memory runtime.MemStats
// stLock presumably guards the shared NodeStatus value that StatusHandler
// refreshes and marshals. NOTE(review): confirm against its Lock/Unlock
// call sites.
252 var stLock sync.Mutex
254 // StatusHandler addresses /status.json requests.
//
// It marshals the node status st (declared outside this function) to
// JSON. If marshalling fails, it logs the error and the status value and
// replies 500.
// NOTE(review): presumably st is refreshed with readNodeStatus while
// stLock is held before it is marshalled. Confirm.
255 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
258 jstat, err := json.Marshal(&st)
263 log.Printf("json.Marshal: %s", err)
264 log.Printf("NodeStatus = %v", &st)
265 http.Error(resp, err.Error(), 500)
269 // readNodeStatus populates the given NodeStatus struct with current values.
//
// The backing array of st.Volumes is reused, and reallocated only when
// there are more readable volumes than its capacity. Volumes whose
// Status() returns nil are omitted, so len(st.Volumes) can be smaller
// than the number of readable volumes.
270 func readNodeStatus(st *NodeStatus) {
271 vols := KeepVM.AllReadable()
272 if cap(st.Volumes) < len(vols) {
273 st.Volumes = make([]*VolumeStatus, len(vols))
// Truncate but keep capacity so append below refills in place.
275 st.Volumes = st.Volumes[:0]
276 for _, vol := range vols {
277 if s := vol.Status(); s != nil {
278 st.Volumes = append(st.Volumes, s)
281 st.BufferPool.Alloc = bufs.Alloc()
282 st.BufferPool.Cap = bufs.Cap()
283 st.BufferPool.Len = bufs.Len()
284 st.PullQueue = getWorkQueueStatus(pullq)
285 st.TrashQueue = getWorkQueueStatus(trashq)
286 runtime.ReadMemStats(&st.Memory)
289 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
290 // should never happen except in test suites), it returns a zero status
291 // value instead of crashing.
292 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
294 // This should only happen during tests.
295 return WorkQueueStatus{}
300 // DeleteHandler processes DELETE requests.
302 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
303 // from all writable volumes.
305 // Only the Data Manager, or an Arvados admin with scope "all", are
306 // allowed to issue DELETE requests. If a DELETE request is not
307 // authenticated or is issued by a non-admin user, the server returns
308 // a PermissionError.
// NOTE(review): CanDelete currently recognizes only the data manager
// token. The admin/scope lookup is still a TODO there.
310 // Upon receiving a valid request from an authorized user,
311 // DeleteHandler trashes (vol.Trash) all copies of the specified block on local writable volumes.
316 // If the requested block was not found on any volume, the response
317 // code is HTTP 404 Not Found.
319 // Otherwise, the response code is 200 OK, with a response body
320 // consisting of the JSON message
322 // {"copies_deleted":d,"copies_failed":f}
324 // where d and f are integers representing the number of copies that
325 // were successfully and unsuccessfully deleted.
327 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
328 hash := mux.Vars(req)["hash"]
330 // Confirm that this user is an admin and has a token with unlimited scope.
331 var tok = GetAPIToken(req)
332 if tok == "" || !CanDelete(tok) {
333 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): presumably reached when deletion is disabled by
// configuration. Confirm the guarding condition.
338 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
342 // Delete copies of this block from all writable volumes.
343 // Report how many copies were successfully deleted, and how
344 // many were found on writable volumes but not deleted.
346 Deleted int `json:"copies_deleted"`
347 Failed int `json:"copies_failed"`
349 for _, vol := range KeepVM.AllWritable() {
350 if err := vol.Trash(hash); err == nil {
352 } else if os.IsNotExist(err) {
// Any other Trash error is logged.
356 log.Println("DeleteHandler:", err)
// Neither counter moved: treat the block as absent from this server.
362 if result.Deleted == 0 && result.Failed == 0 {
363 st = http.StatusNotFound
370 if st == http.StatusOK {
371 if body, err := json.Marshal(result); err == nil {
374 log.Printf("json.Marshal: %s (result = %v)", err, result)
375 http.Error(resp, err.Error(), 500)
380 /* PullHandler processes "PUT /pull" requests for the data manager.
381 The request body is a JSON message containing a list of pull
382 requests in the following format:
386 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
388 "keep0.qr1hi.arvadosapi.com:25107",
389 "keep1.qr1hi.arvadosapi.com:25108"
393 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
403 Each pull request in the list consists of a block locator string
404 and an ordered list of servers. Keepstore should try to fetch the
405 block from each server in turn.
407 If the request has not been sent by the Data Manager, return 401 Unauthorized.
410 If the JSON unmarshalling fails, return 400 Bad Request.
413 // PullRequest consists of a block locator and an ordered list of servers
// from which keepstore should try to fetch the block, in turn.
414 type PullRequest struct {
415 Locator string `json:"locator"`
416 Servers []string `json:"servers"`
419 // PullHandler processes "PUT /pull" requests for the data manager.
//
// Callers without the data manager token get UnauthorizedError. A body
// that fails to decode as a JSON list of pull requests gets the decoder's
// error with BadRequestError's status code. Otherwise the handler replies
// 200 with "Received N pull requests" and then replaces the whole pull
// queue (pullq) with the new list.
420 func PullHandler(resp http.ResponseWriter, req *http.Request) {
421 // Reject unauthorized requests.
422 if !IsDataManagerToken(GetAPIToken(req)) {
423 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
427 // Parse the request body.
// json.Decoder.Decode reads a single JSON value. Anything after it in
// the body is ignored.
429 r := json.NewDecoder(req.Body)
430 if err := r.Decode(&pr); err != nil {
431 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
435 // We have a properly formatted pull list sent from the data
436 // manager. Report success and send the list to the pull list
437 // manager for further handling.
// The 200 response is sent before the queue is replaced.
438 resp.WriteHeader(http.StatusOK)
440 fmt.Sprintf("Received %d pull requests\n", len(pr))))
443 for _, p := range pr {
446 pullq.ReplaceQueue(plist)
449 // TrashRequest consists of a block locator and its Mtime
// (block_mtime). NOTE(review): presumably a Unix timestamp. Confirm the
// units with the data manager that produces these requests.
450 type TrashRequest struct {
451 Locator string `json:"locator"`
452 BlockMtime int64 `json:"block_mtime"`
455 // TrashHandler processes /trash requests.
//
// Like PullHandler, it accepts only the data manager token. It decodes
// the body as a JSON list of TrashRequest and replies 200 with
// "Received N trash requests". It then replaces the whole trash queue
// (trashq) with the new list.
456 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
457 // Reject unauthorized requests.
458 if !IsDataManagerToken(GetAPIToken(req)) {
459 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
463 // Parse the request body.
464 var trash []TrashRequest
// Decode reads a single JSON value. Trailing body data is ignored.
465 r := json.NewDecoder(req.Body)
466 if err := r.Decode(&trash); err != nil {
467 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
471 // We have a properly formatted trash list sent from the data
472 // manager. Report success and send the list to the trash work
473 // queue for further handling.
// The 200 response is sent before the queue is replaced.
474 resp.WriteHeader(http.StatusOK)
476 fmt.Sprintf("Received %d trash requests\n", len(trash))))
479 for _, t := range trash {
482 trashq.ReplaceQueue(tlist)
485 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// It calls Untrash on every writable volume. The responses are:
//   - 404 if there are no writable volumes, or (presumably) if every
//     writable volume reports os.IsNotExist;
//   - 500 if Untrash failed on every writable volume;
//   - otherwise, a plain-text summary naming the volumes where untrash
//     succeeded and, if any, those where it failed.
486 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
487 // Reject unauthorized requests.
488 if !IsDataManagerToken(GetAPIToken(req)) {
489 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
493 hash := mux.Vars(req)["hash"]
495 if len(KeepVM.AllWritable()) == 0 {
496 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Volume names (vol.String()) where Untrash succeeded / failed.
500 var untrashedOn, failedOn []string
// NOTE(review): numNotFound's declaration and increment are not shown.
// It presumably counts the os.IsNotExist results below.
502 for _, vol := range KeepVM.AllWritable() {
503 err := vol.Untrash(hash)
505 if os.IsNotExist(err) {
507 } else if err != nil {
// NOTE(review): err itself is not included in this log line.
508 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
509 failedOn = append(failedOn, vol.String())
511 log.Printf("Untrashed %v on volume %v", hash, vol.String())
512 untrashedOn = append(untrashedOn, vol.String())
// NOTE(review): KeepVM.AllWritable() is re-evaluated for each check, so
// the counts can disagree if the volume set changes mid-request. A mix of
// not-found and failed volumes with no successes appears to reach the
// summary below with an empty success list.
516 if numNotFound == len(KeepVM.AllWritable()) {
517 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
521 if len(failedOn) == len(KeepVM.AllWritable()) {
522 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
524 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
525 if len(failedOn) > 0 {
526 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
528 resp.Write([]byte(respBody))
532 // GetBlock and PutBlock implement lower-level code for handling
533 // blocks by rooting through volumes connected to the local machine.
534 // Once the handler has determined that system policy permits the
535 // request, it calls these methods to perform the actual operation.
537 // TODO(twp): this code would probably be better located in the
538 // VolumeManager interface. As an abstraction, the VolumeManager
539 // should be the only part of the code that cares about which volume a
540 // block is stored on, so it should be responsible for figuring out
541 // which volume to check for fetching blocks, storing blocks, etc.
543 // GetBlock fetches the block identified by "hash" into the provided
544 // buf, and returns the data size.
//
// Readable volumes are tried in KeepVM.AllReadable() order. A copy whose
// MD5 does not match hash is logged and passed over in favor of a later
// volume.
546 // If the block cannot be found on any volume, returns NotFoundError.
548 // If the block found does not have the correct MD5 hash, returns
// DiskHashError (only when no other volume supplied a good copy).
// NOTE(review): resp is not referenced in the lines shown here.
551 func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
552 // Attempt to read the requested hash from a keep volume.
553 errorToCaller := NotFoundError
555 for _, vol := range KeepVM.AllReadable() {
556 size, err := vol.Get(hash, buf)
558 // IsNotExist is an expected error and may be
559 // ignored. All other errors are logged. In
560 // any case we continue trying to read other
561 // volumes. If all volumes report IsNotExist,
562 // we return a NotFoundError.
563 if !os.IsNotExist(err) {
564 log.Printf("%s: Get(%s): %s", vol, hash, err)
568 // Check the file checksum.
570 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
571 if filehash != hash {
572 // TODO: Try harder to tell a sysadmin about
// this.
574 log.Printf("%s: checksum mismatch for request %s (actual %s)",
576 errorToCaller = DiskHashError
// This volume supplied a good copy. If an earlier volume had a bad one,
// record that the bad copy was bypassed.
579 if errorToCaller == DiskHashError {
580 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
585 return 0, errorToCaller
588 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
590 //
591 // Call as: replication, err := PutBlock(block, hash)
593 // The MD5 checksum of the block must be identical to the content id HASH.
594 // If not, RequestHashError is returned.
596 // If a writable volume already holds an identical copy (CompareAndTouch),
// no new copy is written. Otherwise PutBlock tries KeepVM.NextWritable()
// first, then every writable volume in order.
597 // A failure code is returned to the user only if all volumes fail.
599 // On success, PutBlock returns the replication level reported by the
// volume (vol.Replication()) and a nil error.
600 // On failure, it returns a KeepError, one of the following:
//   CollisionError:
603 // A different block with the same hash already exists on this
// server.
//   RequestHashError:
606 // The MD5 hash of the BLOCK does not match the argument HASH.
//   FullError (presumably; NOTE(review): confirm the return):
608 // There was not enough space left in any Keep volume to store
// the object.
//   GenericError:
611 // The object could not be stored for some other reason (e.g.
612 // all writes failed). The text of the error message should
613 // provide as much detail as possible.
615 func PutBlock(block []byte, hash string) (int, error) {
616 // Check that BLOCK's checksum matches HASH.
617 blockhash := fmt.Sprintf("%x", md5.Sum(block))
618 if blockhash != hash {
619 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
620 return 0, RequestHashError
623 // If we already have this data, it's intact on disk, and we
624 // can update its timestamp, return success. If we have
625 // different data with the same hash, return failure.
626 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
630 // Choose a Keep volume to write to.
631 // If this volume fails, try all of the volumes in order.
632 if vol := KeepVM.NextWritable(); vol != nil {
633 if err := vol.Put(hash, block); err == nil {
634 return vol.Replication(), nil // success!
// NOTE(review): the fallback loop below presumably includes the volume
// NextWritable just picked, so a failed volume gets a second attempt.
638 writables := KeepVM.AllWritable()
639 if len(writables) == 0 {
640 log.Print("No writable volumes.")
645 for _, vol := range writables {
646 err := vol.Put(hash, block)
648 return vol.Replication(), nil // success!
650 if err != FullError {
651 // The volume is not full but the
652 // write did not succeed. Report the
653 // error and continue trying.
655 log.Printf("%s: Write(%s): %s", vol, hash, err)
660 log.Print("All volumes are full.")
663 // Already logged the non-full errors.
664 return 0, GenericError
667 // CompareAndTouch returns the current replication level if one of the
668 // volumes already has the given content and it successfully updates
669 // the relevant block's modification time in order to protect it from
670 // premature garbage collection. Otherwise, it returns a non-nil
// error. Only writable volumes are checked. The first volume where both
// Compare and Touch succeed determines the result.
672 func CompareAndTouch(hash string, buf []byte) (int, error) {
// Declared with the interface type error so it can hold errors of any
// concrete type. NotFoundError is presumably what the caller sees when
// no volume holds the block.
673 var bestErr error = NotFoundError
674 for _, vol := range KeepVM.AllWritable() {
675 if err := vol.Compare(hash, buf); err == CollisionError {
676 // Stop if we have a block with same hash but
677 // different content. (It will be impossible
678 // to tell which one is wanted if we have
679 // both, so there's no point writing it even
680 // on a different volume.)
681 log.Printf("%s: Compare(%s): %s", vol, hash, err)
683 } else if os.IsNotExist(err) {
684 // Block does not exist. This is the only
685 // "normal" error: we don't log anything.
687 } else if err != nil {
688 // Couldn't open file, data is corrupt on
689 // disk, etc.: log this abnormal condition,
690 // and try the next volume.
691 log.Printf("%s: Compare(%s): %s", vol, hash, err)
694 if err := vol.Touch(hash); err != nil {
695 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
699 // Compare and Touch both worked --> done.
700 return vol.Replication(), nil
// validLocatorRe matches exactly a bare 32-digit lowercase hex string.
705 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
707 // IsValidLocator returns true if the specified string is a valid Keep locator.
// Only a bare lowercase hex MD5 hash is accepted. Locators carrying a
// "+size" or other "+hint" suffix, or uppercase hex digits, do not match.
708 // When Keep is extended to support hash types other than MD5,
709 // this should be updated to cover those as well.
711 func IsValidLocator(loc string) bool {
712 return validLocatorRe.MatchString(loc)
// authRe captures everything after "OAuth2" and at least one whitespace
// character in an Authorization header value.
715 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
717 // GetAPIToken returns the OAuth2 token from the Authorization
718 // header of an HTTP request, or an empty string if no matching token is found.
// Only the first Authorization header value is examined. The header map
// is indexed directly, which relies on net/http storing incoming header
// keys in canonical form ("Authorization").
// NOTE(review): auth[0] panics if the key is present with an empty value
// slice. This can only happen for requests built programmatically.
720 func GetAPIToken(req *http.Request) string {
721 if auth, ok := req.Header["Authorization"]; ok {
722 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
729 // IsExpired returns true if the given Unix timestamp (expressed as a
730 // hexadecimal string) is in the past, or if timestampHex cannot be
731 // parsed as a hexadecimal string.
// The timestamp is in seconds since the epoch (see time.Unix below).
732 func IsExpired(timestampHex string) bool {
// bitSize 0: the parsed value must fit in an int.
733 ts, err := strconv.ParseInt(timestampHex, 16, 0)
735 log.Printf("IsExpired: %s", err)
// Before is strict, so a timestamp equal to the current second is not
// yet expired.
738 return time.Unix(ts, 0).Before(time.Now())
741 // CanDelete returns true if the user identified by apiToken is
742 // allowed to delete blocks.
// NOTE(review): as written, apparently only the data manager token is
// accepted. The admin/scope lookup in the TODO below is unimplemented.
743 func CanDelete(apiToken string) bool {
747 // Blocks may be deleted only when Keep has been configured with a
// data manager token.
749 if IsDataManagerToken(apiToken) {
752 // TODO(twp): look up apiToken with the API server
753 // return true if is_admin is true and if the token
754 // has unlimited scope
758 // IsDataManagerToken returns true if apiToken represents the data
// manager token. When dataManagerToken is unset (""), no token matches,
// not even an empty one.
// NOTE(review): == is not constant-time. crypto/subtle.ConstantTimeCompare
// would avoid leaking token prefixes through response timing.
760 func IsDataManagerToken(apiToken string) bool {
761 return dataManagerToken != "" && apiToken == dataManagerToken