3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
// DeleteHandler   (DELETE /locator)
// PullHandler     (PUT /pull)
// TrashHandler    (PUT /trash)
"crypto/subtle"
16 "github.com/gorilla/mux"
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
//
// Routes registered below (method, path, handler):
//   GET/HEAD  /{hash} and /{hash}+{hints}  GetBlockHandler
//   PUT       /{hash}                      PutBlockHandler
//   DELETE    /{hash}                      DeleteHandler
//   GET/HEAD  /index and /index/{prefix}   IndexHandler
//   GET/HEAD  /status.json                 StatusHandler
//   PUT       /pull                        PullHandler
//   PUT       /trash                       TrashHandler
// where {hash} is exactly 32 lowercase hex digits. Requests matching
// none of these routes are answered by BadRequestHandler.
31 func MakeRESTRouter() *mux.Router {
32 rest := mux.NewRouter()
// Block reads. The second pattern accepts a locator followed by
// "+hints" (such as the size hint and signature PutBlockHandler
// returns); the {hash} variable alone identifies the block.
35 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37 `/{hash:[0-9a-f]{32}}+{hints}`,
38 GetBlockHandler).Methods("GET", "HEAD")
// PUT and DELETE share the bare-hash pattern; the route is selected
// by the request method.
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42 // List all blocks stored here. Privileged client only.
43 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44 // List blocks stored here whose hash has the given prefix.
45 // Privileged client only.
// The prefix may be 0 to 32 lowercase hex digits.
46 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48 // List volumes: path, device number, bytes used/avail.
// The response also carries buffer-pool and Go runtime memory
// statistics (see NodeStatus).
49 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51 // Replace the current pull queue.
52 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54 // Replace the current trash queue.
55 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57 // Any request which does not match any of these routes gets
// routed to BadRequestHandler, which replies with BadRequestError.
59 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler answers every request that matches none of the
// routes registered by MakeRESTRouter. The reply carries the message
// and HTTP status code of BadRequestError.
func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
	msg := BadRequestError.Error()
	status := BadRequestError.HTTPCode
	http.Error(w, msg, status)
}
// GetBlockHandler serves GET and HEAD requests for /{hash} and
// /{hash}+{hints}.
//
// When enforce_permissions is true, the whole locator taken from the
// request path (hash plus any hints) is checked with VerifySignature
// against the request's API token before any volume is read. The block
// is then fetched by its bare hash with GetBlock(hash, false), which
// does not update the block's timestamp. The block is read and
// verified even for HEAD requests, so Content-Length reflects the real
// block size. Errors are reported with the KeepError's message and
// HTTP status code.
68 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
69 if enforce_permissions {
70 locator := req.URL.Path[1:] // strip leading slash
// NOTE(review): the assertion below assumes VerifySignature only
// returns *KeepError values; any other error type would panic here.
// Confirm.
71 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
72 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
77 block, err := GetBlock(mux.Vars(req)["hash"], false)
79 // This type assertion is safe because the only errors
80 // GetBlock can return are DiskHashError or NotFoundError.
// (update_timestamp is false here, so GetBlock's GenericError path
// for a failed Touch does not apply.)
81 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
86 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
87 resp.Header().Set("Content-Type", "application/octet-stream")
// PutBlockHandler stores the request body as the block named by the
// {hash} path variable.
//
// Before reading the body it rejects requests that have no declared
// length (SizeRequiredError), declare a length larger than BLOCKSIZE
// (TooLongError), or arrive when no volume is writable (FullError).
// The body is read into a buffer obtained from bufs and passed to
// PutBlock, whose KeepError is returned to the client on failure.
// On success the response body is the locator "HASH+SIZE" followed by
// a newline. The locator is signed with SignLocator when a
// PermissionSecret is configured and the request carried an API token.
91 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
92 hash := mux.Vars(req)["hash"]
94 // Detect as many error conditions as possible before reading
95 // the body: avoid transmitting data that will not end up
96 // being written anyway.
// net/http reports an unknown body length as ContentLength == -1.
98 if req.ContentLength == -1 {
99 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
103 if req.ContentLength > BLOCKSIZE {
104 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
108 if len(KeepVM.AllWritable()) == 0 {
109 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// io.ReadFull fails unless it fills buf completely. Assuming
// bufs.Get(n) returns a slice of length n, a short body is therefore
// rejected (500) rather than stored truncated.
113 buf := bufs.Get(int(req.ContentLength))
114 _, err := io.ReadFull(req.Body, buf)
116 http.Error(resp, err.Error(), 500)
121 err = PutBlock(buf, hash)
// PutBlock documents that its failures are KeepErrors.
125 ke := err.(*KeepError)
126 http.Error(resp, ke.Error(), ke.HTTPCode)
130 // Success; add a size hint, sign the locator if possible, and
131 // return it to the client.
132 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
133 api_token := GetApiToken(req)
134 if PermissionSecret != nil && api_token != "" {
// The signature is valid for blob_signature_ttl from now.
135 expiry := time.Now().Add(blob_signature_ttl)
136 return_hash = SignLocator(return_hash, api_token, expiry)
138 resp.Write([]byte(return_hash + "\n"))
142 // A HandleFunc to address /index and /index/{prefix} requests.
//
// Only the data manager may list blocks; any other token gets
// UnauthorizedError. Each readable volume writes its entries for the
// requested hash prefix directly to the response via IndexTo. A final
// empty line marks the end of the listing.
144 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
145 // Reject unauthorized requests.
146 if !IsDataManagerToken(GetApiToken(req)) {
147 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The bare /index route defines no "prefix" variable, so prefix is ""
// there and every block is listed.
151 prefix := mux.Vars(req)["prefix"]
153 for _, vol := range KeepVM.AllReadable() {
154 if err := vol.IndexTo(prefix, resp); err != nil {
155 // The only errors returned by IndexTo are
156 // write errors returned by resp.Write(),
157 // which probably means the client has
158 // disconnected and this error will never be
159 // reported to the client -- but it will
160 // appear in our own error log.
// NOTE(review): once any index data has been written, net/http has
// already sent an implicit 200 status. http.Error cannot change it; its
// message is appended to the body instead.
161 http.Error(resp, err.Error(), http.StatusInternalServerError)
165 // An empty line at EOF is the only way the client can be
166 // assured the entire index was received.
167 resp.Write([]byte{'\n'})
171 // Responds to /status.json requests with the current node status,
172 // described in a JSON structure.
174 // The data given in a status.json response includes:
175 // volumes - a list of Keep volumes currently in use by this server
176 // each volume is an object with the following fields:
178 // * device_num (an integer identifying the underlying filesystem)
// Besides "volumes", the response object also has "BufferPool" and
// "Memory" members; see NodeStatus.
//
// VolumeStatus is one entry of the "volumes" list. Its JSON keys are
// mount_point, device_num, bytes_free and bytes_used (from the tags).
182 type VolumeStatus struct {
183 MountPoint string `json:"mount_point"`
184 DeviceNum uint64 `json:"device_num"`
185 BytesFree uint64 `json:"bytes_free"`
186 BytesUsed uint64 `json:"bytes_used"`
// PoolStatus describes the block buffer pool. ReadNodeStatus fills it
// from bufs.Alloc(), bufs.Cap() and bufs.Len(). Its JSON keys are
// BytesAllocated, BuffersMax and BuffersInUse.
189 type PoolStatus struct {
190 Alloc uint64 `json:"BytesAllocated"`
191 Cap int `json:"BuffersMax"`
192 Len int `json:"BuffersInUse"`
// NodeStatus is the structure reported in /status.json responses.
// BufferPool and Memory have no json tags, so encoding/json emits them
// under their Go field names. Memory is the runtime.MemStats snapshot
// taken by ReadNodeStatus. Field order determines JSON key order.
195 type NodeStatus struct {
196 Volumes []*VolumeStatus `json:"volumes"`
197 BufferPool PoolStatus
198 Memory runtime.MemStats
// stLock: NOTE(review): presumably serializes access to the
// package-level status value st that StatusHandler refreshes and
// marshals on every request. Confirm at the Lock/Unlock sites.
202 var stLock sync.Mutex
// StatusHandler serves GET/HEAD /status.json by marshalling the node
// status st to JSON. If marshalling fails, it logs the error and the
// status value and replies 500 Internal Server Error with the error
// text.
203 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
206 jstat, err := json.Marshal(&st)
211 log.Printf("json.Marshal: %s\n", err)
212 log.Printf("NodeStatus = %v\n", &st)
213 http.Error(resp, err.Error(), 500)
// ReadNodeStatus refreshes *st in place with the current node status:
// the status of every readable volume (volumes whose Status() is nil
// are omitted), the buffer pool counters, and a fresh runtime.MemStats
// snapshot. The existing Volumes backing array is reused whenever its
// capacity suffices, so repeated calls need not reallocate it.
func ReadNodeStatus(st *NodeStatus) {
	readable := KeepVM.AllReadable()

	statuses := st.Volumes
	if len(readable) > cap(statuses) {
		statuses = make([]*VolumeStatus, len(readable))
	}
	statuses = statuses[:0]
	for _, v := range readable {
		s := v.Status()
		if s == nil {
			continue
		}
		statuses = append(statuses, s)
	}
	st.Volumes = statuses

	pool := &st.BufferPool
	pool.Alloc = bufs.Alloc()
	pool.Cap = bufs.Cap()
	pool.Len = bufs.Len()

	runtime.ReadMemStats(&st.Memory)
}
237 // DeleteHandler processes DELETE requests.
239 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
240 // from all connected volumes.
242 // Only the Data Manager, or an Arvados admin with scope "all", are
243 // allowed to issue DELETE requests. If a DELETE request is not
244 // authenticated or is issued by a non-admin user, the server returns
245 // a PermissionError.
// NOTE(review): CanDelete's admin / "all"-scope lookup is still a TODO,
// so the only authorization path visible in CanDelete is the data
// manager token. Confirm before relying on admin deletes.
247 // Upon receiving a valid request from an authorized user,
248 // DeleteHandler deletes all copies of the specified block on local
253 // If the requested block was not found on any volume, the response
254 // code is HTTP 404 Not Found.
256 // Otherwise, the response code is 200 OK, with a response body
257 // consisting of the JSON message
259 // {"copies_deleted":d,"copies_failed":f}
261 // where d and f are integers representing the number of blocks that
262 // were successfully and unsuccessfully deleted.
// Only writable volumes (KeepVM.AllWritable) are visited.
264 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
265 hash := mux.Vars(req)["hash"]
267 // Confirm that this user is an admin and has a token with unlimited scope.
// An empty token is rejected without consulting CanDelete.
268 var tok = GetApiToken(req)
269 if tok == "" || !CanDelete(tok) {
270 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): presumably reached when deletion is disabled by server
// configuration. Confirm the guarding condition.
275 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
279 // Delete copies of this block from all available volumes.
280 // Report how many blocks were successfully deleted, and how
281 // many were found on writable volumes but not deleted.
// Serialized as {"copies_deleted":...,"copies_failed":...}.
283 Deleted int `json:"copies_deleted"`
284 Failed int `json:"copies_failed"`
286 for _, vol := range KeepVM.AllWritable() {
287 if err := vol.Delete(hash); err == nil {
// Per the comment above, a volume that does not hold the block
// (IsNotExist) is counted neither as a deletion nor as a failure.
289 } else if os.IsNotExist(err) {
293 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means no copy was found: 404.
299 if result.Deleted == 0 && result.Failed == 0 {
// NOTE(review): this st holds an HTTP status code. It appears to be a
// local that shadows the package-level st used by StatusHandler.
300 st = http.StatusNotFound
307 if st == http.StatusOK {
308 if body, err := json.Marshal(result); err == nil {
311 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
312 http.Error(resp, err.Error(), 500)
317 /* PullHandler processes "PUT /pull" requests for the data manager.
318 The request body is a JSON message containing a list of pull
319 requests in the following format:
323 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
325 "keep0.qr1hi.arvadosapi.com:25107",
326 "keep1.qr1hi.arvadosapi.com:25108"
330 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
340 Each pull request in the list consists of a block locator string
341 and an ordered list of servers. Keepstore should try to fetch the
342 block from each server in turn.
344 If the request has not been sent by the Data Manager, return 401
347 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one element of the PUT /pull request body: the
// locator of a block to fetch and the servers to try, in order.
350 type PullRequest struct {
351 Locator string `json:"locator"`
352 Servers []string `json:"servers"`
// PullHandler implementation notes:
//   - Authorization uses IsDataManagerToken; any other caller gets
//     UnauthorizedError.
//   - An undecodable body gets the decoder's error message with
//     BadRequestError's status code.
//   - The 200 status and the "Received N pull requests" message are
//     written before the queue is touched. The new list then replaces
//     (does not extend) the current pull queue via pullq.ReplaceQueue.
355 func PullHandler(resp http.ResponseWriter, req *http.Request) {
356 // Reject unauthorized requests.
357 if !IsDataManagerToken(GetApiToken(req)) {
358 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
362 // Parse the request body.
// NOTE(review): pr is presumably declared as []PullRequest. Confirm.
364 r := json.NewDecoder(req.Body)
365 if err := r.Decode(&pr); err != nil {
366 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
370 // We have a properly formatted pull list sent from the data
371 // manager. Report success and send the list to the pull list
372 // manager for further handling.
373 resp.WriteHeader(http.StatusOK)
375 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// Presumably copies each request into plist for the pull queue.
378 for _, p := range pr {
381 pullq.ReplaceQueue(plist)
// TrashRequest is one element of the PUT /trash request body.
// NOTE(review): BlockMtime (JSON "block_mtime") is presumably the
// block's modification time as seen by the data manager. Confirm its
// units and how the trash worker uses it.
384 type TrashRequest struct {
385 Locator string `json:"locator"`
386 BlockMtime int64 `json:"block_mtime"`
// TrashHandler processes "PUT /trash" requests from the data manager.
// The body must be a JSON list of TrashRequest objects. Any caller
// other than the data manager gets UnauthorizedError. An undecodable
// body gets the decoder's message with BadRequestError's status code.
// Otherwise the handler replies 200 with a message counting the
// received requests. It then replaces (does not extend) the trash work
// queue via trashq.ReplaceQueue.
389 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
390 // Reject unauthorized requests.
391 if !IsDataManagerToken(GetApiToken(req)) {
392 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
396 // Parse the request body.
397 var trash []TrashRequest
398 r := json.NewDecoder(req.Body)
399 if err := r.Decode(&trash); err != nil {
400 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
404 // We have a properly formatted trash list sent from the data
405 // manager. Report success and send the list to the trash work
406 // queue for further handling.
407 resp.WriteHeader(http.StatusOK)
409 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// Presumably copies each TrashRequest into tlist for the trash queue.
412 for _, t := range trash {
415 trashq.ReplaceQueue(tlist)
418 // ==============================
419 // GetBlock and PutBlock implement lower-level code for handling
420 // blocks by rooting through volumes connected to the local machine.
421 // Once the handler has determined that system policy permits the
422 // request, it calls these methods to perform the actual operation.
424 // TODO(twp): this code would probably be better located in the
425 // VolumeManager interface. As an abstraction, the VolumeManager
426 // should be the only part of the code that cares about which volume a
427 // block is stored on, so it should be responsible for figuring out
428 // which volume to check for fetching blocks, storing blocks, etc.
430 // ==============================
431 // GetBlock fetches and returns the block identified by "hash". If
432 // the update_timestamp argument is true, GetBlock also updates the
433 // block's file modification time (for the sake of PutBlock, which
434 // must update the file's timestamp when the block already exists).
436 // On success, GetBlock returns a byte slice with the block data, and
439 // If the block cannot be found on any volume, returns NotFoundError.
441 // If the block found does not have the correct MD5 hash, returns
//
// When update_timestamp is true, only writable volumes are searched,
// and GenericError is returned if Touch fails on the volume holding a
// good copy.
445 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
446 // Attempt to read the requested hash from a keep volume.
// error_to_caller starts as NotFoundError. As volumes are tried it is
// overwritten with DiskHashError (bad checksum) or GenericError (failed
// Touch). It is the error returned if no volume yields a usable copy.
447 error_to_caller := NotFoundError
450 if update_timestamp {
451 // Pointless to find the block on an unwritable volume
452 // because Touch() will fail -- this is as good as
453 // "not found" for purposes of callers who need to
455 vols = KeepVM.AllWritable()
457 vols = KeepVM.AllReadable()
460 for _, vol := range vols {
461 buf, err := vol.Get(hash)
463 // IsNotExist is an expected error and may be
464 // ignored. All other errors are logged. In
465 // any case we continue trying to read other
466 // volumes. If all volumes report IsNotExist,
467 // we return a NotFoundError.
468 if !os.IsNotExist(err) {
469 log.Printf("GetBlock: reading %s: %s\n", hash, err)
473 // Check the file checksum.
// md5.Sum returns a [16]byte; %x renders it as 32 lowercase hex
// digits, the same form as hash.
475 filehash := fmt.Sprintf("%x", md5.Sum(buf))
476 if filehash != hash {
477 // TODO: Try harder to tell a sysadmin about
479 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
481 error_to_caller = DiskHashError
// Presumably reached only with a matching checksum, so a DiskHashError
// recorded here came from an earlier volume and has been recovered from.
485 if error_to_caller == DiskHashError {
486 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
489 if update_timestamp {
490 if err := vol.Touch(hash); err != nil {
491 error_to_caller = GenericError
// NOTE(review): this logs error_to_caller (always GenericError at this
// point) instead of the Touch error err, so the actual failure reason
// never reaches the log. Logging err was probably intended.
492 log.Printf("%s: Touch %s failed: %s",
493 vol, hash, error_to_caller)
500 return nil, error_to_caller
503 /* PutBlock(block, hash)
504 Stores the BLOCK (identified by the content id HASH) in Keep.
506 The MD5 checksum of the block must be identical to the content id HASH.
507 If not, an error is returned.
509 PutBlock stores the BLOCK on the first Keep volume with free space.
510 A failure code is returned to the user only if all volumes fail.
   Volume order: the volume from KeepVM.NextWritable() is tried first.
   If that write fails, every volume in KeepVM.AllWritable() is tried
   in turn, presumably including that first volume again.
512 On success, PutBlock returns nil.
513 On failure, it returns a KeepError with one of the following codes:
516 A different block with the same hash already exists on this
519 The MD5 hash of the BLOCK does not match the argument HASH.
521 There was not enough space left in any Keep volume to store
524 The object could not be stored for some other reason (e.g.
525 all writes failed). The text of the error message should
526 provide as much detail as possible.
529 func PutBlock(block []byte, hash string) error {
530 // Check that BLOCK's checksum matches HASH.
531 blockhash := fmt.Sprintf("%x", md5.Sum(block))
532 if blockhash != hash {
533 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
534 return RequestHashError
537 // If we already have a block on disk under this identifier, return
538 // success (but check for MD5 collisions). While fetching the block,
539 // update its timestamp.
540 // GetBlock can return DiskHashError or NotFoundError, and (because
// update_timestamp is true here) GenericError if Touch fails.
541 // In every case, we want to write our new (good) block to disk,
542 // so there is nothing special to do if err != nil.
544 if oldblock, err := GetBlock(hash, true); err == nil {
// oldblock goes back to the buffer pool when PutBlock returns.
545 defer bufs.Put(oldblock)
// NOTE(review): bytes.Equal(block, oldblock) is the idiomatic equality
// test; Compare(...) == 0 gives the same result.
546 if bytes.Compare(block, oldblock) == 0 {
547 // The block already exists; return success.
// Same MD5 but different content: a genuine hash collision.
550 return CollisionError
554 // Choose a Keep volume to write to.
555 // If this volume fails, try all of the volumes in order.
556 if vol := KeepVM.NextWritable(); vol != nil {
557 if err := vol.Put(hash, block); err == nil {
558 return nil // success!
562 writables := KeepVM.AllWritable()
563 if len(writables) == 0 {
564 log.Print("No writable volumes.")
569 for _, vol := range writables {
570 err := vol.Put(hash, block)
572 return nil // success!
574 if err != FullError {
575 // The volume is not full but the
576 // write did not succeed. Report the
577 // error and continue trying.
579 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
584 log.Print("All volumes are full.")
587 // Already logged the non-full errors.
592 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
595 // Return true if the specified string is a valid Keep locator.
596 // When Keep is extended to support hash types other than MD5,
597 // this should be updated to cover those as well.
599 func IsValidLocator(loc string) bool {
600 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization header value of the form
// "OAuth2 TOKEN". The scheme name is matched case-sensitively, and
// submatch 1 captures everything after the whitespace.
603 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
605 // GetApiToken returns the OAuth2 token from the Authorization
606 // header of an HTTP request, or an empty string if no matching
// Indexing req.Header directly works because net/http stores incoming
// header names in canonical form ("Authorization"). Only the first
// Authorization value is examined.
608 func GetApiToken(req *http.Request) string {
609 if auth, ok := req.Header["Authorization"]; ok {
610 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
// IsExpired reports whether timestamp_hex, a Unix time in seconds
// written in hexadecimal, lies in the past. A value that cannot be
// parsed as a hexadecimal integer is logged and reported as expired.
func IsExpired(timestamp_hex string) bool {
	secs, parseErr := strconv.ParseInt(timestamp_hex, 16, 0)
	if parseErr == nil {
		expiry := time.Unix(secs, 0)
		return time.Now().After(expiry)
	}
	log.Printf("IsExpired: %s\n", parseErr)
	return true
}
629 // CanDelete returns true if the user identified by api_token is
630 // allowed to delete blocks.
//
// NOTE(review): the admin / unlimited-scope lookup below is still a
// TODO, so the only grant visible here is the data manager token
// (IsDataManagerToken). Confirm the remaining branches before relying
// on this for admin users.
631 func CanDelete(api_token string) bool {
635 // Blocks may be deleted only when Keep has been configured with a
637 if IsDataManagerToken(api_token) {
640 // TODO(twp): look up api_token with the API server
641 // return true if is_admin is true and if the token
642 // has unlimited scope
// IsDataManagerToken reports whether api_token is the Data Manager's
// token. It is always false when no data manager token is configured
// (data_manager_token is empty), so an empty or absent token can never
// grant Data Manager privileges.
//
// api_token comes from the client and data_manager_token is a secret,
// so the two are compared in constant time. A plain == comparison may
// stop at the first differing byte, which lets an attacker recover the
// secret from response timing. ConstantTimeCompare still returns early
// when the lengths differ, so only the token's length can leak.
func IsDataManagerToken(api_token string) bool {
	if data_manager_token == "" {
		return false
	}
	return subtle.ConstantTimeCompare([]byte(api_token), []byte(data_manager_token)) == 1
}