3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
15 "github.com/gorilla/mux"
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
30 func MakeRESTRouter() *mux.Router {
31 rest := mux.NewRouter()
34 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36 `/{hash:[0-9a-f]{32}}+{hints}`,
37 GetBlockHandler).Methods("GET", "HEAD")
39 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41 // List all blocks stored here. Privileged client only.
42 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43 // List blocks stored here whose hash has the given prefix.
44 // Privileged client only.
45 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47 // List volumes: path, device number, bytes used/avail.
48 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50 // Replace the current pull queue.
51 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53 // Replace the current trash queue.
54 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56 // Any request which does not match any of these routes gets
58 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
64 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
67 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
68 if enforce_permissions {
69 locator := req.URL.Path[1:] // strip leading slash
70 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
71 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
76 block, err := GetBlock(mux.Vars(req)["hash"])
78 // This type assertion is safe because the only errors
79 // GetBlock can return are DiskHashError or NotFoundError.
80 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
85 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
86 resp.Header().Set("Content-Type", "application/octet-stream")
90 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
91 hash := mux.Vars(req)["hash"]
93 // Detect as many error conditions as possible before reading
94 // the body: avoid transmitting data that will not end up
95 // being written anyway.
97 if req.ContentLength == -1 {
98 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
102 if req.ContentLength > BLOCKSIZE {
103 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
107 if len(KeepVM.AllWritable()) == 0 {
108 http.Error(resp, FullError.Error(), FullError.HTTPCode)
112 buf := bufs.Get(int(req.ContentLength))
113 _, err := io.ReadFull(req.Body, buf)
115 http.Error(resp, err.Error(), 500)
120 err = PutBlock(buf, hash)
124 ke := err.(*KeepError)
125 http.Error(resp, ke.Error(), ke.HTTPCode)
129 // Success; add a size hint, sign the locator if possible, and
130 // return it to the client.
131 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
132 api_token := GetApiToken(req)
133 if PermissionSecret != nil && api_token != "" {
134 expiry := time.Now().Add(blob_signature_ttl)
135 return_hash = SignLocator(return_hash, api_token, expiry)
137 resp.Write([]byte(return_hash + "\n"))
141 // A HandleFunc to address /index and /index/{prefix} requests.
143 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
144 // Reject unauthorized requests.
145 if !IsDataManagerToken(GetApiToken(req)) {
146 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
150 prefix := mux.Vars(req)["prefix"]
152 for _, vol := range KeepVM.AllReadable() {
153 if err := vol.IndexTo(prefix, resp); err != nil {
154 // The only errors returned by IndexTo are
155 // write errors returned by resp.Write(),
156 // which probably means the client has
157 // disconnected and this error will never be
158 // reported to the client -- but it will
159 // appear in our own error log.
160 http.Error(resp, err.Error(), http.StatusInternalServerError)
164 // An empty line at EOF is the only way the client can be
165 // assured the entire index was received.
166 resp.Write([]byte{'\n'})
170 // Responds to /status.json requests with the current node status,
171 // described in a JSON structure.
173 // The data given in a status.json response includes:
174 // volumes - a list of Keep volumes currently in use by this server
175 // each volume is an object with the following fields:
177 // * device_num (an integer identifying the underlying filesystem)
181 type PoolStatus struct {
182 Alloc uint64 `json:"BytesAllocated"`
183 Cap int `json:"BuffersMax"`
184 Len int `json:"BuffersInUse"`
187 type NodeStatus struct {
188 Volumes []*VolumeStatus `json:"volumes"`
189 BufferPool PoolStatus
190 PullQueue WorkQueueStatus
191 TrashQueue WorkQueueStatus
192 Memory runtime.MemStats
196 var stLock sync.Mutex
198 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
201 jstat, err := json.Marshal(&st)
206 log.Printf("json.Marshal: %s", err)
207 log.Printf("NodeStatus = %v", &st)
208 http.Error(resp, err.Error(), 500)
212 // populate the given NodeStatus struct with current values.
213 func readNodeStatus(st *NodeStatus) {
214 vols := KeepVM.AllReadable()
215 if cap(st.Volumes) < len(vols) {
216 st.Volumes = make([]*VolumeStatus, len(vols))
218 st.Volumes = st.Volumes[:0]
219 for _, vol := range vols {
220 if s := vol.Status(); s != nil {
221 st.Volumes = append(st.Volumes, s)
224 st.BufferPool.Alloc = bufs.Alloc()
225 st.BufferPool.Cap = bufs.Cap()
226 st.BufferPool.Len = bufs.Len()
227 st.PullQueue = getWorkQueueStatus(pullq)
228 st.TrashQueue = getWorkQueueStatus(trashq)
229 runtime.ReadMemStats(&st.Memory)
232 // return a WorkQueueStatus for the given queue. If q is nil (which
233 // should never happen except in test suites), return a zero status
234 // value instead of crashing.
235 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
237 // This should only happen during tests.
238 return WorkQueueStatus{}
243 // DeleteHandler processes DELETE requests.
245 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
246 // from all connected volumes.
248 // Only the Data Manager, or an Arvados admin with scope "all", are
249 // allowed to issue DELETE requests. If a DELETE request is not
250 // authenticated or is issued by a non-admin user, the server returns
251 // a PermissionError.
253 // Upon receiving a valid request from an authorized user,
254 // DeleteHandler deletes all copies of the specified block on local
259 // If the requested blocks was not found on any volume, the response
260 // code is HTTP 404 Not Found.
262 // Otherwise, the response code is 200 OK, with a response body
263 // consisting of the JSON message
265 // {"copies_deleted":d,"copies_failed":f}
267 // where d and f are integers representing the number of blocks that
268 // were successfully and unsuccessfully deleted.
270 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
271 hash := mux.Vars(req)["hash"]
273 // Confirm that this user is an admin and has a token with unlimited scope.
274 var tok = GetApiToken(req)
275 if tok == "" || !CanDelete(tok) {
276 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
281 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
285 // Delete copies of this block from all available volumes.
286 // Report how many blocks were successfully deleted, and how
287 // many were found on writable volumes but not deleted.
289 Deleted int `json:"copies_deleted"`
290 Failed int `json:"copies_failed"`
292 for _, vol := range KeepVM.AllWritable() {
293 if err := vol.Delete(hash); err == nil {
295 } else if os.IsNotExist(err) {
299 log.Println("DeleteHandler:", err)
305 if result.Deleted == 0 && result.Failed == 0 {
306 st = http.StatusNotFound
313 if st == http.StatusOK {
314 if body, err := json.Marshal(result); err == nil {
317 log.Printf("json.Marshal: %s (result = %v)", err, result)
318 http.Error(resp, err.Error(), 500)
323 /* PullHandler processes "PUT /pull" requests for the data manager.
324 The request body is a JSON message containing a list of pull
325 requests in the following format:
329 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
331 "keep0.qr1hi.arvadosapi.com:25107",
332 "keep1.qr1hi.arvadosapi.com:25108"
336 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
346 Each pull request in the list consists of a block locator string
347 and an ordered list of servers. Keepstore should try to fetch the
348 block from each server in turn.
350 If the request has not been sent by the Data Manager, return 401
353 If the JSON unmarshalling fails, return 400 Bad Request.
356 type PullRequest struct {
357 Locator string `json:"locator"`
358 Servers []string `json:"servers"`
361 func PullHandler(resp http.ResponseWriter, req *http.Request) {
362 // Reject unauthorized requests.
363 if !IsDataManagerToken(GetApiToken(req)) {
364 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
368 // Parse the request body.
370 r := json.NewDecoder(req.Body)
371 if err := r.Decode(&pr); err != nil {
372 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
376 // We have a properly formatted pull list sent from the data
377 // manager. Report success and send the list to the pull list
378 // manager for further handling.
379 resp.WriteHeader(http.StatusOK)
381 fmt.Sprintf("Received %d pull requests\n", len(pr))))
384 for _, p := range pr {
387 pullq.ReplaceQueue(plist)
390 type TrashRequest struct {
391 Locator string `json:"locator"`
392 BlockMtime int64 `json:"block_mtime"`
395 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
396 // Reject unauthorized requests.
397 if !IsDataManagerToken(GetApiToken(req)) {
398 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
402 // Parse the request body.
403 var trash []TrashRequest
404 r := json.NewDecoder(req.Body)
405 if err := r.Decode(&trash); err != nil {
406 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
410 // We have a properly formatted trash list sent from the data
411 // manager. Report success and send the list to the trash work
412 // queue for further handling.
413 resp.WriteHeader(http.StatusOK)
415 fmt.Sprintf("Received %d trash requests\n", len(trash))))
418 for _, t := range trash {
421 trashq.ReplaceQueue(tlist)
424 // ==============================
425 // GetBlock and PutBlock implement lower-level code for handling
426 // blocks by rooting through volumes connected to the local machine.
427 // Once the handler has determined that system policy permits the
428 // request, it calls these methods to perform the actual operation.
430 // TODO(twp): this code would probably be better located in the
431 // VolumeManager interface. As an abstraction, the VolumeManager
432 // should be the only part of the code that cares about which volume a
433 // block is stored on, so it should be responsible for figuring out
434 // which volume to check for fetching blocks, storing blocks, etc.
436 // ==============================
437 // GetBlock fetches and returns the block identified by "hash".
439 // On success, GetBlock returns a byte slice with the block data, and
442 // If the block cannot be found on any volume, returns NotFoundError.
444 // If the block found does not have the correct MD5 hash, returns
448 func GetBlock(hash string) ([]byte, error) {
449 // Attempt to read the requested hash from a keep volume.
450 error_to_caller := NotFoundError
452 for _, vol := range KeepVM.AllReadable() {
453 buf, err := vol.Get(hash)
455 // IsNotExist is an expected error and may be
456 // ignored. All other errors are logged. In
457 // any case we continue trying to read other
458 // volumes. If all volumes report IsNotExist,
459 // we return a NotFoundError.
460 if !os.IsNotExist(err) {
461 log.Printf("%s: Get(%s): %s", vol, hash, err)
465 // Check the file checksum.
467 filehash := fmt.Sprintf("%x", md5.Sum(buf))
468 if filehash != hash {
469 // TODO: Try harder to tell a sysadmin about
471 log.Printf("%s: checksum mismatch for request %s (actual %s)",
473 error_to_caller = DiskHashError
477 if error_to_caller == DiskHashError {
478 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
483 return nil, error_to_caller
486 /* PutBlock(block, hash)
487 Stores the BLOCK (identified by the content id HASH) in Keep.
489 The MD5 checksum of the block must be identical to the content id HASH.
490 If not, an error is returned.
492 PutBlock stores the BLOCK on the first Keep volume with free space.
493 A failure code is returned to the user only if all volumes fail.
495 On success, PutBlock returns nil.
496 On failure, it returns a KeepError with one of the following codes:
499 A different block with the same hash already exists on this
502 The MD5 hash of the BLOCK does not match the argument HASH.
504 There was not enough space left in any Keep volume to store
507 The object could not be stored for some other reason (e.g.
508 all writes failed). The text of the error message should
509 provide as much detail as possible.
512 func PutBlock(block []byte, hash string) error {
513 // Check that BLOCK's checksum matches HASH.
514 blockhash := fmt.Sprintf("%x", md5.Sum(block))
515 if blockhash != hash {
516 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
517 return RequestHashError
520 // If we already have this data, it's intact on disk, and we
521 // can update its timestamp, return success. If we have
522 // different data with the same hash, return failure.
523 if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
527 // Choose a Keep volume to write to.
528 // If this volume fails, try all of the volumes in order.
529 if vol := KeepVM.NextWritable(); vol != nil {
530 if err := vol.Put(hash, block); err == nil {
531 return nil // success!
535 writables := KeepVM.AllWritable()
536 if len(writables) == 0 {
537 log.Print("No writable volumes.")
542 for _, vol := range writables {
543 err := vol.Put(hash, block)
545 return nil // success!
547 if err != FullError {
548 // The volume is not full but the
549 // write did not succeed. Report the
550 // error and continue trying.
552 log.Printf("%s: Write(%s): %s", vol, hash, err)
557 log.Print("All volumes are full.")
560 // Already logged the non-full errors.
565 // CompareAndTouch returns nil if one of the volumes already has the
566 // given content and it successfully updates the relevant block's
567 // modification time in order to protect it from premature garbage
569 func CompareAndTouch(hash string, buf []byte) error {
570 var bestErr error = NotFoundError
571 for _, vol := range KeepVM.AllWritable() {
572 if err := vol.Compare(hash, buf); err == CollisionError {
573 // Stop if we have a block with same hash but
574 // different content. (It will be impossible
575 // to tell which one is wanted if we have
576 // both, so there's no point writing it even
577 // on a different volume.)
578 log.Printf("%s: Compare(%s): %s", vol, hash, err)
580 } else if os.IsNotExist(err) {
581 // Block does not exist. This is the only
582 // "normal" error: we don't log anything.
584 } else if err != nil {
585 // Couldn't open file, data is corrupt on
586 // disk, etc.: log this abnormal condition,
587 // and try the next volume.
588 log.Printf("%s: Compare(%s): %s", vol, hash, err)
591 if err := vol.Touch(hash); err != nil {
592 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
596 // Compare and Touch both worked --> done.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e.
// exactly 32 lowercase hexadecimal digits (an MD5 hash).
//
// When Keep is extended to support hash types other than MD5, this
// should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	// The anchored pattern can only match a string of exactly 32
	// bytes, so anything else is rejected without running it.
	if len(loc) != 32 {
		return false
	}
	return validLocatorRe.MatchString(loc)
}
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// header is found.
//
// Only the first Authorization value is considered. A request whose
// Authorization key is present but carries no values is treated the
// same as one without the header.
func GetApiToken(req *http.Request) string {
	// Header.Get returns "" both for a missing key and for a key
	// with an empty value list, so there is no slice to index and
	// nothing that can panic.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[1]
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string. Parse failures are logged.
func IsExpired(timestamp_hex string) bool {
	// Parse as 64-bit explicitly. A bit size of 0 means platform
	// "int", which on 32-bit builds rejects every timestamp after
	// January 2038 and would make those look expired.
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
639 // CanDelete returns true if the user identified by api_token is
640 // allowed to delete blocks.
641 func CanDelete(api_token string) bool {
645 // Blocks may be deleted only when Keep has been configured with a
647 if IsDataManagerToken(api_token) {
650 // TODO(twp): look up api_token with the API server
651 // return true if is_admin is true and if the token
652 // has unlimited scope
656 // IsDataManagerToken returns true if api_token represents the data
658 func IsDataManagerToken(api_token string) bool {
659 return data_manager_token != "" && api_token == data_manager_token