3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
// DeleteHandler (DELETE /locator)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
16 "github.com/gorilla/mux"
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
//
// Block routes take a 32-hex-digit hash. GET/HEAD also accept the hash
// followed by "+{hints}"; GetBlockHandler checks that whole locator
// with VerifySignature when permissions are enforced.
//
// NOTE(review): this copy of the file is missing original lines (see the
// gaps in the embedded line numbers, e.g. the rest.HandleFunc( openers
// and the closing brace); restore them from version control.
30 func MakeRESTRouter() *mux.Router {
31 rest := mux.NewRouter()
// Fetch a block by bare hash, or by hash plus "+hints".
34 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36 `/{hash:[0-9a-f]{32}}+{hints}`,
37 GetBlockHandler).Methods("GET", "HEAD")
// Store a block; PutBlock verifies that the body hashes to {hash}.
39 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
// Delete all copies of a block; DeleteHandler checks authorization.
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41 // List all blocks stored here. Privileged client only.
42 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43 // List blocks stored here whose hash has the given prefix.
44 // Privileged client only.
45 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47 // List volumes: path, device number, bytes used/avail.
48 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50 // Replace the current pull queue.
51 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53 // Replace the current trash queue.
54 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56 // Any request which does not match any of these routes gets
// answered by BadRequestHandler, i.e. with BadRequestError's message
// and HTTP status code.
58 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler is installed as the router's NotFoundHandler. It
// answers every request that matches no route with BadRequestError's
// message and HTTP status code.
63 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
64 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves GET and HEAD requests for a block, addressed
// either by bare hash or by hash plus "+hints".
//
// When enforce_permissions is set, the whole locator from the request
// path (leading slash stripped) is checked by VerifySignature against
// the caller's API token. A failure is answered with the error's
// HTTPCode.
//
// The block is looked up by the {hash} route variable via GetBlock,
// without updating its timestamp. Lookup errors are answered with
// their HTTPCode. On success, Content-Length and Content-Type
// (application/octet-stream) are set. The lines that write the body
// are missing from this copy of the file.
67 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
68 if enforce_permissions {
69 locator := req.URL.Path[1:] // strip leading slash
70 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
// NOTE(review): this assertion panics if VerifySignature ever
// returns an error that is not a *KeepError; confirm its contract.
71 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
76 block, err := GetBlock(mux.Vars(req)["hash"], false)
78 // This type assertion is safe because the only errors
79 // GetBlock can return (with update_timestamp false) are DiskHashError or NotFoundError.
80 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
85 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
86 resp.Header().Set("Content-Type", "application/octet-stream")
// PutBlockHandler stores the request body as the block named by the
// {hash} route variable and replies with the block's locator.
//
// Cheap checks run before the body is read:
//   - no Content-Length (ContentLength == -1): SizeRequiredError
//   - Content-Length greater than BLOCKSIZE: TooLongError
//   - no writable volumes: FullError
//
// The body is read into a buffer obtained from bufs. io.ReadFull fails
// if the body is shorter than Content-Length. The buffer is then passed
// to PutBlock, which verifies the MD5 hash.
//
// On success the response body is "<hash>+<size>\n". If
// PermissionSecret is set and the request carried an API token, the
// locator is first signed with SignLocator, expiring
// blob_signature_ttl from now.
90 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
91 hash := mux.Vars(req)["hash"]
93 // Detect as many error conditions as possible before reading
94 // the body: avoid transmitting data that will not end up
95 // being written anyway.
97 if req.ContentLength == -1 {
98 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
102 if req.ContentLength > BLOCKSIZE {
103 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
107 if len(KeepVM.AllWritable()) == 0 {
108 http.Error(resp, FullError.Error(), FullError.HTTPCode)
112 buf := bufs.Get(int(req.ContentLength))
113 _, err := io.ReadFull(req.Body, buf)
115 http.Error(resp, err.Error(), 500)
120 err = PutBlock(buf, hash)
// NOTE(review): this assertion relies on PutBlock only returning
// *KeepError values, as its documentation states. Any other error
// type would panic here.
124 ke := err.(*KeepError)
125 http.Error(resp, ke.Error(), ke.HTTPCode)
129 // Success; add a size hint, sign the locator if possible, and
130 // return it to the client.
131 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
132 api_token := GetApiToken(req)
133 if PermissionSecret != nil && api_token != "" {
134 expiry := time.Now().Add(blob_signature_ttl)
135 return_hash = SignLocator(return_hash, api_token, expiry)
137 resp.Write([]byte(return_hash + "\n"))
141 // IndexHandler handles /index and /index/{prefix} requests.
//
// Only the data manager token is accepted; other callers get
// UnauthorizedError. Each readable volume writes its index entries
// straight to the response. Entries are limited to blocks whose hash
// starts with {prefix}, which is "" for plain /index. A trailing empty
// line marks the end of the listing.
143 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
144 // Reject unauthorized requests.
145 if !IsDataManagerToken(GetApiToken(req)) {
146 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
150 prefix := mux.Vars(req)["prefix"]
152 for _, vol := range KeepVM.AllReadable() {
153 if err := vol.IndexTo(prefix, resp); err != nil {
154 // The only errors returned by IndexTo are
155 // write errors returned by resp.Write(),
156 // which probably means the client has
157 // disconnected and this error will never be
158 // reported to the client -- but it will
159 // appear in our own error log.
// NOTE(review): once any index data has been written, the 200 status
// has already been sent. http.Error cannot change it; it can only
// append its message to the body.
160 http.Error(resp, err.Error(), http.StatusInternalServerError)
164 // An empty line at EOF is the only way the client can be
165 // assured the entire index was received.
166 resp.Write([]byte{'\n'})
170 // Responds to /status.json requests with the current node status,
171 // described in a JSON structure.
173 // The data given in a status.json response includes:
174 // volumes - a list of Keep volumes currently in use by this server
175 // each volume is an object with the following fields:
177 // * device_num (an integer identifying the underlying filesystem)
//
// VolumeStatus is the status.json entry for one volume, with JSON
// fields mount_point, device_num, bytes_free and bytes_used.
181 type VolumeStatus struct {
182 MountPoint string `json:"mount_point"` // volume path
183 DeviceNum uint64 `json:"device_num"` // device number of the path, as reported by stat (see GetVolumeStatus)
184 BytesFree uint64 `json:"bytes_free"` // available bytes (fs.Bavail * fs.Bsize in GetVolumeStatus)
185 BytesUsed uint64 `json:"bytes_used"` // used bytes ((fs.Blocks - fs.Bfree) * fs.Bsize in GetVolumeStatus)
// NodeStatus is the top-level status.json document: a list of volume
// statuses.
188 type NodeStatus struct {
189 Volumes []*VolumeStatus `json:"volumes"`
// StatusHandler serves /status.json. It marshals GetNodeStatus() with
// json.Marshal; the line that writes jstat is missing from this copy.
// If marshalling fails, the error and the NodeStatus value are logged,
// and the client gets a 500 response carrying the error text.
192 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
193 st := GetNodeStatus()
194 if jstat, err := json.Marshal(st); err == nil {
197 log.Printf("json.Marshal: %s\n", err)
198 log.Printf("NodeStatus = %v\n", st)
199 http.Error(resp, err.Error(), 500)
204 // Returns a NodeStatus struct describing this Keep
205 // node's current status.
//
// GetNodeStatus builds one entry, from vol.Status(), for each volume
// in KeepVM.AllReadable(), in that order.
//
// NOTE(review): AllReadable() is called twice. If the readable volume
// set can change between the two calls, the loop could index past the
// end of st.Volumes or leave nil entries. Calling it once and reusing
// the slice would avoid that.
207 func GetNodeStatus() *NodeStatus {
208 st := new(NodeStatus)
210 st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
211 for i, vol := range KeepVM.AllReadable() {
212 st.Volumes[i] = vol.Status()
218 // Returns a VolumeStatus describing the requested volume.
//
// volume is a filesystem path.
//   - The device number comes from os.Stat. The fi.Sys().(*syscall.Stat_t)
//     assertion only holds on Unix-like platforms.
//   - Free and used space come from syscall.Statfs.
//
// Failures of either call are logged with a "GetVolumeStatus:" prefix.
// What is returned in that case is on lines missing from this copy.
220 func GetVolumeStatus(volume string) *VolumeStatus {
221 var fs syscall.Statfs_t
224 if fi, err := os.Stat(volume); err == nil {
225 devnum = fi.Sys().(*syscall.Stat_t).Dev
227 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
231 err := syscall.Statfs(volume, &fs)
233 log.Printf("GetVolumeStatus: statfs: %s\n", err)
236 // These calculations match the way df calculates disk usage:
237 // "free" space is measured by fs.Bavail, but "used" space
238 // uses fs.Blocks - fs.Bfree.
239 free := fs.Bavail * uint64(fs.Bsize)
240 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Positional literal: depends on VolumeStatus field order
// (MountPoint, DeviceNum, BytesFree, BytesUsed).
241 return &VolumeStatus{volume, devnum, free, used}
244 // DeleteHandler processes DELETE requests.
246 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
247 // from all connected volumes.
249 // Only the Data Manager, or an Arvados admin with scope "all", are
250 // allowed to issue DELETE requests. If a DELETE request is not
251 // authenticated or is issued by a non-admin user, the server returns
252 // a PermissionError.
254 // Upon receiving a valid request from an authorized user,
255 // DeleteHandler deletes all copies of the specified block on local
260 // If the requested block was not found on any volume, the response
261 // code is HTTP 404 Not Found.
263 // Otherwise, the response code is 200 OK, with a response body
264 // consisting of the JSON message
266 // {"copies_deleted":d,"copies_failed":f}
268 // where d and f are integers representing the number of copies that
269 // were successfully and unsuccessfully deleted.
//
// Only writable volumes (KeepVM.AllWritable) are visited. Copies on
// read-only volumes are neither deleted nor counted.
//
// NOTE(review): authorization is delegated to CanDelete, whose admin
// check is still a TODO.
271 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
272 hash := mux.Vars(req)["hash"]
274 // Confirm that this user is an admin and has a token with unlimited scope.
275 var tok = GetApiToken(req)
276 if tok == "" || !CanDelete(tok) {
277 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Deletion can also be refused as disabled. The condition guarding
// this response is on a line missing from this copy of the file.
282 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
286 // Delete copies of this block from all available volumes.
287 // Report how many blocks were successfully deleted, and how
288 // many were found on writable volumes but not deleted.
290 Deleted int `json:"copies_deleted"`
291 Failed int `json:"copies_failed"`
293 for _, vol := range KeepVM.AllWritable() {
294 if err := vol.Delete(hash); err == nil {
296 } else if os.IsNotExist(err) {
// Any other error means a copy exists but could not be deleted.
300 log.Println("DeleteHandler:", err)
// No copy was found on any writable volume.
306 if result.Deleted == 0 && result.Failed == 0 {
307 st = http.StatusNotFound
// On 200, reply with the JSON counts. A marshalling failure is logged
// and answered with a 500.
314 if st == http.StatusOK {
315 if body, err := json.Marshal(result); err == nil {
318 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
319 http.Error(resp, err.Error(), 500)
324 /* PullHandler processes "PUT /pull" requests for the data manager.
325 The request body is a JSON message containing a list of pull
326 requests in the following format:
330 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
332 "keep0.qr1hi.arvadosapi.com:25107",
333 "keep1.qr1hi.arvadosapi.com:25108"
337 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
347 Each pull request in the list consists of a block locator string
348 and an ordered list of servers. Keepstore should try to fetch the
349 block from each server in turn.
351 If the request has not been sent by the Data Manager, return 401
354 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of a pull list: a block locator and the
// ordered list of servers ("host:port") to try fetching it from.
357 type PullRequest struct {
358 Locator string `json:"locator"` // block locator; may include hints such as "+size"
359 Servers []string `json:"servers"` // tried in order
// PullHandler implements "PUT /pull". It decodes the body as a JSON
// list of PullRequest objects and replaces pullq's contents with a
// list built from them.
//   - Only the data manager token is accepted; other callers get
//     UnauthorizedError.
//   - A body that fails to decode gets the decoder's message with
//     BadRequestError's status code.
//   - The 200 response, reporting how many requests were received, is
//     written before the queue is replaced.
362 func PullHandler(resp http.ResponseWriter, req *http.Request) {
363 // Reject unauthorized requests.
364 if !IsDataManagerToken(GetApiToken(req)) {
365 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
369 // Parse the request body.
371 r := json.NewDecoder(req.Body)
372 if err := r.Decode(&pr); err != nil {
373 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
377 // We have a properly formatted pull list sent from the data
378 // manager. Report success and send the list to the pull list
379 // manager for further handling.
380 resp.WriteHeader(http.StatusOK)
382 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// Build the work list for pullq from the decoded requests.
385 for _, p := range pr {
388 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of a trash list: the locator of a block to
// trash, plus block_mtime.
// NOTE(review): block_mtime is presumably the block's expected
// modification time as a Unix timestamp; confirm against the consumer
// of trashq.
391 type TrashRequest struct {
392 Locator string `json:"locator"`
393 BlockMtime int64 `json:"block_mtime"`
// TrashHandler implements "PUT /trash". It decodes the body as a JSON
// list of TrashRequest objects and replaces trashq's contents with a
// list built from them.
//   - Only the data manager token is accepted; other callers get
//     UnauthorizedError.
//   - A body that fails to decode gets the decoder's message with
//     BadRequestError's status code.
//   - The 200 response, reporting how many requests were received, is
//     written before the queue is replaced.
396 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
397 // Reject unauthorized requests.
398 if !IsDataManagerToken(GetApiToken(req)) {
399 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
403 // Parse the request body.
404 var trash []TrashRequest
405 r := json.NewDecoder(req.Body)
406 if err := r.Decode(&trash); err != nil {
407 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
411 // We have a properly formatted trash list sent from the data
412 // manager. Report success and send the list to the trash work
413 // queue for further handling.
414 resp.WriteHeader(http.StatusOK)
416 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// Build the work list for trashq from the decoded requests.
419 for _, t := range trash {
422 trashq.ReplaceQueue(tlist)
425 // ==============================
426 // GetBlock and PutBlock implement lower-level code for handling
427 // blocks by rooting through volumes connected to the local machine.
428 // Once the handler has determined that system policy permits the
429 // request, it calls these methods to perform the actual operation.
431 // TODO(twp): this code would probably be better located in the
432 // VolumeManager interface. As an abstraction, the VolumeManager
433 // should be the only part of the code that cares about which volume a
434 // block is stored on, so it should be responsible for figuring out
435 // which volume to check for fetching blocks, storing blocks, etc.
437 // ==============================
438 // GetBlock fetches and returns the block identified by "hash". If
439 // the update_timestamp argument is true, GetBlock also updates the
440 // block's file modification time (for the sake of PutBlock, which
441 // must update the file's timestamp when the block already exists).
443 // On success, GetBlock returns a byte slice with the block data, and
446 // If the block cannot be found on any volume, returns NotFoundError.
448 // If the block found does not have the correct MD5 hash, returns
//
// Volumes are tried in turn. A copy whose MD5 does not match is logged
// and remembered, so that if no good copy turns up the caller gets
// DiskHashError instead of NotFoundError.
//
// If update_timestamp is true and Touch fails on the copy found,
// error_to_caller is set to GenericError. Callers therefore may also
// see GenericError.
452 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
453 // Attempt to read the requested hash from a keep volume.
454 error_to_caller := NotFoundError
457 if update_timestamp {
458 // Pointless to find the block on an unwritable volume
459 // because Touch() will fail -- this is as good as
460 // "not found" for purposes of callers who need to
462 vols = KeepVM.AllWritable()
464 vols = KeepVM.AllReadable()
467 for _, vol := range vols {
468 buf, err := vol.Get(hash)
470 // IsNotExist is an expected error and may be
471 // ignored. All other errors are logged. In
472 // any case we continue trying to read other
473 // volumes. If all volumes report IsNotExist,
474 // we return a NotFoundError.
475 if !os.IsNotExist(err) {
476 log.Printf("GetBlock: reading %s: %s\n", hash, err)
480 // Check the file checksum.
482 filehash := fmt.Sprintf("%x", md5.Sum(buf))
483 if filehash != hash {
484 // TODO: Try harder to tell a sysadmin about
486 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
488 error_to_caller = DiskHashError
492 if error_to_caller == DiskHashError {
493 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
496 if update_timestamp {
497 if err := vol.Touch(hash); err != nil {
498 error_to_caller = GenericError
// NOTE(review): this logs error_to_caller (GenericError), not the
// Touch error err, so the underlying cause of the failure never
// reaches the log.
499 log.Printf("%s: Touch %s failed: %s",
500 vol, hash, error_to_caller)
507 return nil, error_to_caller
510 /* PutBlock(block, hash)
511 Stores the BLOCK (identified by the content id HASH) in Keep.
513 The MD5 checksum of the block must be identical to the content id HASH.
514 If not, an error is returned.
516 PutBlock stores the BLOCK on the first Keep volume with free space.
In practice, the volume returned by KeepVM.NextWritable() is tried
first, then every writable volume in order.
517 A failure code is returned to the user only if all volumes fail.
519 On success, PutBlock returns nil.
520 On failure, it returns a KeepError with one of the following codes:
523 A different block with the same hash already exists on this
526 The MD5 hash of the BLOCK does not match the argument HASH.
528 There was not enough space left in any Keep volume to store
531 The object could not be stored for some other reason (e.g.
532 all writes failed). The text of the error message should
533 provide as much detail as possible.
536 func PutBlock(block []byte, hash string) error {
537 // Check that BLOCK's checksum matches HASH.
538 blockhash := fmt.Sprintf("%x", md5.Sum(block))
539 if blockhash != hash {
540 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
541 return RequestHashError
544 // If we already have a block on disk under this identifier, return
545 // success (but check for MD5 collisions). While fetching the block,
546 // update its timestamp.
547 // GetBlock may return DiskHashError, NotFoundError, or (since we ask
548 // it to update the timestamp) GenericError when Touch fails. In every
549 // case we want to write our new (good) block to disk, so there is
// nothing special to do if err != nil.
551 if oldblock, err := GetBlock(hash, true); err == nil {
552 defer bufs.Put(oldblock)
553 if bytes.Compare(block, oldblock) == 0 {
554 // The block already exists; return success.
// Same hash but different content: an MD5 collision.
557 return CollisionError
561 // Choose a Keep volume to write to.
562 // If this volume fails, try all of the volumes in order.
563 if vol := KeepVM.NextWritable(); vol != nil {
564 if err := vol.Put(hash, block); err == nil {
565 return nil // success!
569 writables := KeepVM.AllWritable()
570 if len(writables) == 0 {
571 log.Print("No writable volumes.")
576 for _, vol := range writables {
577 err := vol.Put(hash, block)
579 return nil // success!
581 if err != FullError {
582 // The volume is not full but the
583 // write did not succeed. Report the
584 // error and continue trying.
586 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
591 log.Print("All volumes are full.")
594 // Already logged the non-full errors.
// validLocatorRe matches a bare MD5 hash: exactly 32 lowercase
// hexadecimal digits and nothing else (no "+size" or other hints).
599 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
602 // IsValidLocator returns true if the specified string is a valid Keep locator.
603 // When Keep is extended to support hash types other than MD5,
604 // this should be updated to cover those as well.
//
// NOTE(review): despite the name, a locator carrying hints (for
// example "<hash>+<size>") does not match; only a bare hash does.
606 func IsValidLocator(loc string) bool {
607 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" and captures everything after the whitespace
// following the scheme. The scheme match is case-sensitive.
610 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
612 // GetApiToken returns the OAuth2 token from the Authorization
613 // header of an HTTP request, or an empty string if no matching token is found.
// Only the first Authorization header value is examined.
615 func GetApiToken(req *http.Request) string {
616 if auth, ok := req.Header["Authorization"]; ok {
617 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
624 // IsExpired returns true if the given Unix timestamp (expressed as a
625 // hexadecimal string) is in the past, or if timestamp_hex cannot be
626 // parsed as a hexadecimal string.
//
// Parse failures are logged. ParseInt is called with bitSize 0 (the
// platform's int size). On 32-bit platforms, timestamps that overflow
// int32 therefore fail to parse and are treated as expired.
627 func IsExpired(timestamp_hex string) bool {
628 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
630 log.Printf("IsExpired: %s\n", err)
633 return time.Unix(ts, 0).Before(time.Now())
636 // CanDelete returns true if the user identified by api_token is
637 // allowed to delete blocks.
//
// As far as this copy shows, only the data manager token is
// recognized; checking for an admin token with unlimited scope is
// still a TODO. NOTE(review): lines 639-641, 643 and 645-646 are
// missing from this copy, so any other checks they contain cannot be
// described here.
638 func CanDelete(api_token string) bool {
642 // Blocks may be deleted only when Keep has been configured with a
644 if IsDataManagerToken(api_token) {
647 // TODO(twp): look up api_token with the API server
648 // return true if is_admin is true and if the token
649 // has unlimited scope
653 // IsDataManagerToken returns true if api_token represents the data
// manager. It never matches when data_manager_token is empty (not
// configured), so an empty API token cannot pass as the data manager.
//
// NOTE(review): the == comparison is not constant-time; consider
// crypto/subtle.ConstantTimeCompare if timing side channels matter.
655 func IsDataManagerToken(api_token string) bool {
656 return data_manager_token != "" && api_token == data_manager_token