3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
16 "git.curoverse.com/arvados.git/services/keepstore/pull_list"
17 "github.com/gorilla/mux"
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
33 func MakeRESTRouter() *mux.Router {
34 rest := mux.NewRouter()
37 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
39 `/{hash:[0-9a-f]{32}}+{hints}`,
40 GetBlockHandler).Methods("GET", "HEAD")
42 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
45 // For IndexHandler we support:
46 // /index - returns all locators
47 // /index/{prefix} - returns all locators that begin with {prefix}
48 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49 // If {prefix} is the empty string, return an index of all locators
50 // (so /index and /index/ behave identically)
51 // A client may supply a full 32-digit locator string, in which
52 // case the server will return an index with either zero or one
53 // entries. This usage allows a client to check whether a block is
54 // present, and its size and upload time, without retrieving the
57 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
59 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
62 // The PullHandler processes "PUT /pull" commands from Data Manager.
63 // It parses the JSON list of pull requests in the request body, and
64 // delivers them to the pull list manager for replication.
65 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
67 // Any request which does not match any of these routes gets
69 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
74 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
75 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
78 // FindKeepVolumes scans the mount table at PROC_MOUNTS for Keep
79 // volumes, and returns a list of the matching "<mount point>/keep" paths.
81 // A mount is assumed to be a Keep volume if its device is "tmpfs" or
82 // begins with "/dev/", and it has a "/keep" directory directly under its mount point.
// If PROC_MOUNTS cannot be opened the process is terminated via log.Fatalf.
85 func FindKeepVolumes() []string {
86 vols := make([]string, 0)
88 if f, err := os.Open(PROC_MOUNTS); err != nil {
89 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
91 scanner := bufio.NewScanner(f)
// The current line is split on whitespace; field 0 is used as the
// device and field 1 as the mount point.
93 args := strings.Fields(scanner.Text())
// NOTE(review): args is indexed without a length check, so a line with
// fewer than two fields would panic here.
94 dev, mount := args[0], args[1]
// NOTE(review): this line continues a condition whose first operand is
// not visible here; confirm what else is required of the mount.
96 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
97 keep := mount + "/keep"
// Only an existing directory qualifies; a Stat error simply skips the mount.
98 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99 vols = append(vols, keep)
103 if err := scanner.Err(); err != nil {
// GetBlockHandler serves requests for a block identified by the "hash"
// route variable, optionally followed by "+"-separated hints.
//
// Hints are validated first: an unrecognised hint yields BadRequestError.
// When enforce_permissions is set, the request must carry a signature hint
// that is unexpired and passes VerifySignature for the caller's API token;
// otherwise PermissionError or ExpiredError is returned. The block is then
// fetched with GetBlock and written to the response with an X-Block-Size
// header giving its length.
110 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
111 hash := mux.Vars(req)["hash"]
113 log.Printf("%s %s", req.Method, hash)
115 hints := mux.Vars(req)["hints"]
117 // Parse the locator string and hints from the request.
118 // TODO(twp): implement a Locator type.
119 var signature, timestamp string
// A signature hint has the form A<hex digits>@<8 hex digits>.
// NOTE(review): this pattern and the two MatchString patterns below are
// compiled on every request and their errors are discarded; consider
// package-level regexp.MustCompile values.
121 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
122 for _, hint := range strings.Split(hints, "+") {
123 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
124 // Server ignores size hints
125 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
128 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
129 // Any unknown hint that starts with an uppercase letter is
130 // presumed to be valid and ignored, to permit forward compatibility.
132 // Unknown format; not a valid locator.
133 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
139 // If permission checking is in effect, verify this
140 // request's permission signature.
141 if enforce_permissions {
142 if signature == "" || timestamp == "" {
143 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
145 } else if IsExpired(timestamp) {
146 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
// The signature is verified against the full request path (hash plus
// hints), not just the bare hash.
149 req_locator := req.URL.Path[1:] // strip leading slash
150 if !VerifySignature(req_locator, GetApiToken(req)) {
151 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// The second argument (update_timestamp) is false: reads do not touch
// the block's modification time.
157 block, err := GetBlock(hash, false)
159 // Garbage collect after each GET. Fixes #2865.
160 // TODO(twp): review Keep memory usage and see if there's
161 // a better way to do this than blindly garbage collecting
162 // after every block.
166 // This type assertion is safe because the only errors
167 // GetBlock can return are DiskHashError or NotFoundError.
168 if err == NotFoundError {
169 log.Printf("%s: not found, giving up\n", hash)
171 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
175 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
177 _, err = resp.Write(block)
// A failed write can only be logged; the response is already under way.
179 log.Printf("GetBlockHandler: writing response: %s", err)
185 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
186 // Garbage collect after each PUT. Fixes #2865.
187 // See also GetBlockHandler.
190 hash := mux.Vars(req)["hash"]
192 log.Printf("%s %s", req.Method, hash)
194 // Read the block data to be stored.
195 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
197 if req.ContentLength > BLOCKSIZE {
198 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
202 buf := make([]byte, req.ContentLength)
203 nread, err := io.ReadFull(req.Body, buf)
205 http.Error(resp, err.Error(), 500)
206 } else if int64(nread) < req.ContentLength {
207 http.Error(resp, "request truncated", 500)
209 if err := PutBlock(buf, hash); err == nil {
210 // Success; add a size hint, sign the locator if
211 // possible, and return it to the client.
212 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
213 api_token := GetApiToken(req)
214 if PermissionSecret != nil && api_token != "" {
215 expiry := time.Now().Add(permission_ttl)
216 return_hash = SignLocator(return_hash, api_token, expiry)
218 resp.Write([]byte(return_hash + "\n"))
220 ke := err.(*KeepError)
221 http.Error(resp, ke.Error(), ke.HTTPCode)
228 // IndexHandler is a HandleFunc for /index and /index/{prefix} requests.
// For an authorized request it writes the concatenation of
// vol.Index(prefix) for every volume in KeepVM.Volumes().
230 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
// prefix is empty when the request was for plain /index.
231 prefix := mux.Vars(req)["prefix"]
233 // Only the data manager may issue /index requests,
234 // and only if enforce_permissions is enabled.
235 // All other requests are rejected with PermissionError.
236 api_token := GetApiToken(req)
// NOTE(review): a further operand of this condition sits between the
// two lines below and is not visible here; confirm that an empty
// data_manager_token can never match an empty api_token.
237 if !enforce_permissions ||
239 data_manager_token != api_token {
240 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
244 for _, vol := range KeepVM.Volumes() {
// NOTE(review): repeated string concatenation copies the accumulated
// index on every iteration; a bytes.Buffer would avoid that.
245 index = index + vol.Index(prefix)
247 resp.Write([]byte(index))
// VolumeStatus describes one Keep volume in a /status.json response.
// Its JSON fields are:
//   * mount_point
//   * device_num (an integer identifying the underlying filesystem)
//   * bytes_free
//   * bytes_used
type VolumeStatus struct {
	MountPoint string `json:"mount_point"`
	DeviceNum  uint64 `json:"device_num"`
	BytesFree  uint64 `json:"bytes_free"`
	BytesUsed  uint64 `json:"bytes_used"`
}

// NodeStatus is the JSON structure returned for /status.json requests.
// It includes:
//   volumes - a list of Keep volumes currently in use by this server;
//     each volume is a VolumeStatus object.
type NodeStatus struct {
	Volumes []*VolumeStatus `json:"volumes"`
}
273 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
274 st := GetNodeStatus()
275 if jstat, err := json.Marshal(st); err == nil {
278 log.Printf("json.Marshal: %s\n", err)
279 log.Printf("NodeStatus = %v\n", st)
280 http.Error(resp, err.Error(), 500)
285 // Returns a NodeStatus struct describing this Keep
286 // node's current status.
288 func GetNodeStatus() *NodeStatus {
289 st := new(NodeStatus)
291 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
292 for i, vol := range KeepVM.Volumes() {
293 st.Volumes[i] = vol.Status()
299 // Returns a VolumeStatus describing the requested volume.
301 func GetVolumeStatus(volume string) *VolumeStatus {
302 var fs syscall.Statfs_t
305 if fi, err := os.Stat(volume); err == nil {
306 devnum = fi.Sys().(*syscall.Stat_t).Dev
308 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
312 err := syscall.Statfs(volume, &fs)
314 log.Printf("GetVolumeStatus: statfs: %s\n", err)
317 // These calculations match the way df calculates disk usage:
318 // "free" space is measured by fs.Bavail, but "used" space
319 // uses fs.Blocks - fs.Bfree.
320 free := fs.Bavail * uint64(fs.Bsize)
321 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
322 return &VolumeStatus{volume, devnum, free, used}
325 // DeleteHandler processes DELETE requests.
327 // DELETE /{hash:[0-9a-f]{32}} deletes the block with the specified hash
328 // from all connected volumes.
330 // Only the Data Manager, or an Arvados admin with scope "all", are
331 // allowed to issue DELETE requests. If a DELETE request is not
332 // authenticated or is issued by a non-admin user, the server returns
333 // a PermissionError.
// NOTE(review): CanDelete currently accepts only the data manager token;
// its admin-token lookup is still a TODO, so the admin case described
// above is not implemented yet.
335 // Upon receiving a valid request from an authorized user,
336 // DeleteHandler deletes all copies of the specified block on local volumes.
341 // If the requested block was not found on any volume, the response
342 // code is HTTP 404 Not Found.
344 // Otherwise, the response code is 200 OK, with a response body
345 // consisting of the JSON message
347 // {"copies_deleted":d,"copies_failed":f}
349 // where d and f are integers representing the number of blocks that
350 // were successfully and unsuccessfully deleted.
352 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
353 hash := mux.Vars(req)["hash"]
354 log.Printf("%s %s", req.Method, hash)
356 // Confirm that the request carries a token that CanDelete accepts.
357 var tok = GetApiToken(req)
358 if tok == "" || !CanDelete(tok) {
359 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): the condition guarding this MethodDisabledError response
// is not visible here; confirm which setting disables deletion.
364 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
368 // Delete copies of this block from all available volumes. Report
369 // how many blocks were successfully and unsuccessfully deleted.
372 Deleted int `json:"copies_deleted"`
373 Failed int `json:"copies_failed"`
375 for _, vol := range KeepVM.Volumes() {
376 if err := vol.Delete(hash); err == nil {
// A volume that never held the block is handled separately from a
// genuine delete failure, which is logged below.
378 } else if os.IsNotExist(err) {
382 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means no volume had the block.
388 if result.Deleted == 0 && result.Failed == 0 {
389 st = http.StatusNotFound
// The JSON summary is only produced for a 200 response.
396 if st == http.StatusOK {
397 if body, err := json.Marshal(result); err == nil {
400 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
401 http.Error(resp, err.Error(), 500)
406 /* PullHandler processes "PUT /pull" requests for the data manager.
407 The request body is a JSON message containing a list of pull
408 requests in the following format:
412 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
414 "keep0.qr1hi.arvadosapi.com:25107",
415 "keep1.qr1hi.arvadosapi.com:25108"
419 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
429 Each pull request in the list consists of a block locator string
430 and an ordered list of servers. Keepstore should try to fetch the
431 block from each server in turn.
433 If the request has not been sent by the Data Manager, return
UnauthorizedError (documented here as 401).
436 If the JSON unmarshalling fails, return 400 Bad Request.
439 func PullHandler(resp http.ResponseWriter, req *http.Request) {
440 // Reject unauthorized requests.
441 api_token := GetApiToken(req)
442 if !IsDataManagerToken(api_token) {
443 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
444 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
448 // Parse the request body.
449 var plist []pull_list.PullRequest
450 r := json.NewDecoder(req.Body)
451 if err := r.Decode(&plist); err != nil {
// The client gets the generic BadRequestError; the decoder's own error
// is only written to the log.
452 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
453 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
457 // We have a properly formatted pull list sent from the data
458 // manager. Report success and send the list to the pull list
459 // manager for further handling.
460 log.Printf("%s %s: received %v\n", req.Method, req.URL, plist)
// The 200 status is sent before the list is handed to the pull list
// manager, so the response acknowledges receipt only.
461 resp.WriteHeader(http.StatusOK)
463 fmt.Sprintf("Received %d pull requests\n", len(plist))))
// NOTE(review): pullmgr looks like shared state created on first use;
// confirm concurrent PUT /pull requests cannot race on this assignment.
466 pullmgr = pull_list.NewManager()
468 pullmgr.SetList(plist)
471 // ==============================
472 // GetBlock and PutBlock implement lower-level code for handling
473 // blocks by rooting through volumes connected to the local machine.
474 // Once the handler has determined that system policy permits the
475 // request, it calls these methods to perform the actual operation.
477 // TODO(twp): this code would probably be better located in the
478 // VolumeManager interface. As an abstraction, the VolumeManager
479 // should be the only part of the code that cares about which volume a
480 // block is stored on, so it should be responsible for figuring out
481 // which volume to check for fetching blocks, storing blocks, etc.
483 // ==============================
484 // GetBlock fetches and returns the block identified by "hash". If
485 // the update_timestamp argument is true, GetBlock also updates the
486 // block's file modification time (for the sake of PutBlock, which
487 // must update the file's timestamp when the block already exists).
489 // On success, GetBlock returns a byte slice with the block data and a nil error.
492 // If the block cannot be found on any volume, returns NotFoundError.
494 // If the block found does not have the correct MD5 hash, returns DiskHashError.
498 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
499 // Attempt to read the requested hash from a keep volume.
// error_to_caller starts as NotFoundError and is upgraded to
// DiskHashError once any volume yields a copy with the wrong checksum.
500 error_to_caller := NotFoundError
502 for _, vol := range KeepVM.Volumes() {
503 if buf, err := vol.Get(hash); err != nil {
504 // IsNotExist is an expected error and may be ignored.
505 // (If all volumes report IsNotExist, we return a NotFoundError)
506 // All other errors should be logged but we continue trying to read.
509 case os.IsNotExist(err):
512 log.Printf("GetBlock: reading %s: %s\n", hash, err)
515 // Double check the file checksum.
// The hex MD5 of the bytes read must equal the requested hash.
517 filehash := fmt.Sprintf("%x", md5.Sum(buf))
518 if filehash != hash {
519 // TODO(twp): this condition probably represents a bad disk and
520 // should raise major alarm bells for an administrator: e.g.
521 // they should be sent directly to an event manager at high
522 // priority or logged as urgent problems.
524 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
526 error_to_caller = DiskHashError
// A non-NotFoundError value here means an earlier volume held a
// corrupt copy; log that a good copy was found nonetheless.
529 if error_to_caller != NotFoundError {
530 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
533 // Update the timestamp if the caller requested.
534 // If we could not update the timestamp, continue looking on other volumes.
536 if update_timestamp {
537 if vol.Touch(hash) != nil {
// No usable copy was found; log only when corruption was the reason.
546 if error_to_caller != NotFoundError {
547 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
549 return nil, error_to_caller
552 /* PutBlock(block, hash)
553 Stores the BLOCK (identified by the content id HASH) in Keep.
555 The MD5 checksum of the block must be identical to the content id HASH.
556 If not, an error is returned.
558 PutBlock stores the BLOCK on the first Keep volume with free space.
559 A failure code is returned to the user only if all volumes fail.
561 On success, PutBlock returns nil.
562 On failure, it returns a KeepError with one of the following codes:
565 A different block with the same hash already exists on this
server (CollisionError).
568 The MD5 hash of the BLOCK does not match the argument HASH
(RequestHashError).
570 There was not enough space left in any Keep volume to store
the block.
573 The object could not be stored for some other reason (e.g.
574 all writes failed). The text of the error message should
575 provide as much detail as possible.
578 func PutBlock(block []byte, hash string) error {
579 // Check that BLOCK's checksum matches HASH.
580 blockhash := fmt.Sprintf("%x", md5.Sum(block))
581 if blockhash != hash {
582 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
583 return RequestHashError
586 // If we already have a block on disk under this identifier, return
587 // success (but check for MD5 collisions). While fetching the block,
588 // update its timestamp.
589 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
590 // In either case, we want to write our new (good) block to disk,
591 // so there is nothing special to do if err != nil.
// The second argument (update_timestamp) is true, so an existing copy
// has its timestamp refreshed as part of the lookup.
593 if oldblock, err := GetBlock(hash, true); err == nil {
// NOTE(review): bytes.Equal(block, oldblock) is the idiomatic form of
// this comparison.
594 if bytes.Compare(block, oldblock) == 0 {
595 // The block already exists; return success.
// Same hash but different content: report a collision.
598 return CollisionError
602 // Choose a Keep volume to write to.
603 // If this volume fails, try all of the volumes in order.
604 vol := KeepVM.Choose()
605 if err := vol.Put(hash, block); err == nil {
606 return nil // success!
609 for _, vol := range KeepVM.Volumes() {
610 err := vol.Put(hash, block)
612 return nil // success!
614 if err != FullError {
615 // The volume is not full but the write did not succeed.
616 // Report the error and continue trying.
618 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
// The two messages below distinguish "every volume was full" from
// "at least one volume failed for another reason".
623 log.Printf("all Keep volumes full")
626 log.Printf("all Keep volumes failed")
// locatorPattern matches a bare Keep locator: exactly 32 lowercase
// hexadecimal digits. It is compiled once rather than on every call.
var locatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator, i.e. exactly 32 lowercase hexadecimal digits with no hints.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return locatorPattern.MatchString(loc)
}
// oauth2TokenPattern extracts the token from an "OAuth2 <token>"
// Authorization header value. It is compiled once at package level
// instead of on every request.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of an HTTP request, or an empty string if no matching
// header is found.
//
// Only the first Authorization value is examined.
func GetApiToken(req *http.Request) string {
	auth, ok := req.Header["Authorization"]
	// Guard against a present-but-empty value list, which would
	// otherwise make auth[0] panic.
	if !ok || len(auth) == 0 {
		return ""
	}
	if match := oauth2TokenPattern.FindStringSubmatch(auth[0]); match != nil {
		return match[1]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string.
func IsExpired(timestamp_hex string) bool {
	// Parse as a 64-bit value: time.Unix takes an int64, and a bit size
	// of 0 (platform int) would reject timestamps above 0x7fffffff on
	// 32-bit builds, treating valid signatures as expired.
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s\n", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
672 // CanDelete returns true if the user identified by api_token is
673 // allowed to delete blocks.
// NOTE(review): the lines between the signature and the data manager
// check are not visible here; of the visible code, the data manager
// token is the only credential that is accepted.
674 func CanDelete(api_token string) bool {
678 // Blocks may be deleted only when Keep has been configured with a data manager.
680 if IsDataManagerToken(api_token) {
683 // TODO(twp): look up api_token with the API server
684 // return true if is_admin is true and if the token
685 // has unlimited scope
689 // IsDataManagerToken returns true if api_token represents the data
691 func IsDataManagerToken(api_token string) bool {
692 return data_manager_token != "" && api_token == data_manager_token