3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "github.com/gorilla/mux"
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
//
// Routes registered here (methods, path pattern -> handler):
//
//	GET, HEAD  /{hash}           -> GetBlockHandler
//	GET, HEAD  /{hash}+{hints}   -> GetBlockHandler
//	PUT        /{hash}           -> PutBlockHandler
//	DELETE     /{hash}           -> DeleteHandler
//	GET, HEAD  /index            -> IndexHandler
//	GET, HEAD  /index/{prefix}   -> IndexHandler
//	GET, HEAD  /status.json      -> StatusHandler
//	PUT        /pull             -> PullHandler
//	PUT        /trash            -> TrashHandler
//
// {hash} must be exactly 32 lowercase hexadecimal digits; {prefix} is
// between 0 and 32 of them.
31 func MakeRESTRouter() *mux.Router {
32 rest := mux.NewRouter()
// Fetch a block by hash, with or without "+hints" appended.
35 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37 `/{hash:[0-9a-f]{32}}+{hints}`,
38 GetBlockHandler).Methods("GET", "HEAD")
// Store or delete a block by hash.
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42 // List all blocks stored here. Privileged client only.
43 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44 // List blocks stored here whose hash has the given prefix.
45 // Privileged client only.
46 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48 // List volumes: path, device number, bytes used/avail.
49 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51 // Replace the current pull queue.
52 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54 // Replace the current trash queue.
55 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57 // Any request which does not match any of these routes is passed to BadRequestHandler.
59 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler replies to any request with BadRequestError's
// message and HTTP status code. MakeRESTRouter installs it as the
// router's NotFoundHandler.
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves GET and HEAD requests for a block. When
// enforce_permissions is set, the locator taken from the URL path must
// pass VerifySignature for the caller's API token. The block is then
// looked up with GetBlock using the {hash} route variable, and the
// response carries Content-Length and Content-Type headers for it.
68 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
69 if enforce_permissions {
70 locator := req.URL.Path[1:] // strip leading slash
71 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
// NOTE(review): this unchecked assertion assumes VerifySignature
// only ever returns *KeepError values -- confirm, otherwise it panics.
72 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// update_timestamp is false: a plain read must not touch the block's mtime.
77 block, err := GetBlock(mux.Vars(req)["hash"], false)
79 // This type assertion is safe because, when update_timestamp is false,
80 // the only errors GetBlock can return are DiskHashError or NotFoundError.
81 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
86 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
87 resp.Header().Set("Content-Type", "application/octet-stream")
// PutBlockHandler serves PUT requests that store a block under the
// {hash} route variable. It rejects requests with an unknown
// Content-Length (SizeRequiredError), a body larger than BLOCKSIZE
// (TooLongError), or no writable volume (FullError), then reads the
// body and hands it to PutBlock. On success it writes the locator
// "hash+size" -- signed when a permission secret and an API token are
// both available -- followed by a newline.
91 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
92 hash := mux.Vars(req)["hash"]
94 // Detect as many error conditions as possible before reading
95 // the body: avoid transmitting data that will not end up
96 // being written anyway.
// A ContentLength of -1 means the client did not declare a length.
98 if req.ContentLength == -1 {
99 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
103 if req.ContentLength > BLOCKSIZE {
104 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
108 if len(KeepVM.AllWritable()) == 0 {
109 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Take a buffer of exactly the declared length and fill it from the body.
113 buf := bufs.Get(int(req.ContentLength))
114 _, err := io.ReadFull(req.Body, buf)
116 http.Error(resp, err.Error(), 500)
121 err = PutBlock(buf, hash)
// NOTE(review): unchecked assertion; assumes PutBlock only returns
// *KeepError values -- confirm.
125 ke := err.(*KeepError)
126 http.Error(resp, ke.Error(), ke.HTTPCode)
130 // Success; add a size hint, sign the locator if possible, and
131 // return it to the client.
132 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
133 api_token := GetApiToken(req)
134 if PermissionSecret != nil && api_token != "" {
135 expiry := time.Now().Add(blob_signature_ttl)
136 return_hash = SignLocator(return_hash, api_token, expiry)
138 resp.Write([]byte(return_hash + "\n"))
142 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
//
// Requests whose API token is not the data manager token are rejected
// with UnauthorizedError. Otherwise every readable volume writes its
// index, filtered by the optional {prefix} route variable, directly to
// the response, and a final blank line marks the end of the listing.
144 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
145 // Reject unauthorized requests.
146 if !IsDataManagerToken(GetApiToken(req)) {
147 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// For the plain /index route the "prefix" variable is absent, so this
// is the empty string.
151 prefix := mux.Vars(req)["prefix"]
153 for _, vol := range KeepVM.AllReadable() {
154 if err := vol.IndexTo(prefix, resp); err != nil {
155 // The only errors returned by IndexTo are
156 // write errors returned by resp.Write(),
157 // which probably means the client has
158 // disconnected and this error will never be
159 // reported to the client -- but it will
160 // appear in our own error log.
161 http.Error(resp, err.Error(), http.StatusInternalServerError)
165 // An empty line at EOF is the only way the client can be
166 // assured the entire index was received.
167 resp.Write([]byte{'\n'})
171 // StatusHandler responds to /status.json requests with the current node status,
172 // described in a JSON structure.
174 // The data given in a status.json response includes:
175 // volumes - a list of Keep volumes currently in use by this server
176 // each volume is an object with the following fields:
// * mount_point
178 // * device_num (an integer identifying the underlying filesystem)
// * bytes_free
// * bytes_used
//
// VolumeStatus is the JSON representation of one entry in that
// "volumes" list.
182 type VolumeStatus struct {
183 MountPoint string `json:"mount_point"`
184 DeviceNum uint64 `json:"device_num"`
185 BytesFree uint64 `json:"bytes_free"`
186 BytesUsed uint64 `json:"bytes_used"`
// PoolStatus reports the state of the block buffer pool in a status
// response. readNodeStatus fills it from bufs.Alloc(), bufs.Cap() and
// bufs.Len(); the fields are serialized as "BytesAllocated",
// "BuffersMax" and "BuffersInUse" respectively.
189 type PoolStatus struct {
190 Alloc uint64 `json:"BytesAllocated"`
191 Cap int `json:"BuffersMax"`
192 Len int `json:"BuffersInUse"`
// NodeStatus is the top-level structure marshalled by StatusHandler.
// It is populated by readNodeStatus with per-volume status, buffer
// pool counters, the status of the pull and trash work queues, and Go
// runtime memory statistics.
195 type NodeStatus struct {
196 Volumes []*VolumeStatus `json:"volumes"`
197 BufferPool PoolStatus
198 PullQueue WorkQueueStatus
199 TrashQueue WorkQueueStatus
200 Memory runtime.MemStats
// NOTE(review): stLock presumably serializes access to the status
// value used by StatusHandler -- the Lock/Unlock calls are not visible
// in this excerpt; confirm.
204 var stLock sync.Mutex
// StatusHandler serves /status.json. It marshals the node status value
// st to JSON; if marshalling fails it logs the error together with the
// status value and replies with HTTP 500.
206 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
209 jstat, err := json.Marshal(&st)
214 log.Printf("json.Marshal: %s\n", err)
215 log.Printf("NodeStatus = %v\n", &st)
216 http.Error(resp, err.Error(), 500)
220 // readNodeStatus populates the given NodeStatus struct with current values.
//
// st.Volumes is reused between calls: it is reallocated only when its
// capacity is smaller than the number of readable volumes, then
// truncated and refilled with every non-nil vol.Status() result.
221 func readNodeStatus(st *NodeStatus) {
222 vols := KeepVM.AllReadable()
223 if cap(st.Volumes) < len(vols) {
224 st.Volumes = make([]*VolumeStatus, len(vols))
// Truncate to zero length, keeping the capacity, before appending.
226 st.Volumes = st.Volumes[:0]
227 for _, vol := range vols {
// Volumes whose Status() is nil are left out of the report.
228 if s := vol.Status(); s != nil {
229 st.Volumes = append(st.Volumes, s)
232 st.BufferPool.Alloc = bufs.Alloc()
233 st.BufferPool.Cap = bufs.Cap()
234 st.BufferPool.Len = bufs.Len()
235 st.PullQueue = getWorkQueueStatus(pullq)
236 st.TrashQueue = getWorkQueueStatus(trashq)
237 runtime.ReadMemStats(&st.Memory)
240 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
241 // should never happen except in test suites), it returns a zero status
242 // value instead of crashing.
243 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
245 // This should only happen during tests.
246 return WorkQueueStatus{}
251 // DeleteHandler processes DELETE requests.
253 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
254 // from all connected volumes.
256 // Only the Data Manager, or an Arvados admin with scope "all", are
257 // allowed to issue DELETE requests. If a DELETE request is not
258 // authenticated or is issued by a non-admin user, the server returns
259 // a PermissionError.
261 // Upon receiving a valid request from an authorized user,
262 // DeleteHandler deletes all copies of the specified block on local
// writable volumes.
267 // If the requested block was not found on any volume, the response
268 // code is HTTP 404 Not Found.
270 // Otherwise, the response code is 200 OK, with a response body
271 // consisting of the JSON message
273 // {"copies_deleted":d,"copies_failed":f}
275 // where d and f are integers representing the number of blocks that
276 // were successfully and unsuccessfully deleted.
278 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
279 hash := mux.Vars(req)["hash"]
281 // Confirm that this user is an admin and has a token with unlimited scope.
// NOTE(review): the admin lookup in CanDelete is still marked TODO;
// confirm which tokens it actually accepts.
282 var tok = GetApiToken(req)
283 if tok == "" || !CanDelete(tok) {
284 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
289 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
293 // Delete copies of this block from all available volumes.
294 // Report how many blocks were successfully deleted, and how
295 // many were found on writable volumes but not deleted.
297 Deleted int `json:"copies_deleted"`
298 Failed int `json:"copies_failed"`
300 for _, vol := range KeepVM.AllWritable() {
301 if err := vol.Delete(hash); err == nil {
// A volume that simply does not hold the block is distinguished from
// a real deletion failure via os.IsNotExist.
303 } else if os.IsNotExist(err) {
307 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed: no writable volume had the block.
313 if result.Deleted == 0 && result.Failed == 0 {
314 st = http.StatusNotFound
321 if st == http.StatusOK {
322 if body, err := json.Marshal(result); err == nil {
325 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
326 http.Error(resp, err.Error(), 500)
331 /* PullHandler processes "PUT /pull" requests for the data manager.
332 The request body is a JSON message containing a list of pull
333 requests in the following format:
337 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
339 "keep0.qr1hi.arvadosapi.com:25107",
340 "keep1.qr1hi.arvadosapi.com:25108"
344 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
354 Each pull request in the list consists of a block locator string
355 and an ordered list of servers. Keepstore should try to fetch the
356 block from each server in turn.
358 If the request has not been sent by the Data Manager, return 401 Unauthorized.
361 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of the pull list: a block locator
// ("locator") and the ordered list of servers to try ("servers").
364 type PullRequest struct {
365 Locator string `json:"locator"`
366 Servers []string `json:"servers"`
// PullHandler serves "PUT /pull". Requests without the data manager
// token are rejected with UnauthorizedError; a body that cannot be
// decoded as JSON is answered with BadRequestError's HTTP status code.
// Otherwise it acknowledges the number of pull requests received and
// replaces the contents of the pull queue with the new list.
369 func PullHandler(resp http.ResponseWriter, req *http.Request) {
370 // Reject unauthorized requests.
371 if !IsDataManagerToken(GetApiToken(req)) {
372 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
376 // Parse the request body.
378 r := json.NewDecoder(req.Body)
379 if err := r.Decode(&pr); err != nil {
// The decoder's own message is sent back, with the Bad Request status.
380 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
384 // We have a properly formatted pull list sent from the data
385 // manager. Report success and send the list to the pull list
386 // manager for further handling.
387 resp.WriteHeader(http.StatusOK)
389 fmt.Sprintf("Received %d pull requests\n", len(pr))))
392 for _, p := range pr {
// ReplaceQueue discards whatever was queued before -- see the
// "Replace the current pull queue" route comment in MakeRESTRouter.
395 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the list decoded by TrashHandler from a
// "PUT /trash" request body: a block locator ("locator") and a block
// modification time ("block_mtime").
// NOTE(review): the unit and epoch of block_mtime are not visible
// here -- confirm against the data manager.
398 type TrashRequest struct {
399 Locator string `json:"locator"`
400 BlockMtime int64 `json:"block_mtime"`
// TrashHandler serves "PUT /trash". Requests without the data manager
// token are rejected with UnauthorizedError; a body that cannot be
// decoded as a JSON list of TrashRequest is answered with
// BadRequestError's HTTP status code. Otherwise it acknowledges the
// number of trash requests received and replaces the contents of the
// trash queue with the new list.
403 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
404 // Reject unauthorized requests.
405 if !IsDataManagerToken(GetApiToken(req)) {
406 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
410 // Parse the request body.
411 var trash []TrashRequest
412 r := json.NewDecoder(req.Body)
413 if err := r.Decode(&trash); err != nil {
// The decoder's own message is sent back, with the Bad Request status.
414 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
418 // We have a properly formatted trash list sent from the data
419 // manager. Report success and send the list to the trash work
420 // queue for further handling.
421 resp.WriteHeader(http.StatusOK)
423 fmt.Sprintf("Received %d trash requests\n", len(trash))))
426 for _, t := range trash {
// ReplaceQueue discards whatever was queued before -- see the
// "Replace the current trash queue" route comment in MakeRESTRouter.
429 trashq.ReplaceQueue(tlist)
432 // ==============================
433 // GetBlock and PutBlock implement lower-level code for handling
434 // blocks by rooting through volumes connected to the local machine.
435 // Once the handler has determined that system policy permits the
436 // request, it calls these methods to perform the actual operation.
438 // TODO(twp): this code would probably be better located in the
439 // VolumeManager interface. As an abstraction, the VolumeManager
440 // should be the only part of the code that cares about which volume a
441 // block is stored on, so it should be responsible for figuring out
442 // which volume to check for fetching blocks, storing blocks, etc.
444 // ==============================
445 // GetBlock fetches and returns the block identified by "hash". If
446 // the update_timestamp argument is true, GetBlock also updates the
447 // block's file modification time (for the sake of PutBlock, which
448 // must update the file's timestamp when the block already exists).
450 // On success, GetBlock returns a byte slice with the block data, and
// a nil error.
453 // If the block cannot be found on any volume, returns NotFoundError.
455 // If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
// If update_timestamp is true and Touch fails, error_to_caller is set
// to GenericError.
459 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
460 // Attempt to read the requested hash from a keep volume.
461 error_to_caller := NotFoundError
464 if update_timestamp {
465 // Pointless to find the block on an unwritable volume
466 // because Touch() will fail -- this is as good as
467 // "not found" for purposes of callers who need to
// update the timestamp.
469 vols = KeepVM.AllWritable()
471 vols = KeepVM.AllReadable()
474 for _, vol := range vols {
475 buf, err := vol.Get(hash)
477 // IsNotExist is an expected error and may be
478 // ignored. All other errors are logged. In
479 // any case we continue trying to read other
480 // volumes. If all volumes report IsNotExist,
481 // we return a NotFoundError.
482 if !os.IsNotExist(err) {
483 log.Printf("GetBlock: reading %s: %s\n", hash, err)
487 // Check the file checksum.
489 filehash := fmt.Sprintf("%x", md5.Sum(buf))
490 if filehash != hash {
491 // TODO: Try harder to tell a sysadmin about
// the corrupt block.
493 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
// Remember the corruption; it is what the caller sees unless another
// volume yields a good copy.
495 error_to_caller = DiskHashError
// Reaching this point with DiskHashError recorded means an earlier
// volume held a corrupt copy and this one holds a good copy.
499 if error_to_caller == DiskHashError {
500 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
503 if update_timestamp {
504 if err := vol.Touch(hash); err != nil {
505 error_to_caller = GenericError
// NOTE(review): this logs error_to_caller (GenericError), not the err
// returned by Touch, so the underlying cause is not recorded.
506 log.Printf("%s: Touch %s failed: %s",
507 vol, hash, error_to_caller)
514 return nil, error_to_caller
517 /* PutBlock(block, hash)
518 Stores the BLOCK (identified by the content id HASH) in Keep.
520 The MD5 checksum of the block must be identical to the content id HASH.
521 If not, an error is returned.
523 PutBlock stores the BLOCK on the first Keep volume with free space.
524 A failure code is returned to the user only if all volumes fail.
526 On success, PutBlock returns nil.
527 On failure, it returns a KeepError with one of the following codes:
CollisionError:
530 A different block with the same hash already exists on this
RequestHashError:
533 The MD5 hash of the BLOCK does not match the argument HASH.
535 There was not enough space left in any Keep volume to store
538 The object could not be stored for some other reason (e.g.
539 all writes failed). The text of the error message should
540 provide as much detail as possible.
543 func PutBlock(block []byte, hash string) error {
544 // Check that BLOCK's checksum matches HASH.
545 blockhash := fmt.Sprintf("%x", md5.Sum(block))
546 if blockhash != hash {
547 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
548 return RequestHashError
551 // If we already have a block on disk under this identifier, return
552 // success (but check for MD5 collisions). While fetching the block,
553 // update its timestamp.
554 // GetBlock can fail here with NotFoundError, DiskHashError or (if Touch fails) GenericError.
555 // In each of those cases, we want to write our new (good) block to disk,
556 // so there is nothing special to do if err != nil.
558 if oldblock, err := GetBlock(hash, true); err == nil {
// The existing block's buffer goes back to the pool when PutBlock returns.
559 defer bufs.Put(oldblock)
560 if bytes.Compare(block, oldblock) == 0 {
561 // The block already exists; return success.
// Same hash but different content.
564 return CollisionError
568 // Choose a Keep volume to write to.
569 // If this volume fails, try all of the volumes in order.
570 if vol := KeepVM.NextWritable(); vol != nil {
571 if err := vol.Put(hash, block); err == nil {
572 return nil // success!
576 writables := KeepVM.AllWritable()
577 if len(writables) == 0 {
578 log.Print("No writable volumes.")
583 for _, vol := range writables {
584 err := vol.Put(hash, block)
586 return nil // success!
588 if err != FullError {
589 // The volume is not full but the
590 // write did not succeed. Report the
591 // error and continue trying.
593 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
598 log.Print("All volumes are full.")
601 // Already logged the non-full errors.
// validLocatorRe matches a string consisting of exactly 32 lowercase
// hexadecimal digits and nothing else.
606 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
609 // IsValidLocator returns true if the specified string is a valid Keep locator.
// Only the bare 32-digit hash form is accepted; a locator carrying
// "+hints" does not match.
610 // When Keep is extended to support hash types other than MD5,
611 // this should be updated to cover those as well.
613 func IsValidLocator(loc string) bool {
614 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" and captures the token as submatch 1.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of an HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is examined.
func GetApiToken(req *http.Request) string {
	// Header.Get yields "" both when the header is absent and when the
	// key is present with a nil or empty value list, so there is no
	// unchecked index into the slice. The empty string never matches
	// authRe.
	auth := req.Header.Get("Authorization")
	if match := authRe.FindStringSubmatch(auth); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string.
//
// The timestamp is parsed as a 64-bit value regardless of the
// platform's int size, so timestamps beyond 2^31-1 seconds are handled
// the same way on 32-bit and 64-bit builds.
func IsExpired(timestamp_hex string) bool {
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		// An unparseable timestamp is treated as expired.
		log.Printf("IsExpired: %s\n", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
643 // CanDelete returns true if the user identified by api_token is
644 // allowed to delete blocks.
//
// NOTE(review): only the data manager token check is visible in this
// excerpt; the admin/scope lookup is still a TODO.
645 func CanDelete(api_token string) bool {
649 // Blocks may be deleted only when Keep has been configured with a
// data manager token: IsDataManagerToken is false for every token
// when none is configured.
651 if IsDataManagerToken(api_token) {
654 // TODO(twp): look up api_token with the API server
655 // return true if is_admin is true and if the token
656 // has unlimited scope
660 // IsDataManagerToken returns true if api_token represents the data
// manager, i.e. a data manager token is configured (non-empty) and
// api_token is equal to it. With no token configured it returns false
// for every input, including the empty string.
662 func IsDataManagerToken(api_token string) bool {
663 return data_manager_token != "" && api_token == data_manager_token