3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
27 "git.curoverse.com/arvados.git/sdk/go/httpserver"
28 log "github.com/Sirupsen/logrus"
33 limiter httpserver.RequestCounter
36 // MakeRESTRouter returns a new router that forwards all Keep requests
37 // to the appropriate handlers.
38 func MakeRESTRouter() *router {
39 rest := mux.NewRouter()
40 rtr := &router{Router: rest}
43 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
45 `/{hash:[0-9a-f]{32}}+{hints}`,
46 GetBlockHandler).Methods("GET", "HEAD")
48 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
49 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
50 // List all blocks stored here. Privileged client only.
51 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
52 // List blocks stored here whose hash has the given prefix.
53 // Privileged client only.
54 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
56 // Internals/debugging info (runtime.MemStats)
57 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
59 // List volumes: path, device number, bytes used/avail.
60 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
62 // Replace the current pull queue.
63 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
65 // Replace the current trash queue.
66 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
68 // Untrash moves blocks from trash back into store
69 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
71 // Any request which does not match any of these routes gets
73 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
78 // BadRequestHandler is a HandleFunc to address bad requests.
79 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
80 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
83 // GetBlockHandler is a HandleFunc to address Get block requests.
84 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
85 ctx, cancel := contextForResponse(context.TODO(), resp)
88 if theConfig.RequireSignatures {
89 locator := req.URL.Path[1:] // strip leading slash
90 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
91 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
96 // TODO: Probe volumes to check whether the block _might_
97 // exist. Some volumes/types could support a quick existence
98 // check without causing other operations to suffer. If all
99 // volumes support that, and assure us the block definitely
100 // isn't here, we can return 404 now instead of waiting for a
103 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
105 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
110 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
112 code := http.StatusInternalServerError
113 if err, ok := err.(*KeepError); ok {
116 http.Error(resp, err.Error(), code)
120 resp.Header().Set("Content-Length", strconv.Itoa(size))
121 resp.Header().Set("Content-Type", "application/octet-stream")
122 resp.Write(buf[:size])
125 // Return a new context that gets cancelled by resp's CloseNotifier.
126 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
127 ctx, cancel := context.WithCancel(parent)
128 if cn, ok := resp.(http.CloseNotifier); ok {
129 go func(c <-chan bool) {
132 theConfig.debugLogf("cancel context")
141 // Get a buffer from the pool -- but give up and return a non-nil
142 // error if ctx ends before we get a buffer.
143 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
144 bufReady := make(chan []byte)
146 bufReady <- bufs.Get(bufSize)
149 case buf := <-bufReady:
153 // Even if closeNotifier happened first, we
154 // need to keep waiting for our buf so we can
155 // return it to the pool.
158 return nil, ErrClientDisconnect
162 // PutBlockHandler is a HandleFunc to address Put block requests.
163 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
164 ctx, cancel := contextForResponse(context.TODO(), resp)
167 hash := mux.Vars(req)["hash"]
169 // Detect as many error conditions as possible before reading
170 // the body: avoid transmitting data that will not end up
171 // being written anyway.
173 if req.ContentLength == -1 {
174 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
178 if req.ContentLength > BlockSize {
179 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
183 if len(KeepVM.AllWritable()) == 0 {
184 http.Error(resp, FullError.Error(), FullError.HTTPCode)
188 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
190 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
194 _, err = io.ReadFull(req.Body, buf)
196 http.Error(resp, err.Error(), 500)
201 replication, err := PutBlock(ctx, buf, hash)
205 code := http.StatusInternalServerError
206 if err, ok := err.(*KeepError); ok {
209 http.Error(resp, err.Error(), code)
213 // Success; add a size hint, sign the locator if possible, and
214 // return it to the client.
215 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
216 apiToken := GetAPIToken(req)
217 if theConfig.blobSigningKey != nil && apiToken != "" {
218 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
219 returnHash = SignLocator(returnHash, apiToken, expiry)
221 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
222 resp.Write([]byte(returnHash + "\n"))
225 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
226 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
227 // Reject unauthorized requests.
228 if !IsSystemAuth(GetAPIToken(req)) {
229 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
233 prefix := mux.Vars(req)["prefix"]
235 for _, vol := range KeepVM.AllReadable() {
236 if err := vol.IndexTo(prefix, resp); err != nil {
237 // The only errors returned by IndexTo are
238 // write errors returned by resp.Write(),
239 // which probably means the client has
240 // disconnected and this error will never be
241 // reported to the client -- but it will
242 // appear in our own error log.
243 http.Error(resp, err.Error(), http.StatusInternalServerError)
247 // An empty line at EOF is the only way the client can be
248 // assured the entire index was received.
249 resp.Write([]byte{'\n'})
253 type PoolStatus struct {
254 Alloc uint64 `json:"BytesAllocated"`
255 Cap int `json:"BuffersMax"`
256 Len int `json:"BuffersInUse"`
259 type volumeStatusEnt struct {
261 Status *VolumeStatus `json:",omitempty"`
262 VolumeStats *ioStats `json:",omitempty"`
263 InternalStats interface{} `json:",omitempty"`
267 type NodeStatus struct {
268 Volumes []*volumeStatusEnt
269 BufferPool PoolStatus
270 PullQueue WorkQueueStatus
271 TrashQueue WorkQueueStatus
277 var stLock sync.Mutex
279 // DebugHandler addresses /debug.json requests.
280 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
281 type debugStats struct {
282 MemStats runtime.MemStats
285 runtime.ReadMemStats(&ds.MemStats)
286 err := json.NewEncoder(resp).Encode(&ds)
288 http.Error(resp, err.Error(), 500)
292 // StatusHandler addresses /status.json requests.
293 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
295 rtr.readNodeStatus(&st)
296 jstat, err := json.Marshal(&st)
301 log.Printf("json.Marshal: %s", err)
302 log.Printf("NodeStatus = %v", &st)
303 http.Error(resp, err.Error(), 500)
307 // populate the given NodeStatus struct with current values.
308 func (rtr *router) readNodeStatus(st *NodeStatus) {
309 vols := KeepVM.AllReadable()
310 if cap(st.Volumes) < len(vols) {
311 st.Volumes = make([]*volumeStatusEnt, len(vols))
313 st.Volumes = st.Volumes[:0]
314 for _, vol := range vols {
315 var internalStats interface{}
316 if vol, ok := vol.(InternalStatser); ok {
317 internalStats = vol.InternalStats()
319 st.Volumes = append(st.Volumes, &volumeStatusEnt{
321 Status: vol.Status(),
322 InternalStats: internalStats,
323 //VolumeStats: KeepVM.VolumeStats(vol),
326 st.BufferPool.Alloc = bufs.Alloc()
327 st.BufferPool.Cap = bufs.Cap()
328 st.BufferPool.Len = bufs.Len()
329 st.PullQueue = getWorkQueueStatus(pullq)
330 st.TrashQueue = getWorkQueueStatus(trashq)
331 if rtr.limiter != nil {
332 st.RequestsCurrent = rtr.limiter.Current()
333 st.RequestsMax = rtr.limiter.Max()
337 // return a WorkQueueStatus for the given queue. If q is nil (which
338 // should never happen except in test suites), return a zero status
339 // value instead of crashing.
340 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
342 // This should only happen during tests.
343 return WorkQueueStatus{}
348 // DeleteHandler processes DELETE requests.
350 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
351 // from all connected volumes.
353 // Only the Data Manager, or an Arvados admin with scope "all", are
354 // allowed to issue DELETE requests. If a DELETE request is not
355 // authenticated or is issued by a non-admin user, the server returns
356 // a PermissionError.
358 // Upon receiving a valid request from an authorized user,
359 // DeleteHandler deletes all copies of the specified block on local
364 // If the requested block was not found on any volume, the response
365 // code is HTTP 404 Not Found.
367 // Otherwise, the response code is 200 OK, with a response body
368 // consisting of the JSON message
370 // {"copies_deleted":d,"copies_failed":f}
372 // where d and f are integers representing the number of blocks that
373 // were successfully and unsuccessfully deleted.
375 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
376 hash := mux.Vars(req)["hash"]
378 // Confirm that this user is an admin and has a token with unlimited scope.
379 var tok = GetAPIToken(req)
380 if tok == "" || !CanDelete(tok) {
381 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
385 if !theConfig.EnableDelete {
386 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
390 // Delete copies of this block from all available volumes.
391 // Report how many blocks were successfully deleted, and how
392 // many were found on writable volumes but not deleted.
394 Deleted int `json:"copies_deleted"`
395 Failed int `json:"copies_failed"`
397 for _, vol := range KeepVM.AllWritable() {
398 if err := vol.Trash(hash); err == nil {
400 } else if os.IsNotExist(err) {
404 log.Println("DeleteHandler:", err)
410 if result.Deleted == 0 && result.Failed == 0 {
411 st = http.StatusNotFound
418 if st == http.StatusOK {
419 if body, err := json.Marshal(result); err == nil {
422 log.Printf("json.Marshal: %s (result = %v)", err, result)
423 http.Error(resp, err.Error(), 500)
428 /* PullHandler processes "PUT /pull" requests for the data manager.
429 The request body is a JSON message containing a list of pull
430 requests in the following format:
434 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
436 "keep0.qr1hi.arvadosapi.com:25107",
437 "keep1.qr1hi.arvadosapi.com:25108"
441 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
451 Each pull request in the list consists of a block locator string
452 and an ordered list of servers. Keepstore should try to fetch the
453 block from each server in turn.
455 If the request has not been sent by the Data Manager, return 401
458 If the JSON unmarshalling fails, return 400 Bad Request.
461 // PullRequest consists of a block locator and an ordered list of servers
462 type PullRequest struct {
463 Locator string `json:"locator"`
464 Servers []string `json:"servers"`
467 // PullHandler processes "PUT /pull" requests for the data manager.
468 func PullHandler(resp http.ResponseWriter, req *http.Request) {
469 // Reject unauthorized requests.
470 if !IsSystemAuth(GetAPIToken(req)) {
471 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
475 // Parse the request body.
477 r := json.NewDecoder(req.Body)
478 if err := r.Decode(&pr); err != nil {
479 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
483 // We have a properly formatted pull list sent from the data
484 // manager. Report success and send the list to the pull list
485 // manager for further handling.
486 resp.WriteHeader(http.StatusOK)
488 fmt.Sprintf("Received %d pull requests\n", len(pr))))
491 for _, p := range pr {
494 pullq.ReplaceQueue(plist)
497 // TrashRequest consists of a block locator and its Mtime
498 type TrashRequest struct {
499 Locator string `json:"locator"`
500 BlockMtime int64 `json:"block_mtime"`
503 // TrashHandler processes /trash requests.
504 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
505 // Reject unauthorized requests.
506 if !IsSystemAuth(GetAPIToken(req)) {
507 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
511 // Parse the request body.
512 var trash []TrashRequest
513 r := json.NewDecoder(req.Body)
514 if err := r.Decode(&trash); err != nil {
515 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
519 // We have a properly formatted trash list sent from the data
520 // manager. Report success and send the list to the trash work
521 // queue for further handling.
522 resp.WriteHeader(http.StatusOK)
524 fmt.Sprintf("Received %d trash requests\n", len(trash))))
527 for _, t := range trash {
530 trashq.ReplaceQueue(tlist)
533 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
534 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
535 // Reject unauthorized requests.
536 if !IsSystemAuth(GetAPIToken(req)) {
537 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
541 hash := mux.Vars(req)["hash"]
543 if len(KeepVM.AllWritable()) == 0 {
544 http.Error(resp, "No writable volumes", http.StatusNotFound)
548 var untrashedOn, failedOn []string
550 for _, vol := range KeepVM.AllWritable() {
551 err := vol.Untrash(hash)
553 if os.IsNotExist(err) {
555 } else if err != nil {
556 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
557 failedOn = append(failedOn, vol.String())
559 log.Printf("Untrashed %v on volume %v", hash, vol.String())
560 untrashedOn = append(untrashedOn, vol.String())
564 if numNotFound == len(KeepVM.AllWritable()) {
565 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
569 if len(failedOn) == len(KeepVM.AllWritable()) {
570 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
572 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
573 if len(failedOn) > 0 {
574 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
576 resp.Write([]byte(respBody))
580 // GetBlock and PutBlock implement lower-level code for handling
581 // blocks by rooting through volumes connected to the local machine.
582 // Once the handler has determined that system policy permits the
583 // request, it calls these methods to perform the actual operation.
585 // TODO(twp): this code would probably be better located in the
586 // VolumeManager interface. As an abstraction, the VolumeManager
587 // should be the only part of the code that cares about which volume a
588 // block is stored on, so it should be responsible for figuring out
589 // which volume to check for fetching blocks, storing blocks, etc.
591 // GetBlock fetches the block identified by "hash" into the provided
592 // buf, and returns the data size.
594 // If the block cannot be found on any volume, returns NotFoundError.
596 // If the block found does not have the correct MD5 hash, returns
599 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
600 // Attempt to read the requested hash from a keep volume.
601 errorToCaller := NotFoundError
603 for _, vol := range KeepVM.AllReadable() {
604 size, err := vol.Get(ctx, hash, buf)
607 return 0, ErrClientDisconnect
611 // IsNotExist is an expected error and may be
612 // ignored. All other errors are logged. In
613 // any case we continue trying to read other
614 // volumes. If all volumes report IsNotExist,
615 // we return a NotFoundError.
616 if !os.IsNotExist(err) {
617 log.Printf("%s: Get(%s): %s", vol, hash, err)
621 // Check the file checksum.
623 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
624 if filehash != hash {
625 // TODO: Try harder to tell a sysadmin about
627 log.Printf("%s: checksum mismatch for request %s (actual %s)",
629 errorToCaller = DiskHashError
632 if errorToCaller == DiskHashError {
633 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
638 return 0, errorToCaller
641 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
643 // PutBlock(ctx, block, hash)
644 // Stores the BLOCK (identified by the content id HASH) in Keep.
646 // The MD5 checksum of the block must be identical to the content id HASH.
647 // If not, an error is returned.
649 // PutBlock stores the BLOCK on the first Keep volume with free space.
650 // A failure code is returned to the user only if all volumes fail.
652 // On success, PutBlock returns nil.
653 // On failure, it returns a KeepError with one of the following codes:
656 // A different block with the same hash already exists on this
659 // The MD5 hash of the BLOCK does not match the argument HASH.
661 // There was not enough space left in any Keep volume to store
664 // The object could not be stored for some other reason (e.g.
665 // all writes failed). The text of the error message should
666 // provide as much detail as possible.
668 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
669 // Check that BLOCK's checksum matches HASH.
670 blockhash := fmt.Sprintf("%x", md5.Sum(block))
671 if blockhash != hash {
672 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
673 return 0, RequestHashError
676 // If we already have this data, it's intact on disk, and we
677 // can update its timestamp, return success. If we have
678 // different data with the same hash, return failure.
679 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
681 } else if ctx.Err() != nil {
682 return 0, ErrClientDisconnect
685 // Choose a Keep volume to write to.
686 // If this volume fails, try all of the volumes in order.
687 if vol := KeepVM.NextWritable(); vol != nil {
688 if err := vol.Put(ctx, hash, block); err == nil {
689 return vol.Replication(), nil // success!
691 if ctx.Err() != nil {
692 return 0, ErrClientDisconnect
696 writables := KeepVM.AllWritable()
697 if len(writables) == 0 {
698 log.Print("No writable volumes.")
703 for _, vol := range writables {
704 err := vol.Put(ctx, hash, block)
705 if ctx.Err() != nil {
706 return 0, ErrClientDisconnect
709 return vol.Replication(), nil // success!
711 if err != FullError {
712 // The volume is not full but the
713 // write did not succeed. Report the
714 // error and continue trying.
716 log.Printf("%s: Write(%s): %s", vol, hash, err)
721 log.Print("All volumes are full.")
724 // Already logged the non-full errors.
725 return 0, GenericError
728 // CompareAndTouch returns the current replication level if one of the
729 // volumes already has the given content and it successfully updates
730 // the relevant block's modification time in order to protect it from
731 // premature garbage collection. Otherwise, it returns a non-nil
733 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
734 var bestErr error = NotFoundError
735 for _, vol := range KeepVM.AllWritable() {
736 err := vol.Compare(ctx, hash, buf)
737 if ctx.Err() != nil {
739 } else if err == CollisionError {
740 // Stop if we have a block with same hash but
741 // different content. (It will be impossible
742 // to tell which one is wanted if we have
743 // both, so there's no point writing it even
744 // on a different volume.)
745 log.Printf("%s: Compare(%s): %s", vol, hash, err)
747 } else if os.IsNotExist(err) {
748 // Block does not exist. This is the only
749 // "normal" error: we don't log anything.
751 } else if err != nil {
752 // Couldn't open file, data is corrupt on
753 // disk, etc.: log this abnormal condition,
754 // and try the next volume.
755 log.Printf("%s: Compare(%s): %s", vol, hash, err)
758 if err := vol.Touch(hash); err != nil {
759 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
763 // Compare and Touch both worked --> done.
764 return vol.Replication(), nil
// validLocatorRe matches a bare locator: exactly 32 lowercase hex digits
// (an MD5 digest) with nothing before or after.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	matched := validLocatorRe.MatchString(loc)
	return matched
}
// authRe captures everything after the "OAuth2" scheme and the whitespace
// that follows it in an Authorization header value.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if the header is
// absent, has no values, or does not use the OAuth2 scheme.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" both when the header is missing and when it is
	// present with an empty value list, so no slice is indexed directly.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[1]
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string.
//
// The value is parsed as a 64-bit integer so that timestamps beyond
// January 2038 are treated the same on 32-bit and 64-bit builds.
func IsExpired(timestampHex string) bool {
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		// An unparseable timestamp is treated as already expired.
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
805 // CanDelete returns true if the user identified by apiToken is
806 // allowed to delete blocks.
807 func CanDelete(apiToken string) bool {
811 // Blocks may be deleted only when Keep has been configured with a
813 if IsSystemAuth(apiToken) {
816 // TODO(twp): look up apiToken with the API server
817 // return true if is_admin is true and if the token
818 // has unlimited scope
822 // IsSystemAuth returns true if the given token is allowed to perform
823 // system level actions like deleting data.
824 func IsSystemAuth(token string) bool {
825 return token != "" && token == theConfig.systemAuthToken