3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
15 "github.com/gorilla/mux"
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
30 func MakeRESTRouter() *mux.Router {
31 rest := mux.NewRouter()
34 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36 `/{hash:[0-9a-f]{32}}+{hints}`,
37 GetBlockHandler).Methods("GET", "HEAD")
39 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41 // List all blocks stored here. Privileged client only.
42 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43 // List blocks stored here whose hash has the given prefix.
44 // Privileged client only.
45 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47 // List volumes: path, device number, bytes used/avail.
48 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50 // Replace the current pull queue.
51 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53 // Replace the current trash queue.
54 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56 // Undelete moves blocks from trash back into store
57 rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
59 // Any request which does not match any of these routes gets
61 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
66 // BadRequestHandler is a HandleFunc to address bad requests.
67 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
68 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
71 // GetBlockHandler is a HandleFunc to address Get block requests.
// It serves GET and HEAD requests for a single block, addressed either
// as /{hash} or as /{hash}+{hints}.
//
// When enforcePermissions is set, the request path minus its leading
// slash (hash plus any hints) must pass VerifySignature for the caller's
// API token; on failure the KeepError's HTTP status is returned. The data
// itself comes from GetBlock, and any GetBlock error is reported with its
// KeepError HTTP status.
// NOTE(review): the statements that send the block body and return the
// buffer to bufs are not visible in this excerpt — confirm both happen.
72 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
73 if enforcePermissions {
74 locator := req.URL.Path[1:] // strip leading slash
75 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
// NOTE(review): assumes VerifySignature only ever returns *KeepError;
// any other error type would make this assertion panic.
76 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// The lookup uses the {hash} route variable alone; any +hints are not
// passed to GetBlock.
81 block, err := GetBlock(mux.Vars(req)["hash"])
83 // This type assertion is safe because the only errors
84 // GetBlock can return are DiskHashError or NotFoundError.
85 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// Advertise the exact length of the data that was read.
90 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
91 resp.Header().Set("Content-Type", "application/octet-stream")
95 // PutBlockHandler is a HandleFunc to address Put block requests.
// It stores the request body as the block named by the {hash} route
// variable.
//
// Cheap checks run before the body is read: an unknown Content-Length
// (SizeRequiredError), a body longer than BlockSize (TooLongError), and
// the absence of any writable volume (FullError). The body is then read
// into a buffer from bufs and passed to PutBlock. On success the response
// body is the locator "hash+size" plus a newline. The locator is signed
// with SignLocator when PermissionSecret is set and the caller sent an
// API token. The X-Keep-Replicas-Stored header carries the replication
// level that PutBlock reported.
96 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
97 hash := mux.Vars(req)["hash"]
99 // Detect as many error conditions as possible before reading
100 // the body: avoid transmitting data that will not end up
101 // being written anyway.
// net/http reports an unknown body length as -1 (e.g. chunked
// transfer encoding). The buffer below must be sized up front, so such
// requests are refused.
103 if req.ContentLength == -1 {
104 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
108 if req.ContentLength > BlockSize {
109 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
113 if len(KeepVM.AllWritable()) == 0 {
114 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// io.ReadFull either fills buf completely or returns an error, e.g.
// io.ErrUnexpectedEOF on a short body. This assumes bufs.Get(n) returns a
// slice of length n — TODO confirm.
118 buf := bufs.Get(int(req.ContentLength))
119 _, err := io.ReadFull(req.Body, buf)
121 http.Error(resp, err.Error(), 500)
126 replication, err := PutBlock(buf, hash)
// NOTE(review): assumes every error PutBlock returns is a *KeepError.
// This holds for the returns visible in PutBlock; any other error type
// would make this assertion panic.
130 ke := err.(*KeepError)
131 http.Error(resp, ke.Error(), ke.HTTPCode)
135 // Success; add a size hint, sign the locator if possible, and
136 // return it to the client.
137 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
138 apiToken := GetApiToken(req)
// Sign only when a permission secret is configured and the caller
// identified itself with a token.
139 if PermissionSecret != nil && apiToken != "" {
140 expiry := time.Now().Add(blobSignatureTTL)
141 returnHash = SignLocator(returnHash, apiToken, expiry)
143 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
144 resp.Write([]byte(returnHash + "\n"))
147 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
// It streams the index written by each readable volume's IndexTo. The
// output is restricted to {prefix} when that route variable is present.
// Only the Data Manager token is accepted. The listing ends with an
// extra newline so clients can tell a complete response from a truncated
// one.
148 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
149 // Reject unauthorized requests.
150 if !IsDataManagerToken(GetApiToken(req)) {
151 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The plain /index route has no {prefix} variable, so prefix is "" there
// (presumably matching every hash in IndexTo).
155 prefix := mux.Vars(req)["prefix"]
// Each volume writes its entries straight to the response.
157 for _, vol := range KeepVM.AllReadable() {
158 if err := vol.IndexTo(prefix, resp); err != nil {
159 // The only errors returned by IndexTo are
160 // write errors returned by resp.Write(),
161 // which probably means the client has
162 // disconnected and this error will never be
163 // reported to the client -- but it will
164 // appear in our own error log.
165 http.Error(resp, err.Error(), http.StatusInternalServerError)
169 // An empty line at EOF is the only way the client can be
170 // assured the entire index was received.
171 resp.Write([]byte{'\n'})
175 // Responds to /status.json requests with the current node status,
176 // described in a JSON structure.
178 // The data given in a status.json response includes:
179 // volumes - a list of Keep volumes currently in use by this server
180 // each volume is an object with the following fields:
182 // * device_num (an integer identifying the underlying filesystem)
// PoolStatus holds the buffer-pool counters published under
// "BufferPool" in the /status.json document.
type PoolStatus struct {
	Alloc uint64 `json:"BytesAllocated"` // set from bufs.Alloc()
	Cap   int    `json:"BuffersMax"`     // set from bufs.Cap()
	Len   int    `json:"BuffersInUse"`   // set from bufs.Len()
}
194 type NodeStatus struct {
195 Volumes []*VolumeStatus `json:"volumes"`
196 BufferPool PoolStatus
197 PullQueue WorkQueueStatus
198 TrashQueue WorkQueueStatus
199 Memory runtime.MemStats
203 var stLock sync.Mutex
205 // StatusHandler addresses /status.json requests.
// It replies with the NodeStatus value st encoded as JSON. If encoding
// fails, the error and the status value are logged and a 500 response
// is sent.
// NOTE(review): st appears to be shared between requests, and stLock is
// declared next to NodeStatus. However, the Lock/Unlock calls and the
// readNodeStatus refresh are not visible in this excerpt. Confirm that
// the log.Printf of &st below also runs while stLock is held. Otherwise
// it races with concurrent requests that refresh st.
206 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
209 jstat, err := json.Marshal(&st)
214 log.Printf("json.Marshal: %s", err)
215 log.Printf("NodeStatus = %v", &st)
216 http.Error(resp, err.Error(), 500)
220 // populate the given NodeStatus struct with current values.
221 func readNodeStatus(st *NodeStatus) {
222 vols := KeepVM.AllReadable()
223 if cap(st.Volumes) < len(vols) {
224 st.Volumes = make([]*VolumeStatus, len(vols))
226 st.Volumes = st.Volumes[:0]
227 for _, vol := range vols {
228 if s := vol.Status(); s != nil {
229 st.Volumes = append(st.Volumes, s)
232 st.BufferPool.Alloc = bufs.Alloc()
233 st.BufferPool.Cap = bufs.Cap()
234 st.BufferPool.Len = bufs.Len()
235 st.PullQueue = getWorkQueueStatus(pullq)
236 st.TrashQueue = getWorkQueueStatus(trashq)
237 runtime.ReadMemStats(&st.Memory)
240 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
241 // should never happen except in test suites), it returns a zero status
242 // value instead of crashing.
// readNodeStatus uses it for both the pull queue and the trash queue.
// NOTE(review): the nil check and the non-nil return path are not visible
// in this excerpt.
243 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
245 // This should only happen during tests.
246 return WorkQueueStatus{}
251 // DeleteHandler processes DELETE requests.
253 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
254 // from all connected volumes.
256 // Only the Data Manager, or an Arvados admin with scope "all", are
257 // allowed to issue DELETE requests. If a DELETE request is not
258 // authenticated or is issued by a non-admin user, the server returns
259 // a PermissionError.
// NOTE(review): CanDelete currently accepts only the Data Manager token;
// the admin-with-scope-"all" path is still a TODO there.
261 // Upon receiving a valid request from an authorized user,
262 // DeleteHandler deletes all copies of the specified block on local
// "Deleting" a copy means calling that volume's Trash method. Only
// writable volumes (KeepVM.AllWritable) are visited.
267 // If the requested block was not found on any writable volume, the response
268 // code is HTTP 404 Not Found.
270 // Otherwise, the response code is 200 OK, with a response body
271 // consisting of the JSON message
273 // {"copies_deleted":d,"copies_failed":f}
275 // where d and f are integers representing the number of copies that
276 // were successfully and unsuccessfully deleted.
278 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
279 hash := mux.Vars(req)["hash"]
281 // Confirm that this user is an admin and has a token with unlimited scope.
282 var tok = GetApiToken(req)
283 if tok == "" || !CanDelete(tok) {
284 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): the condition guarding this response is not visible in
// this excerpt; presumably deletion can be disabled by configuration.
289 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
293 // Delete copies of this block from all available volumes.
294 // Report how many blocks were successfully deleted, and how
295 // many were found on writable volumes but not deleted.
297 Deleted int `json:"copies_deleted"`
298 Failed int `json:"copies_failed"`
300 for _, vol := range KeepVM.AllWritable() {
301 if err := vol.Trash(hash); err == nil {
// Presumably a volume without the block counts as neither deleted nor
// failed; the branch body is not visible here.
303 } else if os.IsNotExist(err) {
307 log.Println("DeleteHandler:", err)
// Nothing found on any writable volume: reply 404 instead of a
// zero-count JSON body.
313 if result.Deleted == 0 && result.Failed == 0 {
314 st = http.StatusNotFound
321 if st == http.StatusOK {
322 if body, err := json.Marshal(result); err == nil {
325 log.Printf("json.Marshal: %s (result = %v)", err, result)
326 http.Error(resp, err.Error(), 500)
331 /* PullHandler processes "PUT /pull" requests for the data manager.
332 The request body is a JSON message containing a list of pull
333 requests in the following format:
337 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
339 "keep0.qr1hi.arvadosapi.com:25107",
340 "keep1.qr1hi.arvadosapi.com:25108"
344 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
354 Each pull request in the list consists of a block locator string
355 and an ordered list of servers. Keepstore should try to fetch the
356 block from each server in turn.
358 If the request has not been sent by the Data Manager, return 401
361 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of the JSON list accepted by PullHandler. It
// holds a block locator and the servers to try fetching that block from,
// in order.
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`
}
370 // PullHandler processes "PUT /pull" requests for the data manager.
// The body is decoded as a single JSON value into pr. Presumably pr is a
// []PullRequest, but its declaration is not visible here.
// Responses:
//   - non-Data-Manager token: UnauthorizedError;
//   - malformed JSON: 400 carrying the decoder's error;
//   - success: 200 with a count of the requests received.
// After replying, the handler passes the list to pullq.ReplaceQueue,
// which replaces the current pull queue.
371 func PullHandler(resp http.ResponseWriter, req *http.Request) {
372 // Reject unauthorized requests.
373 if !IsDataManagerToken(GetApiToken(req)) {
374 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
378 // Parse the request body.
380 r := json.NewDecoder(req.Body)
381 if err := r.Decode(&pr); err != nil {
382 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
386 // We have a properly formatted pull list sent from the data
387 // manager. Report success and send the list to the pull list
388 // manager for further handling.
// The reply goes out before the queue is replaced. It confirms that the
// list parsed, not that it was queued.
389 resp.WriteHeader(http.StatusOK)
391 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// NOTE(review): presumably appends each request to plist (the loop body
// is not visible here).
394 for _, p := range pr {
397 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by TrashHandler. It
// holds a block locator and that block's modification time (BlockMtime).
// The unit of BlockMtime is not visible here; presumably it is Unix
// seconds — TODO confirm.
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
}
406 // TrashHandler processes /trash requests.
// It accepts a new trash list (PUT /trash) from the Data Manager. The
// body is decoded as a single JSON array of TrashRequest.
// Responses:
//   - non-Data-Manager token: UnauthorizedError;
//   - malformed JSON: 400 carrying the decoder's error;
//   - success: 200 with a count of the requests received.
// After replying, the handler passes the list to trashq.ReplaceQueue,
// which replaces the current trash queue.
407 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
408 // Reject unauthorized requests.
409 if !IsDataManagerToken(GetApiToken(req)) {
410 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
414 // Parse the request body.
415 var trash []TrashRequest
416 r := json.NewDecoder(req.Body)
417 if err := r.Decode(&trash); err != nil {
418 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
422 // We have a properly formatted trash list sent from the data
423 // manager. Report success and send the list to the trash work
424 // queue for further handling.
// The reply goes out before the queue is replaced.
425 resp.WriteHeader(http.StatusOK)
427 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// NOTE(review): presumably appends each request to tlist (the loop body
// is not visible here).
430 for _, t := range trash {
433 trashq.ReplaceQueue(tlist)
436 // UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
// It asks the writable volumes to Untrash the block named by {hash}. Only
// the Data Manager token is accepted. Each success and each failure is
// logged. When the final status st is 200, the body is
// "Untrashed on volume: " followed by the volume name.
// NOTE(review): the following are not visible in this excerpt:
//   - how st is initialised and updated;
//   - whether the loop stops after the first success;
//   - what a non-200 response looks like.
// If the loop can succeed more than once, successResp concatenates
// volume names with no separator — confirm.
437 func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
438 // Reject unauthorized requests.
439 if !IsDataManagerToken(GetApiToken(req)) {
440 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
444 hash := mux.Vars(req)["hash"]
445 successResp := "Untrashed on volume: "
447 for _, vol := range KeepVM.AllWritable() {
448 if err := vol.Untrash(hash); err == nil {
449 log.Printf("Untrashed %v on volume %v", hash, vol.String())
451 successResp += vol.String()
454 log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
459 if st == http.StatusOK {
460 resp.Write([]byte(successResp))
466 // ==============================
467 // GetBlock and PutBlock implement lower-level code for handling
468 // blocks by rooting through volumes connected to the local machine.
469 // Once the handler has determined that system policy permits the
470 // request, it calls these methods to perform the actual operation.
472 // TODO(twp): this code would probably be better located in the
473 // VolumeManager interface. As an abstraction, the VolumeManager
474 // should be the only part of the code that cares about which volume a
475 // block is stored on, so it should be responsible for figuring out
476 // which volume to check for fetching blocks, storing blocks, etc.
477 // ==============================
479 // GetBlock fetches and returns the block identified by "hash".
// Readable volumes are tried in KeepVM.AllReadable() order. The first
// copy whose MD5 digest equals hash is returned. Copies that fail the
// check are logged and skipped.
481 // On success, GetBlock returns a byte slice with the block data, and
484 // If the block cannot be found on any volume, returns NotFoundError.
486 // If every copy found has the wrong MD5 hash, returns
// NOTE(review): buffers returned by vol.Get are presumably pool buffers.
// Confirm that corrupt ones are given back to bufs; that line is not
// visible here.
489 func GetBlock(hash string) ([]byte, error) {
490 // Attempt to read the requested hash from a keep volume.
// Remains NotFoundError unless some volume returns a corrupt copy.
491 errorToCaller := NotFoundError
493 for _, vol := range KeepVM.AllReadable() {
494 buf, err := vol.Get(hash)
496 // IsNotExist is an expected error and may be
497 // ignored. All other errors are logged. In
498 // any case we continue trying to read other
499 // volumes. If all volumes report IsNotExist,
500 // we return a NotFoundError.
501 if !os.IsNotExist(err) {
502 log.Printf("%s: Get(%s): %s", vol, hash, err)
506 // Check the file checksum.
508 filehash := fmt.Sprintf("%x", md5.Sum(buf))
509 if filehash != hash {
510 // TODO: Try harder to tell a sysadmin about
512 log.Printf("%s: checksum mismatch for request %s (actual %s)",
514 errorToCaller = DiskHashError
// A good copy was found. If an earlier volume held a corrupt copy, log
// that the good copy was served anyway.
518 if errorToCaller == DiskHashError {
519 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
524 return nil, errorToCaller
527 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
529 // Usage: replication, err := PutBlock(block, hash)
530 // The block is written to at most one writable volume.
532 // The MD5 checksum of the block must be identical to the content id HASH.
533 // If not, an error is returned.
535 // PutBlock first tries the volume chosen by KeepVM.NextWritable, then every writable volume in order.
536 // A failure code is returned to the user only if all volumes fail.
538 // On success, PutBlock returns the replication level of the volume holding the block and a nil error.
539 // On failure, it returns a KeepError with one of the following codes:
542 // A different block with the same hash already exists on this
545 // The MD5 hash of the BLOCK does not match the argument HASH.
547 // There was not enough space left in any Keep volume to store
550 // The object could not be stored for some other reason (e.g.
551 // all writes failed). The text of the error message should
552 // provide as much detail as possible.
// If an intact copy already exists, the result presumably comes directly
// from CompareAndTouch; that return line is not visible in this excerpt.
554 func PutBlock(block []byte, hash string) (int, error) {
555 // Check that BLOCK's checksum matches HASH.
556 blockhash := fmt.Sprintf("%x", md5.Sum(block))
557 if blockhash != hash {
558 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
559 return 0, RequestHashError
562 // If we already have this data, it's intact on disk, and we
563 // can update its timestamp, return success. If we have
564 // different data with the same hash, return failure.
565 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
569 // Choose a Keep volume to write to.
570 // If this volume fails, try all of the volumes in order.
571 if vol := KeepVM.NextWritable(); vol != nil {
572 if err := vol.Put(hash, block); err == nil {
573 return vol.Replication(), nil // success!
// NOTE(review): the NextWritable volume is presumably also one of the
// writables below, so a failed first attempt is retried once in the loop.
577 writables := KeepVM.AllWritable()
578 if len(writables) == 0 {
579 log.Print("No writable volumes.")
584 for _, vol := range writables {
585 err := vol.Put(hash, block)
587 return vol.Replication(), nil // success!
589 if err != FullError {
590 // The volume is not full but the
591 // write did not succeed. Report the
592 // error and continue trying.
594 log.Printf("%s: Write(%s): %s", vol, hash, err)
599 log.Print("All volumes are full.")
602 // Already logged the non-full errors.
603 return 0, GenericError
606 // CompareAndTouch returns the current replication level if one of the
607 // volumes already has the given content and it successfully updates
608 // the relevant block's modification time in order to protect it from
609 // premature garbage collection. Otherwise, it returns a non-nil
// Only writable volumes are examined, presumably because a match must be
// followed by Touch. The first volume whose copy matches and can be
// touched supplies the returned replication level.
// NOTE(review): the following are not visible in this excerpt:
//   - the lines that update and return bestErr;
//   - the lines after the CollisionError and Touch-failure logs.
611 func CompareAndTouch(hash string, buf []byte) (int, error) {
// Presumably the error returned when no volume holds the block.
612 var bestErr error = NotFoundError
613 for _, vol := range KeepVM.AllWritable() {
614 if err := vol.Compare(hash, buf); err == CollisionError {
615 // Stop if we have a block with same hash but
616 // different content. (It will be impossible
617 // to tell which one is wanted if we have
618 // both, so there's no point writing it even
619 // on a different volume.)
620 log.Printf("%s: Compare(%s): %s", vol, hash, err)
622 } else if os.IsNotExist(err) {
623 // Block does not exist. This is the only
624 // "normal" error: we don't log anything.
626 } else if err != nil {
627 // Couldn't open file, data is corrupt on
628 // disk, etc.: log this abnormal condition,
629 // and try the next volume.
630 log.Printf("%s: Compare(%s): %s", vol, hash, err)
633 if err := vol.Touch(hash); err != nil {
634 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
638 // Compare and Touch both worked --> done.
639 return vol.Replication(), nil
// validLocatorRe accepts exactly 32 lowercase hexadecimal digits and
// nothing else: a bare MD5 digest with no size or other hints. Without
// the (?m) flag, Go's $ matches only at the very end of the text, so a
// trailing newline is rejected as well.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e. a
// bare MD5 hex digest. If Keep gains support for other hash types,
// both this function and validLocatorRe must be extended to cover them.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" and captures everything after the whitespace.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization header value is examined. A header key
// that is present but has no values is treated like a missing header;
// it no longer causes an index-out-of-range panic.
func GetApiToken(req *http.Request) string {
	// Header.Get returns "" both when the key is absent and when it maps
	// to an empty slice. "" never matches authRe.
	auth := req.Header.Get("Authorization")
	if match := authRe.FindStringSubmatch(auth); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired reports whether timestampHex, a Unix time in seconds written
// as a hexadecimal string, lies in the past. A string that cannot be
// parsed as hexadecimal is logged and treated as expired.
func IsExpired(timestampHex string) bool {
	secs, parseErr := strconv.ParseInt(timestampHex, 16, 0)
	if parseErr != nil {
		log.Printf("IsExpired: %s", parseErr)
		return true
	}
	deadline := time.Unix(secs, 0)
	return time.Now().After(deadline)
}
680 // CanDelete returns true if the user identified by apiToken is
681 // allowed to delete blocks.
// In the visible code, only the Data Manager token is recognised. The
// admin-token lookup described in the TODO below is not implemented.
// NOTE(review): the lines before the "Blocks may be deleted only when..."
// comment are not visible here. Presumably they are a configuration
// switch that disables deletion.
682 func CanDelete(apiToken string) bool {
686 // Blocks may be deleted only when Keep has been configured with a
688 if IsDataManagerToken(apiToken) {
691 // TODO(twp): look up apiToken with the API server
692 // return true if is_admin is true and if the token
693 // has unlimited scope
697 // IsDataManagerToken returns true if apiToken represents the data
699 func IsDataManagerToken(apiToken string) bool {
700 return dataManagerToken != "" && apiToken == dataManagerToken