3 // REST handlers for Keep are implemented here.
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
// DeleteHandler (DELETE /locator)
7 // IndexHandler (GET /index, GET /index/prefix)
8 // StatusHandler (GET /status.json)
// PullHandler (PUT /pull)
// TrashHandler (PUT /trash)
15 "github.com/gorilla/mux"
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
//
// Block routes match a path made of exactly 32 lowercase hex digits
// (the hash route variable), optionally followed by +hints for reads.
// Requests that match no route are sent to BadRequestHandler.
30 func MakeRESTRouter() *mux.Router {
31 rest := mux.NewRouter()
// Fetch a block by bare hash.
34 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
// Fetch a block by hash with locator hints appended after a plus sign.
36 `/{hash:[0-9a-f]{32}}+{hints}`,
37 GetBlockHandler).Methods("GET", "HEAD")
// Store and delete use the bare-hash form only.
39 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41 // List all blocks stored here. Privileged client only.
42 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43 // List blocks stored here whose hash has the given prefix.
44 // Privileged client only.
45 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47 // List volumes: path, device number, bytes used/avail.
48 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50 // Replace the current pull queue.
51 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53 // Replace the current trash queue.
54 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56 // Any request which does not match any of these routes gets
// handled by BadRequestHandler.
58 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// BadRequestHandler replies to any request with the message and HTTP
// status code carried by BadRequestError. MakeRESTRouter installs it
// as the router's NotFoundHandler, so unrouted requests end up here.
63 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
64 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// GetBlockHandler serves block reads. When enforce_permissions is
// set, the full locator taken from the URL path is checked with
// VerifySignature against the API token of the request before any
// data is read. The block is then fetched with GetBlock using the hash
// route variable, and the Content-Length and Content-Type headers are
// set from the block that was read.
67 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
68 if enforce_permissions {
69 locator := req.URL.Path[1:] // strip leading slash
70 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
// NOTE(review): this unchecked assertion panics if VerifySignature
// ever returns an error that is not a *KeepError -- confirm its contract.
71 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
// Only the 32-digit hash (not the hints) is passed to GetBlock.
76 block, err := GetBlock(mux.Vars(req)["hash"])
78 // This type assertion is safe because the only errors
79 // GetBlock can return are DiskHashError or NotFoundError.
80 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
85 resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
86 resp.Header().Set("Content-Type", "application/octet-stream")
// PutBlockHandler stores the request body as a block under the hash
// given in the URL.
//
// Before reading the body it rejects requests whose length is unknown
// (SizeRequiredError), whose declared length exceeds BLOCKSIZE
// (TooLongError), or that arrive while no volume is writable
// (FullError). It then reads exactly ContentLength bytes and hands
// them to PutBlock. On success the response body is the locator
// hash+size, signed with SignLocator when a PermissionSecret is
// configured and the request carries an API token, followed by a
// newline.
90 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
91 hash := mux.Vars(req)["hash"]
93 // Detect as many error conditions as possible before reading
94 // the body: avoid transmitting data that will not end up
95 // being written anyway.
// net/http reports an unknown body length as -1.
97 if req.ContentLength == -1 {
98 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
102 if req.ContentLength > BLOCKSIZE {
103 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
107 if len(KeepVM.AllWritable()) == 0 {
108 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// The buffer is sized to the declared length; io.ReadFull fails if
// the body turns out to be shorter than that.
112 buf := bufs.Get(int(req.ContentLength))
113 _, err := io.ReadFull(req.Body, buf)
115 http.Error(resp, err.Error(), 500)
120 err = PutBlock(buf, hash)
// PutBlock failures are reported with the HTTP code of the KeepError.
124 ke := err.(*KeepError)
125 http.Error(resp, ke.Error(), ke.HTTPCode)
129 // Success; add a size hint, sign the locator if possible, and
130 // return it to the client.
131 return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
132 api_token := GetApiToken(req)
133 if PermissionSecret != nil && api_token != "" {
// The signature expires blob_signature_ttl from now.
134 expiry := time.Now().Add(blob_signature_ttl)
135 return_hash = SignLocator(return_hash, api_token, expiry)
137 resp.Write([]byte(return_hash + "\n"))
141 // IndexHandler is a HandleFunc that answers /index and /index/{prefix}
// requests. Only a request bearing the data manager token is served;
// any other request gets UnauthorizedError. Each readable volume
// writes its own index for the prefix straight to the response, and a
// final empty line marks the end of the listing.
143 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
144 // Reject unauthorized requests.
145 if !IsDataManagerToken(GetApiToken(req)) {
146 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// For the bare /index route there is no prefix variable, so the
// map lookup yields the empty string.
150 prefix := mux.Vars(req)["prefix"]
152 for _, vol := range KeepVM.AllReadable() {
153 if err := vol.IndexTo(prefix, resp); err != nil {
154 // The only errors returned by IndexTo are
155 // write errors returned by resp.Write(),
156 // which probably means the client has
157 // disconnected and this error will never be
158 // reported to the client -- but it will
159 // appear in our own error log.
160 http.Error(resp, err.Error(), http.StatusInternalServerError)
164 // An empty line at EOF is the only way the client can be
165 // assured the entire index was received.
166 resp.Write([]byte{'\n'})
170 // Responds to /status.json requests with the current node status,
171 // described in a JSON structure.
173 // The data given in a status.json response includes:
174 // volumes - a list of Keep volumes currently in use by this server
175 // each volume is an object with the following fields:
// * mount_point
177 // * device_num (an integer identifying the underlying filesystem)
// * bytes_free
// * bytes_used
//
// VolumeStatus is the JSON shape of one entry in that volumes list.
181 type VolumeStatus struct {
182 MountPoint string `json:"mount_point"`
183 DeviceNum uint64 `json:"device_num"`
184 BytesFree uint64 `json:"bytes_free"`
185 BytesUsed uint64 `json:"bytes_used"`
// PoolStatus is the buffer pool section of the status report. Its
// fields are filled by readNodeStatus from bufs.Alloc(), bufs.Cap()
// and bufs.Len() respectively.
188 type PoolStatus struct {
189 Alloc uint64 `json:"BytesAllocated"`
190 Cap int `json:"BuffersMax"`
191 Len int `json:"BuffersInUse"`
// NodeStatus is the top-level document marshalled by StatusHandler.
// Only Volumes has an explicit JSON name; the other fields are
// encoded under their Go field names.
194 type NodeStatus struct {
195 Volumes []*VolumeStatus `json:"volumes"`
196 BufferPool PoolStatus
197 PullQueue WorkQueueStatus
198 TrashQueue WorkQueueStatus
199 Memory runtime.MemStats
// NOTE(review): stLock presumably serialises access to the shared st
// value used below; the lock and unlock calls are not visible in this
// excerpt -- confirm.
203 var stLock sync.Mutex
// StatusHandler answers /status.json requests by marshalling the
// NodeStatus value st to JSON. If marshalling fails, the error and the
// status value are logged and the client receives HTTP 500.
// NOTE(review): st is declared and populated outside the lines
// visible here (presumably via readNodeStatus) -- confirm.
205 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
208 jstat, err := json.Marshal(&st)
213 log.Printf("json.Marshal: %s\n", err)
214 log.Printf("NodeStatus = %v\n", &st)
215 http.Error(resp, err.Error(), 500)
219 // readNodeStatus populates the given NodeStatus struct with current values:
// one VolumeStatus per readable volume that reports a non-nil status,
// the buffer pool counters, the pull and trash queue statuses, and the
// Go runtime memory statistics. The Volumes slice held by st is reused
// across calls when its capacity is sufficient.
220 func readNodeStatus(st *NodeStatus) {
221 vols := KeepVM.AllReadable()
// Grow the backing array only when it cannot hold every volume.
222 if cap(st.Volumes) < len(vols) {
223 st.Volumes = make([]*VolumeStatus, len(vols))
// Truncate to length zero (capacity kept) so the appends below
// overwrite any entries left from a previous call.
225 st.Volumes = st.Volumes[:0]
226 for _, vol := range vols {
// Volumes whose Status() is nil are left out of the report.
227 if s := vol.Status(); s != nil {
228 st.Volumes = append(st.Volumes, s)
231 st.BufferPool.Alloc = bufs.Alloc()
232 st.BufferPool.Cap = bufs.Cap()
233 st.BufferPool.Len = bufs.Len()
234 st.PullQueue = getWorkQueueStatus(pullq)
235 st.TrashQueue = getWorkQueueStatus(trashq)
236 runtime.ReadMemStats(&st.Memory)
239 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
240 // should never happen except in test suites), return a zero status
241 // value instead of crashing.
// NOTE(review): the nil check and the non-nil return path are not
// visible in this excerpt; only the zero-value return is shown.
242 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
244 // This should only happen during tests.
245 return WorkQueueStatus{}
250 // DeleteHandler processes DELETE requests.
252 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
253 // from all connected volumes.
// NOTE(review): the code iterates KeepVM.AllWritable(), so only
// writable volumes are actually touched.
255 // Only the Data Manager, or an Arvados admin with scope "all", are
256 // allowed to issue DELETE requests. If a DELETE request is not
257 // authenticated or is issued by a non-admin user, the server returns
258 // a PermissionError.
260 // Upon receiving a valid request from an authorized user,
261 // DeleteHandler deletes all copies of the specified block on local
// writable volumes.
266 // If the requested block was not found on any volume, the response
267 // code is HTTP 404 Not Found.
269 // Otherwise, the response code is 200 OK, with a response body
270 // consisting of the JSON message
272 // {"copies_deleted":d,"copies_failed":f}
274 // where d and f are integers representing the number of blocks that
275 // were successfully and unsuccessfully deleted.
277 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
278 hash := mux.Vars(req)["hash"]
280 // Confirm that this user is an admin and has a token with unlimited scope.
281 var tok = GetApiToken(req)
// A missing token is rejected without consulting CanDelete.
282 if tok == "" || !CanDelete(tok) {
283 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// NOTE(review): the condition guarding this MethodDisabledError reply
// is not visible in this excerpt.
288 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
292 // Delete copies of this block from all available volumes.
293 // Report how many blocks were successfully deleted, and how
294 // many were found on writable volumes but not deleted.
296 Deleted int `json:"copies_deleted"`
297 Failed int `json:"copies_failed"`
299 for _, vol := range KeepVM.AllWritable() {
// Three outcomes per volume: deleted (nil error), block absent on
// this volume (IsNotExist), or any other failure, which is logged.
300 if err := vol.Delete(hash); err == nil {
302 } else if os.IsNotExist(err) {
306 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means no volume had the block.
312 if result.Deleted == 0 && result.Failed == 0 {
313 st = http.StatusNotFound
// The JSON result body is produced only for a 200 response.
320 if st == http.StatusOK {
321 if body, err := json.Marshal(result); err == nil {
324 log.Printf("json.Marshal: %s (result = %v)\n", err, result)
325 http.Error(resp, err.Error(), 500)
330 /* PullHandler processes "PUT /pull" requests for the data manager.
331 The request body is a JSON message containing a list of pull
332 requests in the following format:
336 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
338 "keep0.qr1hi.arvadosapi.com:25107",
339 "keep1.qr1hi.arvadosapi.com:25108"
343 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
353 Each pull request in the list consists of a block locator string
354 and an ordered list of servers. Keepstore should try to fetch the
355 block from each server in turn.
357 If the request has not been sent by the Data Manager, return 401 Unauthorized.
360 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of the JSON list accepted by PullHandler:
// a block locator plus the servers to try for it.
363 type PullRequest struct {
364 Locator string `json:"locator"`
365 Servers []string `json:"servers"`
// PullHandler handles PUT /pull. Requests without the data manager
// token get UnauthorizedError. The body is decoded as JSON into pr; a
// decode failure is reported with the HTTP code of BadRequestError.
// On success the handler replies 200 with a count of the received
// requests and replaces the contents of pullq with the new list.
368 func PullHandler(resp http.ResponseWriter, req *http.Request) {
369 // Reject unauthorized requests.
370 if !IsDataManagerToken(GetApiToken(req)) {
371 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
375 // Parse the request body.
// NOTE(review): the declaration of pr is not visible in this excerpt;
// presumably a slice of PullRequest -- confirm.
377 r := json.NewDecoder(req.Body)
378 if err := r.Decode(&pr); err != nil {
// The decoder's own message is sent, with the bad-request status code.
379 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
383 // We have a properly formatted pull list sent from the data
384 // manager. Report success and send the list to the pull list
385 // manager for further handling.
386 resp.WriteHeader(http.StatusOK)
388 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// NOTE(review): the loop body that builds plist from pr is not
// visible in this excerpt.
391 for _, p := range pr {
394 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by TrashHandler:
// a block locator and an integer block_mtime value.
// NOTE(review): the unit and meaning of block_mtime are not shown
// here -- confirm against the trash worker.
397 type TrashRequest struct {
398 Locator string `json:"locator"`
399 BlockMtime int64 `json:"block_mtime"`
// TrashHandler handles PUT /trash. Requests without the data manager
// token get UnauthorizedError. The body is decoded as a JSON list of
// TrashRequest; a decode failure is reported with the HTTP code of
// BadRequestError. On success the handler replies 200 with a count of
// the received requests and replaces the contents of trashq with the
// new list.
402 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
403 // Reject unauthorized requests.
404 if !IsDataManagerToken(GetApiToken(req)) {
405 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
409 // Parse the request body.
410 var trash []TrashRequest
411 r := json.NewDecoder(req.Body)
412 if err := r.Decode(&trash); err != nil {
// The decoder's own message is sent, with the bad-request status code.
413 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
417 // We have a properly formatted trash list sent from the data
418 // manager. Report success and send the list to the trash work
419 // queue for further handling.
420 resp.WriteHeader(http.StatusOK)
422 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// NOTE(review): the loop body that builds tlist from trash is not
// visible in this excerpt.
425 for _, t := range trash {
428 trashq.ReplaceQueue(tlist)
431 // ==============================
432 // GetBlock and PutBlock implement lower-level code for handling
433 // blocks by rooting through volumes connected to the local machine.
434 // Once the handler has determined that system policy permits the
435 // request, it calls these methods to perform the actual operation.
437 // TODO(twp): this code would probably be better located in the
438 // VolumeManager interface. As an abstraction, the VolumeManager
439 // should be the only part of the code that cares about which volume a
440 // block is stored on, so it should be responsible for figuring out
441 // which volume to check for fetching blocks, storing blocks, etc.
443 // ==============================
444 // GetBlock fetches and returns the block identified by "hash".
446 // On success, GetBlock returns a byte slice with the block data, and
449 // If the block cannot be found on any volume, returns NotFoundError.
451 // If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
// Readable volumes are tried in the order given by
// KeepVM.AllReadable(). A checksum mismatch on one volume does not
// stop the search.
455 func GetBlock(hash string) ([]byte, error) {
456 // Attempt to read the requested hash from a keep volume.
// The error reported if no volume yields a good copy; upgraded to
// DiskHashError below when a corrupt copy is seen.
457 error_to_caller := NotFoundError
459 for _, vol := range KeepVM.AllReadable() {
460 buf, err := vol.Get(hash)
462 // IsNotExist is an expected error and may be
463 // ignored. All other errors are logged. In
464 // any case we continue trying to read other
465 // volumes. If all volumes report IsNotExist,
466 // we return a NotFoundError.
467 if !os.IsNotExist(err) {
468 log.Printf("GetBlock: reading %s: %s\n", hash, err)
472 // Check the file checksum.
// The hex MD5 of the bytes read must equal the requested hash.
474 filehash := fmt.Sprintf("%x", md5.Sum(buf))
475 if filehash != hash {
476 // TODO: Try harder to tell a sysadmin about
478 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
480 error_to_caller = DiskHashError
// Reached with a good copy in hand: note in the log that an earlier
// volume held a corrupt copy of the same block.
484 if error_to_caller == DiskHashError {
485 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
// No volume produced a valid copy.
490 return nil, error_to_caller
493 /* PutBlock(block, hash)
494 Stores the BLOCK (identified by the content id HASH) in Keep.
496 The MD5 checksum of the block must be identical to the content id HASH.
497 If not, an error is returned.
499 PutBlock stores the BLOCK on the first Keep volume with free space.
500 A failure code is returned to the user only if all volumes fail.
502 On success, PutBlock returns nil.
503 On failure, it returns a KeepError with one of the following codes:
506 A different block with the same hash already exists on this
509 The MD5 hash of the BLOCK does not match the argument HASH.
511 There was not enough space left in any Keep volume to store
514 The object could not be stored for some other reason (e.g.
515 all writes failed). The text of the error message should
516 provide as much detail as possible.
519 func PutBlock(block []byte, hash string) error {
520 // Check that BLOCK's checksum matches HASH.
// The comparison is against the lowercase hex form of the MD5 sum.
521 blockhash := fmt.Sprintf("%x", md5.Sum(block))
522 if blockhash != hash {
523 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
524 return RequestHashError
527 // If we already have this data, it's intact on disk, and we
528 // can update its timestamp, return success. If we have
529 // different data with the same hash, return failure.
// Both outcomes end the attempt here; any other error from
// CompareAndTouch falls through to a fresh write below.
530 if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
534 // Choose a Keep volume to write to.
535 // If this volume fails, try all of the volumes in order.
536 if vol := KeepVM.NextWritable(); vol != nil {
537 if err := vol.Put(hash, block); err == nil {
538 return nil // success!
// Fallback: the preferred volume was nil or its Put failed.
542 writables := KeepVM.AllWritable()
543 if len(writables) == 0 {
544 log.Print("No writable volumes.")
549 for _, vol := range writables {
550 err := vol.Put(hash, block)
552 return nil // success!
// FullError is expected and not logged per volume.
554 if err != FullError {
555 // The volume is not full but the
556 // write did not succeed. Report the
557 // error and continue trying.
559 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
// NOTE(review): the branch that selects between the two outcomes
// below, and the errors returned, are not visible in this excerpt.
564 log.Print("All volumes are full.")
567 // Already logged the non-full errors.
572 // CompareAndTouch returns nil if one of the volumes already has the
573 // given content and it successfully updates the relevant block's
574 // modification time in order to protect it from premature garbage
// collection.
//
// Only writable volumes are consulted. A CollisionError from any
// volume stops the search immediately.
576 func CompareAndTouch(hash string, buf []byte) error {
// Starts as NotFoundError, the default when no volume has the block.
// NOTE(review): where bestErr is updated and returned is not visible
// in this excerpt.
577 var bestErr error = NotFoundError
578 for _, vol := range KeepVM.AllWritable() {
579 if err := vol.Compare(hash, buf); err == CollisionError {
580 // Stop if we have a block with same hash but
581 // different content. (It will be impossible
582 // to tell which one is wanted if we have
583 // both, so there's no point writing it even
584 // on a different volume.)
586 } else if err != nil {
587 // Couldn't find, couldn't open, etc.: try next volume.
// Content matched on this volume; now refresh its timestamp.
590 if err := vol.Touch(hash); err != nil {
591 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
595 // Compare and Touch both worked --> done.
// validLocatorRe matches a string made of exactly 32 lowercase
// hexadecimal digits and nothing else.
601 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
604 // IsValidLocator returns true if the specified string is a valid Keep locator,
// that is, if it matches validLocatorRe in full. Size or other hints
// appended to the hash make the string invalid for this check.
605 // When Keep is extended to support hash types other than MD5,
606 // this should be updated to cover those as well.
608 func IsValidLocator(loc string) bool {
609 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization value that starts with the literal
// scheme OAuth2 followed by whitespace; the rest of the value is
// captured as group 1.
612 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
614 // GetApiToken returns the OAuth2 token from the Authorization
615 // header of a HTTP request, or an empty string if no matching
// header is present.
617 func GetApiToken(req *http.Request) string {
// Direct map lookup: only the first Authorization value is examined.
// NOTE(review): auth[0] assumes the value slice is non-empty whenever
// the key is present -- confirm no caller builds an empty entry.
618 if auth, ok := req.Header["Authorization"]; ok {
619 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
626 // IsExpired returns true if the given Unix timestamp (expressed as a
627 // hexadecimal string) is in the past, or if timestamp_hex cannot be
628 // parsed as a hexadecimal string.
629 func IsExpired(timestamp_hex string) bool {
// Base 16; a bit size of 0 means the value must fit in an int.
630 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
// Parse failures are logged.
632 log.Printf("IsExpired: %s\n", err)
// ts is taken as whole seconds since the Unix epoch.
635 return time.Unix(ts, 0).Before(time.Now())
638 // CanDelete returns true if the user identified by api_token is
639 // allowed to delete blocks.
// NOTE(review): only the data manager token check is visible in this
// excerpt; the return statements and any earlier guard are not shown.
640 func CanDelete(api_token string) bool {
644 // Blocks may be deleted only when Keep has been configured with a
646 if IsDataManagerToken(api_token) {
649 // TODO(twp): look up api_token with the API server
650 // return true if is_admin is true and if the token
651 // has unlimited scope
655 // IsDataManagerToken returns true if api_token represents the data
// manager token: a data_manager_token must be configured (non-empty)
// and api_token must equal it exactly. With no token configured,
// every api_token, including the empty string, is rejected.
657 func IsDataManagerToken(api_token string) bool {
658 return data_manager_token != "" && api_token == data_manager_token