3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
15 "github.com/gorilla/mux"
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
30 func MakeRESTRouter() *mux.Router {
31 rest := mux.NewRouter()
34 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36 `/{hash:[0-9a-f]{32}}+{hints}`,
37 GetBlockHandler).Methods("GET", "HEAD")
39 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41 // List all blocks stored here. Privileged client only.
42 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43 // List blocks stored here whose hash has the given prefix.
44 // Privileged client only.
45 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47 // List volumes: path, device number, bytes used/avail.
48 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50 // Replace the current pull queue.
51 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53 // Replace the current trash queue.
54 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56 // Any request which does not match any of these routes gets
58 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63 // BadRequestHandler is a HandleFunc to address bad requests.
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
68 // GetBlockHandler is a HandleFunc to address Get block requests.
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70 if enforcePermissions {
71 locator := req.URL.Path[1:] // strip leading slash
72 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
73 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
78 block, err := GetBlock(mux.Vars(req)["hash"])
80 // This type assertion is safe because the only errors
81 // GetBlock can return are DiskHashError or NotFoundError.
82 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
87 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
88 resp.Header().Set("Content-Type", "application/octet-stream")
92 // PutBlockHandler is a HandleFunc to address Put block requests.
93 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
94 hash := mux.Vars(req)["hash"]
96 // Detect as many error conditions as possible before reading
97 // the body: avoid transmitting data that will not end up
98 // being written anyway.
100 if req.ContentLength == -1 {
101 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
105 if req.ContentLength > BlockSize {
106 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
110 if len(KeepVM.AllWritable()) == 0 {
111 http.Error(resp, FullError.Error(), FullError.HTTPCode)
115 buf := bufs.Get(int(req.ContentLength))
116 _, err := io.ReadFull(req.Body, buf)
118 http.Error(resp, err.Error(), 500)
123 replication, err := PutBlock(buf, hash)
127 ke := err.(*KeepError)
128 http.Error(resp, ke.Error(), ke.HTTPCode)
132 // Success; add a size hint, sign the locator if possible, and
133 // return it to the client.
134 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
135 apiToken := GetApiToken(req)
136 if PermissionSecret != nil && apiToken != "" {
137 expiry := time.Now().Add(blobSignatureTTL)
138 returnHash = SignLocator(returnHash, apiToken, expiry)
140 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
141 resp.Write([]byte(returnHash + "\n"))
144 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
145 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
146 // Reject unauthorized requests.
147 if !IsDataManagerToken(GetApiToken(req)) {
148 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
152 prefix := mux.Vars(req)["prefix"]
154 for _, vol := range KeepVM.AllReadable() {
155 if err := vol.IndexTo(prefix, resp); err != nil {
156 // The only errors returned by IndexTo are
157 // write errors returned by resp.Write(),
158 // which probably means the client has
159 // disconnected and this error will never be
160 // reported to the client -- but it will
161 // appear in our own error log.
162 http.Error(resp, err.Error(), http.StatusInternalServerError)
166 // An empty line at EOF is the only way the client can be
167 // assured the entire index was received.
168 resp.Write([]byte{'\n'})
172 // Responds to /status.json requests with the current node status,
173 // described in a JSON structure.
175 // The data given in a status.json response includes:
176 // volumes - a list of Keep volumes currently in use by this server
177 // each volume is an object with the following fields:
179 // * device_num (an integer identifying the underlying filesystem)
// PoolStatus is the buffer-pool section of a status.json response.
type PoolStatus struct {
	// Alloc is serialized as "BytesAllocated".
	Alloc uint64 `json:"BytesAllocated"`

	// Cap is serialized as "BuffersMax".
	Cap int `json:"BuffersMax"`

	// Len is serialized as "BuffersInUse".
	Len int `json:"BuffersInUse"`
}
191 type NodeStatus struct {
192 Volumes []*VolumeStatus `json:"volumes"`
193 BufferPool PoolStatus
194 PullQueue WorkQueueStatus
195 TrashQueue WorkQueueStatus
196 Memory runtime.MemStats
200 var stLock sync.Mutex
202 // StatusHandler addresses /status.json requests.
203 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
206 jstat, err := json.Marshal(&st)
211 log.Printf("json.Marshal: %s", err)
212 log.Printf("NodeStatus = %v", &st)
213 http.Error(resp, err.Error(), 500)
217 // populate the given NodeStatus struct with current values.
218 func readNodeStatus(st *NodeStatus) {
219 vols := KeepVM.AllReadable()
220 if cap(st.Volumes) < len(vols) {
221 st.Volumes = make([]*VolumeStatus, len(vols))
223 st.Volumes = st.Volumes[:0]
224 for _, vol := range vols {
225 if s := vol.Status(); s != nil {
226 st.Volumes = append(st.Volumes, s)
229 st.BufferPool.Alloc = bufs.Alloc()
230 st.BufferPool.Cap = bufs.Cap()
231 st.BufferPool.Len = bufs.Len()
232 st.PullQueue = getWorkQueueStatus(pullq)
233 st.TrashQueue = getWorkQueueStatus(trashq)
234 runtime.ReadMemStats(&st.Memory)
237 // return a WorkQueueStatus for the given queue. If q is nil (which
238 // should never happen except in test suites), return a zero status
239 // value instead of crashing.
240 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
242 // This should only happen during tests.
243 return WorkQueueStatus{}
248 // DeleteHandler processes DELETE requests.
250 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
251 // from all connected volumes.
253 // Only the Data Manager, or an Arvados admin with scope "all", are
254 // allowed to issue DELETE requests. If a DELETE request is not
255 // authenticated or is issued by a non-admin user, the server returns
256 // a PermissionError.
258 // Upon receiving a valid request from an authorized user,
259 // DeleteHandler deletes all copies of the specified block on local
264 // If the requested block was not found on any volume, the response
265 // code is HTTP 404 Not Found.
267 // Otherwise, the response code is 200 OK, with a response body
268 // consisting of the JSON message
270 // {"copies_deleted":d,"copies_failed":f}
272 // where d and f are integers representing the number of blocks that
273 // were successfully and unsuccessfully deleted.
275 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
276 hash := mux.Vars(req)["hash"]
278 // Confirm that this user is an admin and has a token with unlimited scope.
279 var tok = GetApiToken(req)
280 if tok == "" || !CanDelete(tok) {
281 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
286 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
290 // Delete copies of this block from all available volumes.
291 // Report how many blocks were successfully deleted, and how
292 // many were found on writable volumes but not deleted.
294 Deleted int `json:"copies_deleted"`
295 Failed int `json:"copies_failed"`
297 for _, vol := range KeepVM.AllWritable() {
298 if err := vol.Delete(hash); err == nil {
300 } else if os.IsNotExist(err) {
304 log.Println("DeleteHandler:", err)
310 if result.Deleted == 0 && result.Failed == 0 {
311 st = http.StatusNotFound
318 if st == http.StatusOK {
319 if body, err := json.Marshal(result); err == nil {
322 log.Printf("json.Marshal: %s (result = %v)", err, result)
323 http.Error(resp, err.Error(), 500)
328 /* PullHandler processes "PUT /pull" requests for the data manager.
329 The request body is a JSON message containing a list of pull
330 requests in the following format:
334 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
336 "keep0.qr1hi.arvadosapi.com:25107",
337 "keep1.qr1hi.arvadosapi.com:25108"
341 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
351 Each pull request in the list consists of a block locator string
352 and an ordered list of servers. Keepstore should try to fetch the
353 block from each server in turn.
355 If the request has not been sent by the Data Manager, return 401
358 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of a pull list: a block locator together
// with the ordered list of servers to fetch that block from.
type PullRequest struct {
	// Locator is serialized as "locator".
	Locator string `json:"locator"`

	// Servers is serialized as "servers"; order is significant.
	Servers []string `json:"servers"`
}
367 // PullHandler processes "PUT /pull" requests for the data manager.
368 func PullHandler(resp http.ResponseWriter, req *http.Request) {
369 // Reject unauthorized requests.
370 if !IsDataManagerToken(GetApiToken(req)) {
371 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
375 // Parse the request body.
377 r := json.NewDecoder(req.Body)
378 if err := r.Decode(&pr); err != nil {
379 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
383 // We have a properly formatted pull list sent from the data
384 // manager. Report success and send the list to the pull list
385 // manager for further handling.
386 resp.WriteHeader(http.StatusOK)
388 fmt.Sprintf("Received %d pull requests\n", len(pr))))
391 for _, p := range pr {
394 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of a trash list: a block locator together
// with its Mtime.
type TrashRequest struct {
	// Locator is serialized as "locator".
	Locator string `json:"locator"`

	// BlockMtime is serialized as "block_mtime".
	// NOTE(review): the time unit is not visible here; confirm with the sender.
	BlockMtime int64 `json:"block_mtime"`
}
403 // TrashHandler processes /trash requests.
404 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
405 // Reject unauthorized requests.
406 if !IsDataManagerToken(GetApiToken(req)) {
407 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
411 // Parse the request body.
412 var trash []TrashRequest
413 r := json.NewDecoder(req.Body)
414 if err := r.Decode(&trash); err != nil {
415 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
419 // We have a properly formatted trash list sent from the data
420 // manager. Report success and send the list to the trash work
421 // queue for further handling.
422 resp.WriteHeader(http.StatusOK)
424 fmt.Sprintf("Received %d trash requests\n", len(trash))))
427 for _, t := range trash {
430 trashq.ReplaceQueue(tlist)
433 // ==============================
434 // GetBlock and PutBlock implement lower-level code for handling
435 // blocks by rooting through volumes connected to the local machine.
436 // Once the handler has determined that system policy permits the
437 // request, it calls these methods to perform the actual operation.
439 // TODO(twp): this code would probably be better located in the
440 // VolumeManager interface. As an abstraction, the VolumeManager
441 // should be the only part of the code that cares about which volume a
442 // block is stored on, so it should be responsible for figuring out
443 // which volume to check for fetching blocks, storing blocks, etc.
444 // ==============================
446 // GetBlock fetches and returns the block identified by "hash".
448 // On success, GetBlock returns a byte slice with the block data, and
451 // If the block cannot be found on any volume, returns NotFoundError.
453 // If the block found does not have the correct MD5 hash, returns
456 func GetBlock(hash string) ([]byte, error) {
457 // Attempt to read the requested hash from a keep volume.
458 errorToCaller := NotFoundError
460 for _, vol := range KeepVM.AllReadable() {
461 buf, err := vol.Get(hash)
463 // IsNotExist is an expected error and may be
464 // ignored. All other errors are logged. In
465 // any case we continue trying to read other
466 // volumes. If all volumes report IsNotExist,
467 // we return a NotFoundError.
468 if !os.IsNotExist(err) {
469 log.Printf("%s: Get(%s): %s", vol, hash, err)
473 // Check the file checksum.
475 filehash := fmt.Sprintf("%x", md5.Sum(buf))
476 if filehash != hash {
477 // TODO: Try harder to tell a sysadmin about
479 log.Printf("%s: checksum mismatch for request %s (actual %s)",
481 errorToCaller = DiskHashError
485 if errorToCaller == DiskHashError {
486 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
491 return nil, errorToCaller
494 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
496 // PutBlock(block, hash)
497 // Stores the BLOCK (identified by the content id HASH) in Keep.
499 // The MD5 checksum of the block must be identical to the content id HASH.
500 // If not, an error is returned.
502 // PutBlock stores the BLOCK on the first Keep volume with free space.
503 // A failure code is returned to the user only if all volumes fail.
505 // On success, PutBlock returns a replication level and a nil error.
506 // On failure, it returns a KeepError with one of the following codes:
509 // A different block with the same hash already exists on this
512 // The MD5 hash of the BLOCK does not match the argument HASH.
514 // There was not enough space left in any Keep volume to store
517 // The object could not be stored for some other reason (e.g.
518 // all writes failed). The text of the error message should
519 // provide as much detail as possible.
521 func PutBlock(block []byte, hash string) (int, error) {
522 // Check that BLOCK's checksum matches HASH.
523 blockhash := fmt.Sprintf("%x", md5.Sum(block))
524 if blockhash != hash {
525 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
526 return 0, RequestHashError
529 // If we already have this data, it's intact on disk, and we
530 // can update its timestamp, return success. If we have
531 // different data with the same hash, return failure.
532 if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
536 // Choose a Keep volume to write to.
537 // If this volume fails, try all of the volumes in order.
538 if vol := KeepVM.NextWritable(); vol != nil {
539 if err := vol.Put(hash, block); err == nil {
540 return vol.Replication(), nil // success!
544 writables := KeepVM.AllWritable()
545 if len(writables) == 0 {
546 log.Print("No writable volumes.")
551 for _, vol := range writables {
552 err := vol.Put(hash, block)
554 return vol.Replication(), nil // success!
556 if err != FullError {
557 // The volume is not full but the
558 // write did not succeed. Report the
559 // error and continue trying.
561 log.Printf("%s: Write(%s): %s", vol, hash, err)
566 log.Print("All volumes are full.")
569 // Already logged the non-full errors.
570 return 0, GenericError
573 // CompareAndTouch returns the current replication level if one of the
574 // volumes already has the given content and it successfully updates
575 // the relevant block's modification time in order to protect it from
576 // premature garbage collection. Otherwise, it returns a non-nil
578 func CompareAndTouch(hash string, buf []byte) (int, error) {
579 var bestErr error = NotFoundError
580 for _, vol := range KeepVM.AllWritable() {
581 if err := vol.Compare(hash, buf); err == CollisionError {
582 // Stop if we have a block with same hash but
583 // different content. (It will be impossible
584 // to tell which one is wanted if we have
585 // both, so there's no point writing it even
586 // on a different volume.)
587 log.Printf("%s: Compare(%s): %s", vol, hash, err)
589 } else if os.IsNotExist(err) {
590 // Block does not exist. This is the only
591 // "normal" error: we don't log anything.
593 } else if err != nil {
594 // Couldn't open file, data is corrupt on
595 // disk, etc.: log this abnormal condition,
596 // and try the next volume.
597 log.Printf("%s: Compare(%s): %s", vol, hash, err)
600 if err := vol.Touch(hash); err != nil {
601 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
605 // Compare and Touch both worked --> done.
606 return vol.Replication(), nil
// validLocatorRe matches a string made of exactly 32 lowercase
// hexadecimal digits.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e. a
// bare 32-digit lowercase hexadecimal MD5 hash. Extend the pattern
// when Keep gains support for hash types other than MD5.
func IsValidLocator(loc string) bool {
	valid := validLocatorRe.MatchString(loc)
	return valid
}
// authRe matches an "OAuth2 <token>" Authorization header value and
// captures the token.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization header
// of an HTTP request, or an empty string if there is no Authorization
// header or its first value is not of the form "OAuth2 <token>".
func GetApiToken(req *http.Request) string {
	// Header.Get yields "" both when the header is absent and when
	// the key is present with no values, so a header map holding an
	// empty value slice cannot cause an out-of-range index.
	auth := req.Header.Get("Authorization")
	if match := authRe.FindStringSubmatch(auth); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired reports whether the given Unix timestamp (expressed as a
// hexadecimal string) is in the past. A timestampHex that cannot be
// parsed as a hexadecimal number is logged and treated as expired.
func IsExpired(timestampHex string) bool {
	// Parse into 64 bits explicitly. A bit size of 0 means the
	// platform's int, which on 32-bit builds rejects every
	// timestamp past January 2038 and so reports it as expired.
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
647 // CanDelete returns true if the user identified by apiToken is
648 // allowed to delete blocks.
649 func CanDelete(apiToken string) bool {
653 // Blocks may be deleted only when Keep has been configured with a
655 if IsDataManagerToken(apiToken) {
658 // TODO(twp): look up apiToken with the API server
659 // return true if is_admin is true and if the token
660 // has unlimited scope
664 // IsDataManagerToken returns true if apiToken represents the data
666 func IsDataManagerToken(apiToken string) bool {
667 return dataManagerToken != "" && apiToken == dataManagerToken