3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
// DeleteHandler (DELETE /locator)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
15 "github.com/gorilla/mux"
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
//
// Routes registered below ({hash} is exactly 32 lowercase hex digits):
//
//	GET, HEAD  /{hash}, /{hash}+{hints}  -> GetBlockHandler
//	PUT        /{hash}                   -> PutBlockHandler
//	DELETE     /{hash}                   -> DeleteHandler
//	GET, HEAD  /index, /index/{prefix}   -> IndexHandler
//	GET, HEAD  /status.json              -> StatusHandler
//	PUT        /pull                     -> PullHandler
//	PUT        /trash                    -> TrashHandler
//
// Requests that match none of these routes are passed to
// BadRequestHandler through the router's NotFoundHandler.
30 func MakeRESTRouter() *mux.Router {
31 rest := mux.NewRouter()
// Block retrieval, either by bare hash or by hash followed by
// "+"-separated hints (such as the +size hint PutBlockHandler appends).
34 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36 `/{hash:[0-9a-f]{32}}+{hints}`,
37 GetBlockHandler).Methods("GET", "HEAD")
// Storing and deleting share the bare-hash path; the HTTP method selects
// the handler.
39 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41 // List all blocks stored here. Privileged client only.
42 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43 // List blocks stored here whose hash has the given prefix.
44 // Privileged client only.
45 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47 // Node status as JSON: volumes, buffer pool, pull/trash queues, memory stats.
48 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50 // Replace the current pull queue.
51 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53 // Replace the current trash queue.
54 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56 // Any request which does not match any of these routes gets
// answered by BadRequestHandler.
58 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63 // BadRequestHandler is a HandleFunc that answers every request with
// BadRequestError's message and HTTP status code, without inspecting
// the request. MakeRESTRouter installs it as the router's
// NotFoundHandler, so it handles requests that match no Keep route.
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
68 // GetBlockHandler is a HandleFunc to address Get block requests.
//
// When enforcePermissions is set, the request path without its leading
// slash (the hash plus any "+hints") is checked by VerifySignature
// against the caller's API token, and a verification error is reported
// using that error's HTTPCode. The block is looked up by hash with
// GetBlock and sent as application/octet-stream with an explicit
// Content-Length.
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70 if enforcePermissions {
71 locator := req.URL.Path[1:] // strip leading slash
72 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
// NOTE(review): this assertion panics if VerifySignature ever returns
// an error that is not a *KeepError -- confirm its contract.
73 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// The lookup uses only the {hash} route variable; any +hints in the URL
// play no part in finding the block.
78 block, err := GetBlock(mux.Vars(req)["hash"])
80 // This type assertion is safe because the only errors
81 // GetBlock can return are DiskHashError or NotFoundError.
82 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
87 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
88 resp.Header().Set("Content-Type", "application/octet-stream")
92 // PutBlockHandler is a HandleFunc to address Put block requests.
//
// The request body is the block data. It must have a known
// Content-Length no larger than BlockSize, and at least one writable
// volume must exist. PutBlock checks that the data's MD5 matches the
// {hash} in the URL and stores it. On success the response body is the
// locator "<hash>+<size>" plus a newline. When PermissionSecret is set
// and the request carried an API token, the locator is first signed with
// SignLocator using an expiry of blobSignatureTTL from now.
93 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
94 hash := mux.Vars(req)["hash"]
96 // Detect as many error conditions as possible before reading
97 // the body: avoid transmitting data that will not end up
98 // being written anyway.
// net/http sets ContentLength to -1 when the length is unknown (e.g. a
// chunked upload). The size is needed up front to request a buffer of
// exactly that length from bufs.
100 if req.ContentLength == -1 {
101 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
105 if req.ContentLength > BlockSize {
106 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
// Fail fast when there is nowhere to write the block.
110 if len(KeepVM.AllWritable()) == 0 {
111 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// io.ReadFull returns an error if the body ends before Content-Length
// bytes have been read.
115 buf := bufs.Get(int(req.ContentLength))
116 _, err := io.ReadFull(req.Body, buf)
118 http.Error(resp, err.Error(), 500)
123 err = PutBlock(buf, hash)
// NOTE(review): assumes every error PutBlock returns is a *KeepError;
// any other error type would make this assertion panic.
127 ke := err.(*KeepError)
128 http.Error(resp, ke.Error(), ke.HTTPCode)
132 // Success; add a size hint, sign the locator if possible, and
133 // return it to the client.
134 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
135 apiToken := GetApiToken(req)
136 if PermissionSecret != nil && apiToken != "" {
137 expiry := time.Now().Add(blobSignatureTTL)
138 returnHash = SignLocator(returnHash, apiToken, expiry)
140 resp.Write([]byte(returnHash + "\n"))
143 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Only the data manager's token is accepted. Each readable volume writes
// its index directly to the response via IndexTo, restricted to blocks
// whose hash has the given prefix. For /index the route has no prefix
// variable, so prefix is "" and no filtering is requested. The listing is
// terminated by an empty line.
144 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
145 // Reject unauthorized requests.
146 if !IsDataManagerToken(GetApiToken(req)) {
147 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
151 prefix := mux.Vars(req)["prefix"]
153 for _, vol := range KeepVM.AllReadable() {
154 if err := vol.IndexTo(prefix, resp); err != nil {
155 // The only errors returned by IndexTo are
156 // write errors returned by resp.Write(),
157 // which probably means the client has
158 // disconnected and this error will never be
159 // reported to the client -- but it will
160 // appear in our own error log.
// NOTE(review): if any index data was already written, a 200 status has
// been committed. http.Error then cannot change the status; its message
// is only appended to the body.
161 http.Error(resp, err.Error(), http.StatusInternalServerError)
165 // An empty line at EOF is the only way the client can be
166 // assured the entire index was received.
167 resp.Write([]byte{'\n'})
171 // Responds to /status.json requests with the current node status,
172 // described in a JSON structure.
174 // The data given in a status.json response includes:
175 // volumes - a list of Keep volumes currently in use by this server
176 // each volume is an object with the following fields:
178 // * device_num (an integer identifying the underlying filesystem)
// PoolStatus is the "BufferPool" section of a status.json response.
// readNodeStatus fills Alloc, Cap and Len from bufs.Alloc(), bufs.Cap()
// and bufs.Len(). In JSON they appear as BytesAllocated, BuffersMax and
// BuffersInUse.
183 type PoolStatus struct {
184 Alloc uint64 `json:"BytesAllocated"`
185 Cap int `json:"BuffersMax"`
186 Len int `json:"BuffersInUse"`
// NodeStatus is the document served at /status.json. Volumes is encoded
// under "volumes". The other fields have no json tags, so encoding/json
// uses their Go field names as keys. Memory is filled by
// runtime.ReadMemStats.
190 type NodeStatus struct {
191 Volumes []*VolumeStatus `json:"volumes"`
192 BufferPool PoolStatus
193 PullQueue WorkQueueStatus
194 TrashQueue WorkQueueStatus
195 Memory runtime.MemStats
// stLock is presumably the mutex that guards the shared status value st
// which StatusHandler marshals -- confirm at the lock sites.
199 var stLock sync.Mutex
201 // StatusHandler addresses /status.json requests.
//
// It JSON-encodes the package-level status value st (logged below as a
// NodeStatus; presumably refreshed with readNodeStatus under stLock just
// before encoding -- confirm). If encoding fails, the error and st are
// logged and the client receives a 500 carrying the error message.
202 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
205 jstat, err := json.Marshal(&st)
210 log.Printf("json.Marshal: %s", err)
211 log.Printf("NodeStatus = %v", &st)
212 http.Error(resp, err.Error(), 500)
216 // readNodeStatus populates the given NodeStatus struct with current values:
// the status of every readable volume whose Status() is non-nil, the
// buffer counters from bufs, the pull and trash queue status, and the Go
// runtime's memory statistics. st is modified in place.
217 func readNodeStatus(st *NodeStatus) {
218 vols := KeepVM.AllReadable()
// Reuse st.Volumes' backing array when it is already large enough, so
// repeated calls do not allocate a new slice each time.
219 if cap(st.Volumes) < len(vols) {
220 st.Volumes = make([]*VolumeStatus, len(vols))
222 st.Volumes = st.Volumes[:0]
223 for _, vol := range vols {
// Volumes that report no status are left out of the list.
224 if s := vol.Status(); s != nil {
225 st.Volumes = append(st.Volumes, s)
228 st.BufferPool.Alloc = bufs.Alloc()
229 st.BufferPool.Cap = bufs.Cap()
230 st.BufferPool.Len = bufs.Len()
231 st.PullQueue = getWorkQueueStatus(pullq)
232 st.TrashQueue = getWorkQueueStatus(trashq)
233 runtime.ReadMemStats(&st.Memory)
236 // getWorkQueueStatus returns a WorkQueueStatus for the given queue.
237 // If q is nil (which should never happen except in test suites), it
238 // returns a zero WorkQueueStatus instead of crashing.
239 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
241 // This should only happen during tests.
242 return WorkQueueStatus{}
247 // DeleteHandler processes DELETE requests.
249 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
250 // from all writable volumes.
252 // Only the Data Manager, or an Arvados admin with scope "all", are
253 // allowed to issue DELETE requests. If a DELETE request is not
254 // authenticated or is issued by a non-admin user, the server returns
255 // a PermissionError.
// NOTE(review): CanDelete's admin lookup is still a TODO, so in practice
// only the data manager token appears to be accepted. A request may also
// be refused with MethodDisabledError (presumably when deletion is
// disabled on this server -- confirm).
257 // Upon receiving a valid request from an authorized user,
258 // DeleteHandler deletes all copies of the specified block on local
// writable volumes.
263 // If the requested block was not found on any volume, the response
264 // code is HTTP 404 Not Found.
266 // Otherwise, the response code is 200 OK, with a response body
267 // consisting of the JSON message
269 // {"copies_deleted":d,"copies_failed":f}
271 // where d and f are integers representing the number of copies that
272 // were successfully and unsuccessfully deleted.
274 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
275 hash := mux.Vars(req)["hash"]
277 // Reject requests with no token, or whose token CanDelete refuses.
278 var tok = GetApiToken(req)
279 if tok == "" || !CanDelete(tok) {
280 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
285 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
289 // Delete copies of this block from all writable volumes.
290 // Report how many blocks were successfully deleted, and how
291 // many were found on writable volumes but not deleted.
293 Deleted int `json:"copies_deleted"`
294 Failed int `json:"copies_failed"`
296 for _, vol := range KeepVM.AllWritable() {
297 if err := vol.Delete(hash); err == nil {
299 } else if os.IsNotExist(err) {
303 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed: no writable volume reported
// having a copy, so answer 404 as documented above.
// NOTE(review): st here is a local status code, while StatusHandler uses
// a package-level st; a distinct name would avoid confusion.
309 if result.Deleted == 0 && result.Failed == 0 {
310 st = http.StatusNotFound
// Only a 200 response carries the JSON result body.
317 if st == http.StatusOK {
318 if body, err := json.Marshal(result); err == nil {
321 log.Printf("json.Marshal: %s (result = %v)", err, result)
322 http.Error(resp, err.Error(), 500)
327 /* PullHandler processes "PUT /pull" requests for the data manager.
328 The request body is a JSON message containing a list of pull
329 requests in the following format:
333 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
335 "keep0.qr1hi.arvadosapi.com:25107",
336 "keep1.qr1hi.arvadosapi.com:25108"
340 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
350 Each pull request in the list consists of a block locator string
351 and an ordered list of servers. Keepstore should try to fetch the
352 block from each server in turn.
354 If the request has not been sent by the Data Manager, return 401
357 If the JSON unmarshalling fails, return 400 Bad Request.
360 // PullRequest consists of a block locator and an ordered list of servers.
// It is one entry in the JSON list accepted by PUT /pull. Locator is read
// from "locator", and Servers from "servers", which lists the servers to
// try, in order, when fetching the block.
361 type PullRequest struct {
362 Locator string `json:"locator"`
363 Servers []string `json:"servers"`
366 // PullHandler processes "PUT /pull" requests for the data manager.
//
// Only the data manager's token is accepted. The body is decoded as a
// JSON list of pull requests (presumably []PullRequest). A body that
// fails to decode is answered with BadRequestError's HTTP code. A valid
// list replaces the whole pending pull queue (pullq.ReplaceQueue)
// rather than being appended to it.
367 func PullHandler(resp http.ResponseWriter, req *http.Request) {
368 // Reject unauthorized requests.
369 if !IsDataManagerToken(GetApiToken(req)) {
370 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
374 // Parse the request body.
376 r := json.NewDecoder(req.Body)
377 if err := r.Decode(&pr); err != nil {
378 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
382 // We have a properly formatted pull list sent from the data
383 // manager. Report success and send the list to the pull list
384 // manager for further handling.
// The 200 status and request count are sent before the queue is
// replaced below.
385 resp.WriteHeader(http.StatusOK)
387 fmt.Sprintf("Received %d pull requests\n", len(pr))))
390 for _, p := range pr {
393 pullq.ReplaceQueue(plist)
396 // TrashRequest consists of a block locator and its Mtime.
// It is one entry in the JSON list accepted by PUT /trash. BlockMtime is
// read from "block_mtime" (presumably the block's modification time as
// a Unix timestamp -- confirm the unit with the data manager).
397 type TrashRequest struct {
398 Locator string `json:"locator"`
399 BlockMtime int64 `json:"block_mtime"`
402 // TrashHandler processes "PUT /trash" requests for the data manager.
//
// Only the data manager's token is accepted. The body must be a JSON
// list of TrashRequest objects. A body that fails to decode is answered
// with BadRequestError's HTTP code. A valid list replaces the whole
// pending trash queue (trashq.ReplaceQueue) rather than being appended
// to it.
403 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
404 // Reject unauthorized requests.
405 if !IsDataManagerToken(GetApiToken(req)) {
406 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
410 // Parse the request body.
411 var trash []TrashRequest
412 r := json.NewDecoder(req.Body)
413 if err := r.Decode(&trash); err != nil {
414 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
418 // We have a properly formatted trash list sent from the data
419 // manager. Report success and send the list to the trash work
420 // queue for further handling.
// The 200 status and request count are sent before the queue is
// replaced below.
421 resp.WriteHeader(http.StatusOK)
423 fmt.Sprintf("Received %d trash requests\n", len(trash))))
426 for _, t := range trash {
429 trashq.ReplaceQueue(tlist)
432 // ==============================
433 // GetBlock and PutBlock implement lower-level code for handling
434 // blocks by rooting through volumes connected to the local machine.
435 // Once the handler has determined that system policy permits the
436 // request, it calls these methods to perform the actual operation.
438 // TODO(twp): this code would probably be better located in the
439 // VolumeManager interface. As an abstraction, the VolumeManager
440 // should be the only part of the code that cares about which volume a
441 // block is stored on, so it should be responsible for figuring out
442 // which volume to check for fetching blocks, storing blocks, etc.
443 // ==============================
445 // GetBlock fetches and returns the block identified by "hash".
447 // On success, GetBlock returns a byte slice with the block data, and
// a nil error.
450 // If the block cannot be found on any volume, returns NotFoundError.
452 // If the block found does not have the correct MD5 hash, returns
// DiskHashError -- unless another volume holds a good copy, in which case
// that copy is returned and the mismatch is only logged.
455 func GetBlock(hash string) ([]byte, error) {
456 // Attempt to read the requested hash from a keep volume.
// errorToCaller starts as NotFoundError and becomes DiskHashError once
// any volume returns data with the wrong checksum. If no good copy turns
// up, a corrupt copy is therefore reported in preference to "not found".
457 errorToCaller := NotFoundError
// Volumes are tried in KeepVM.AllReadable() order.
459 for _, vol := range KeepVM.AllReadable() {
460 buf, err := vol.Get(hash)
462 // IsNotExist is an expected error and may be
463 // ignored. All other errors are logged. In
464 // any case we continue trying to read other
465 // volumes. If all volumes report IsNotExist,
466 // we return a NotFoundError.
467 if !os.IsNotExist(err) {
468 log.Printf("%s: Get(%s): %s", vol, hash, err)
472 // Check the file checksum.
474 filehash := fmt.Sprintf("%x", md5.Sum(buf))
475 if filehash != hash {
476 // TODO: Try harder to tell a sysadmin about
478 log.Printf("%s: checksum mismatch for request %s (actual %s)",
480 errorToCaller = DiskHashError
// A good copy was found after at least one corrupt one; record that.
484 if errorToCaller == DiskHashError {
485 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
490 return nil, errorToCaller
493 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
495 // PutBlock(block, hash)
496 // Stores the BLOCK (identified by the content id HASH) in Keep.
498 // The MD5 checksum of the block must be identical to the content id HASH.
499 // If not, an error is returned.
501 // PutBlock first tries the volume returned by KeepVM.NextWritable(); if
// that write fails, it tries every writable volume in order. If an
// intact copy already exists on a writable volume, CompareAndTouch
// refreshes its timestamp and no new copy is written.
502 // A failure code is returned to the user only if all volumes fail.
504 // On success, PutBlock returns nil.
505 // On failure, it returns a KeepError with one of the following codes:
508 // A different block with the same hash already exists on this
511 // The MD5 hash of the BLOCK does not match the argument HASH (RequestHashError).
513 // There was not enough space left in any Keep volume to store
516 // The object could not be stored for some other reason (e.g.
517 // all writes failed). The text of the error message should
518 // provide as much detail as possible.
520 func PutBlock(block []byte, hash string) error {
521 // Check that BLOCK's checksum matches HASH.
522 blockhash := fmt.Sprintf("%x", md5.Sum(block))
523 if blockhash != hash {
524 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
525 return RequestHashError
528 // If we already have this data, it's intact on disk, and we
529 // can update its timestamp, return success. If we have
530 // different data with the same hash, return failure.
531 if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
535 // Choose a Keep volume to write to.
536 // If this volume fails, try all of the volumes in order.
// NOTE(review): the volume from NextWritable presumably also appears in
// AllWritable(), so after a failure here it is tried a second time in
// the loop below.
537 if vol := KeepVM.NextWritable(); vol != nil {
538 if err := vol.Put(hash, block); err == nil {
539 return nil // success!
543 writables := KeepVM.AllWritable()
544 if len(writables) == 0 {
545 log.Print("No writable volumes.")
550 for _, vol := range writables {
551 err := vol.Put(hash, block)
553 return nil // success!
555 if err != FullError {
556 // The volume is not full but the
557 // write did not succeed. Report the
558 // error and continue trying.
560 log.Printf("%s: Write(%s): %s", vol, hash, err)
565 log.Print("All volumes are full.")
568 // Already logged the non-full errors.
572 // CompareAndTouch returns nil if one of the volumes already has the
573 // given content and it successfully updates the relevant block's
574 // modification time in order to protect it from premature garbage
// collection. Only writable volumes are checked.
//
// It returns CollisionError when a volume holds different data under the
// same hash; PutBlock treats that as final. Otherwise the result is
// presumably bestErr, which starts as NotFoundError and may be replaced
// by a later failure (e.g. a failed Touch) -- confirm against the full
// body.
576 func CompareAndTouch(hash string, buf []byte) error {
577 var bestErr error = NotFoundError
578 for _, vol := range KeepVM.AllWritable() {
579 if err := vol.Compare(hash, buf); err == CollisionError {
580 // Stop if we have a block with same hash but
581 // different content. (It will be impossible
582 // to tell which one is wanted if we have
583 // both, so there's no point writing it even
584 // on a different volume.)
585 log.Printf("%s: Compare(%s): %s", vol, hash, err)
587 } else if os.IsNotExist(err) {
588 // Block does not exist. This is the only
589 // "normal" error: we don't log anything.
591 } else if err != nil {
592 // Couldn't open file, data is corrupt on
593 // disk, etc.: log this abnormal condition,
594 // and try the next volume.
595 log.Printf("%s: Compare(%s): %s", vol, hash, err)
598 if err := vol.Touch(hash); err != nil {
599 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
603 // Compare and Touch both worked --> done.
// validLocatorRe matches a bare Keep hash: exactly 32 lowercase
// hexadecimal digits, with no "+hints".
609 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
611 // IsValidLocator returns true if the specified string is a valid Keep locator.
// Only a bare 32-digit lowercase hex hash is accepted. Locators carrying
// hints, such as the "<hash>+<size>" form PutBlockHandler returns, are
// rejected.
612 // When Keep is extended to support hash types other than MD5,
613 // this should be updated to cover those as well.
615 func IsValidLocator(loc string) bool {
616 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization header value of the form
// "OAuth2 <token>". The scheme is matched case-sensitively, and the
// captured token is everything after the whitespace that follows it.
619 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
621 // GetApiToken returns the OAuth2 token from the Authorization
622 // header of an HTTP request, or an empty string if no matching
// token is found. Only the first Authorization header value is examined.
624 func GetApiToken(req *http.Request) string {
625 if auth, ok := req.Header["Authorization"]; ok {
626 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
633 // IsExpired returns true if the given Unix timestamp (expressed as a
634 // hexadecimal string) is in the past, or if timestampHex cannot be
635 // parsed as a hexadecimal string.
// The string is parsed with strconv.ParseInt in base 16, so a "0x"
// prefix is not accepted and the value must fit in an int. Parse
// failures are logged.
636 func IsExpired(timestampHex string) bool {
637 ts, err := strconv.ParseInt(timestampHex, 16, 0)
639 log.Printf("IsExpired: %s", err)
642 return time.Unix(ts, 0).Before(time.Now())
645 // CanDelete returns true if the user identified by apiToken is
646 // allowed to delete blocks.
// Apart from the TODO below (admin users with unlimited-scope tokens),
// the only check that grants permission appears to be
// IsDataManagerToken.
647 func CanDelete(apiToken string) bool {
651 // Blocks may be deleted only when Keep has been configured with a
// data manager token.
653 if IsDataManagerToken(apiToken) {
656 // TODO(twp): look up apiToken with the API server
657 // return true if is_admin is true and if the token
658 // has unlimited scope
662 // IsDataManagerToken returns true if apiToken represents the data
// manager, i.e. it equals the configured dataManagerToken. It always
// returns false when dataManagerToken is empty, so an empty apiToken
// never matches an unconfigured data manager token.
// NOTE(review): == is not a constant-time comparison; consider
// crypto/subtle.ConstantTimeCompare so response timing does not leak
// information about the token.
664 func IsDataManagerToken(apiToken string) bool {
665 return dataManagerToken != "" && apiToken == dataManagerToken