4015: fix pipeline instance tests
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63         // /trash" requests from Data Manager.  These requests instruct
64         // Keep to replicate or delete blocks; see
65         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
66         // for more details.
67         //
68         // Each handler parses the JSON list of block management requests
69         // in the message body, and replaces any existing pull queue or
70         // trash queue with their contentes.
71         //
72         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74
75         // Any request which does not match any of these routes gets
76         // 400 Bad Request.
77         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
78
79         return rest
80 }
81
82 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
83         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
84 }
85
86 // FindKeepVolumes scans all mounted volumes on the system for Keep
87 // volumes, and returns a list of matching paths.
88 //
89 // A device is assumed to be a Keep volume if it is a normal or tmpfs
90 // volume and has a "/keep" directory directly underneath the mount
91 // point.
92 //
93 func FindKeepVolumes() []string {
94         vols := make([]string, 0)
95
96         if f, err := os.Open(PROC_MOUNTS); err != nil {
97                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
98         } else {
99                 scanner := bufio.NewScanner(f)
100                 for scanner.Scan() {
101                         args := strings.Fields(scanner.Text())
102                         dev, mount := args[0], args[1]
103                         if mount != "/" &&
104                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
105                                 keep := mount + "/keep"
106                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
107                                         vols = append(vols, keep)
108                                 }
109                         }
110                 }
111                 if err := scanner.Err(); err != nil {
112                         log.Fatal(err)
113                 }
114         }
115         return vols
116 }
117
118 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
119         hash := mux.Vars(req)["hash"]
120
121         hints := mux.Vars(req)["hints"]
122
123         // Parse the locator string and hints from the request.
124         // TODO(twp): implement a Locator type.
125         var signature, timestamp string
126         if hints != "" {
127                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
128                 for _, hint := range strings.Split(hints, "+") {
129                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
130                                 // Server ignores size hints
131                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
132                                 signature = m[1]
133                                 timestamp = m[2]
134                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
135                                 // Any unknown hint that starts with an uppercase letter is
136                                 // presumed to be valid and ignored, to permit forward compatibility.
137                         } else {
138                                 // Unknown format; not a valid locator.
139                                 log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
140                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
141                                 return
142                         }
143                 }
144         }
145
146         // If permission checking is in effect, verify this
147         // request's permission signature.
148         if enforce_permissions {
149                 if signature == "" || timestamp == "" {
150                         log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
151                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
152                         return
153                 } else if IsExpired(timestamp) {
154                         log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
155                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
156                         return
157                 } else {
158                         req_locator := req.URL.Path[1:] // strip leading slash
159                         if !VerifySignature(req_locator, GetApiToken(req)) {
160                                 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
161                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
162                                 return
163                         }
164                 }
165         }
166
167         block, err := GetBlock(hash, false)
168
169         // Garbage collect after each GET. Fixes #2865.
170         // TODO(twp): review Keep memory usage and see if there's
171         // a better way to do this than blindly garbage collecting
172         // after every block.
173         defer runtime.GC()
174
175         if err != nil {
176                 // This type assertion is safe because the only errors
177                 // GetBlock can return are DiskHashError or NotFoundError.
178                 if err == NotFoundError {
179                         log.Printf("%s: not found, giving up\n", hash)
180                 }
181                 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
182                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
183                 return
184         }
185
186         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
187
188         _, err = resp.Write(block)
189         if err != nil {
190                 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
191         } else {
192                 log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
193         }
194
195         return
196 }
197
198 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
199         // Garbage collect after each PUT. Fixes #2865.
200         // See also GetBlockHandler.
201         defer runtime.GC()
202
203         hash := mux.Vars(req)["hash"]
204
205         // Read the block data to be stored.
206         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
207         //
208         if req.ContentLength > BLOCKSIZE {
209                 log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
210                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
211                 return
212         }
213
214         buf := make([]byte, req.ContentLength)
215         nread, err := io.ReadFull(req.Body, buf)
216         if err != nil {
217                 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
218                 http.Error(resp, err.Error(), 500)
219         } else if int64(nread) < req.ContentLength {
220                 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
221                 http.Error(resp, "request truncated", 500)
222         } else {
223                 if err := PutBlock(buf, hash); err == nil {
224                         // Success; add a size hint, sign the locator if
225                         // possible, and return it to the client.
226                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
227                         api_token := GetApiToken(req)
228                         if PermissionSecret != nil && api_token != "" {
229                                 expiry := time.Now().Add(permission_ttl)
230                                 return_hash = SignLocator(return_hash, api_token, expiry)
231                         }
232                         log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
233                         resp.Write([]byte(return_hash + "\n"))
234                 } else {
235                         ke := err.(*KeepError)
236                         log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
237                         http.Error(resp, ke.Error(), ke.HTTPCode)
238                 }
239         }
240         return
241 }
242
243 // IndexHandler
244 //     A HandleFunc to address /index and /index/{prefix} requests.
245 //
246 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
247         // Reject unauthorized requests.
248         if !IsDataManagerToken(GetApiToken(req)) {
249                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
250                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
251                 return
252         }
253
254         prefix := mux.Vars(req)["prefix"]
255
256         var index string
257         for _, vol := range KeepVM.Volumes() {
258                 index = index + vol.Index(prefix)
259         }
260         resp.Write([]byte(index))
261 }
262
263 // StatusHandler
264 //     Responds to /status.json requests with the current node status,
265 //     described in a JSON structure.
266 //
267 //     The data given in a status.json response includes:
268 //        volumes - a list of Keep volumes currently in use by this server
269 //          each volume is an object with the following fields:
270 //            * mount_point
271 //            * device_num (an integer identifying the underlying filesystem)
272 //            * bytes_free
273 //            * bytes_used
274 //
275 type VolumeStatus struct {
276         MountPoint string `json:"mount_point"`
277         DeviceNum  uint64 `json:"device_num"`
278         BytesFree  uint64 `json:"bytes_free"`
279         BytesUsed  uint64 `json:"bytes_used"`
280 }
281
282 type NodeStatus struct {
283         Volumes []*VolumeStatus `json:"volumes"`
284 }
285
286 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
287         st := GetNodeStatus()
288         if jstat, err := json.Marshal(st); err == nil {
289                 resp.Write(jstat)
290         } else {
291                 log.Printf("json.Marshal: %s\n", err)
292                 log.Printf("NodeStatus = %v\n", st)
293                 http.Error(resp, err.Error(), 500)
294         }
295 }
296
297 // GetNodeStatus
298 //     Returns a NodeStatus struct describing this Keep
299 //     node's current status.
300 //
301 func GetNodeStatus() *NodeStatus {
302         st := new(NodeStatus)
303
304         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
305         for i, vol := range KeepVM.Volumes() {
306                 st.Volumes[i] = vol.Status()
307         }
308         return st
309 }
310
311 // GetVolumeStatus
312 //     Returns a VolumeStatus describing the requested volume.
313 //
314 func GetVolumeStatus(volume string) *VolumeStatus {
315         var fs syscall.Statfs_t
316         var devnum uint64
317
318         if fi, err := os.Stat(volume); err == nil {
319                 devnum = fi.Sys().(*syscall.Stat_t).Dev
320         } else {
321                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
322                 return nil
323         }
324
325         err := syscall.Statfs(volume, &fs)
326         if err != nil {
327                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
328                 return nil
329         }
330         // These calculations match the way df calculates disk usage:
331         // "free" space is measured by fs.Bavail, but "used" space
332         // uses fs.Blocks - fs.Bfree.
333         free := fs.Bavail * uint64(fs.Bsize)
334         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
335         return &VolumeStatus{volume, devnum, free, used}
336 }
337
338 // DeleteHandler processes DELETE requests.
339 //
340 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
341 // from all connected volumes.
342 //
343 // Only the Data Manager, or an Arvados admin with scope "all", are
344 // allowed to issue DELETE requests.  If a DELETE request is not
345 // authenticated or is issued by a non-admin user, the server returns
346 // a PermissionError.
347 //
348 // Upon receiving a valid request from an authorized user,
349 // DeleteHandler deletes all copies of the specified block on local
350 // writable volumes.
351 //
352 // Response format:
353 //
354 // If the requested blocks was not found on any volume, the response
355 // code is HTTP 404 Not Found.
356 //
357 // Otherwise, the response code is 200 OK, with a response body
358 // consisting of the JSON message
359 //
360 //    {"copies_deleted":d,"copies_failed":f}
361 //
362 // where d and f are integers representing the number of blocks that
363 // were successfully and unsuccessfully deleted.
364 //
365 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
366         hash := mux.Vars(req)["hash"]
367         log.Printf("%s %s", req.Method, hash)
368
369         // Confirm that this user is an admin and has a token with unlimited scope.
370         var tok = GetApiToken(req)
371         if tok == "" || !CanDelete(tok) {
372                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
373                 return
374         }
375
376         if never_delete {
377                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
378                 return
379         }
380
381         // Delete copies of this block from all available volumes.  Report
382         // how many blocks were successfully and unsuccessfully
383         // deleted.
384         var result struct {
385                 Deleted int `json:"copies_deleted"`
386                 Failed  int `json:"copies_failed"`
387         }
388         for _, vol := range KeepVM.Volumes() {
389                 if err := vol.Delete(hash); err == nil {
390                         result.Deleted++
391                 } else if os.IsNotExist(err) {
392                         continue
393                 } else {
394                         result.Failed++
395                         log.Println("DeleteHandler:", err)
396                 }
397         }
398
399         var st int
400
401         if result.Deleted == 0 && result.Failed == 0 {
402                 st = http.StatusNotFound
403         } else {
404                 st = http.StatusOK
405         }
406
407         resp.WriteHeader(st)
408
409         if st == http.StatusOK {
410                 if body, err := json.Marshal(result); err == nil {
411                         resp.Write(body)
412                 } else {
413                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
414                         http.Error(resp, err.Error(), 500)
415                 }
416         }
417 }
418
419 /* PullHandler processes "PUT /pull" requests for the data manager.
420    The request body is a JSON message containing a list of pull
421    requests in the following format:
422
423    [
424       {
425          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
426          "servers":[
427                         "keep0.qr1hi.arvadosapi.com:25107",
428                         "keep1.qr1hi.arvadosapi.com:25108"
429                  ]
430           },
431           {
432                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
433                  "servers":[
434                         "10.0.1.5:25107",
435                         "10.0.1.6:25107",
436                         "10.0.1.7:25108"
437                  ]
438           },
439           ...
440    ]
441
442    Each pull request in the list consists of a block locator string
443    and an ordered list of servers.  Keepstore should try to fetch the
444    block from each server in turn.
445
446    If the request has not been sent by the Data Manager, return 401
447    Unauthorized.
448
449    If the JSON unmarshalling fails, return 400 Bad Request.
450 */
451
452 type PullRequest struct {
453         Locator string   `json:"locator"`
454         Servers []string `json:"servers"`
455 }
456
457 func PullHandler(resp http.ResponseWriter, req *http.Request) {
458         // Reject unauthorized requests.
459         if !IsDataManagerToken(GetApiToken(req)) {
460                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
461                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
462                 return
463         }
464
465         // Parse the request body.
466         var pr []PullRequest
467         r := json.NewDecoder(req.Body)
468         if err := r.Decode(&pr); err != nil {
469                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
470                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
471                 return
472         }
473
474         // We have a properly formatted pull list sent from the data
475         // manager.  Report success and send the list to the pull list
476         // manager for further handling.
477         log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
478         resp.WriteHeader(http.StatusOK)
479         resp.Write([]byte(
480                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
481
482         plist := list.New()
483         for _, p := range pr {
484                 plist.PushBack(p)
485         }
486
487         if pullq == nil {
488                 pullq = NewWorkQueue()
489         }
490         pullq.ReplaceQueue(plist)
491 }
492
493 type TrashRequest struct {
494         Locator    string `json:"locator"`
495         BlockMtime int64  `json:"block_mtime"`
496 }
497
498 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
499         // Reject unauthorized requests.
500         if !IsDataManagerToken(GetApiToken(req)) {
501                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
502                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
503                 return
504         }
505
506         // Parse the request body.
507         var trash []TrashRequest
508         r := json.NewDecoder(req.Body)
509         if err := r.Decode(&trash); err != nil {
510                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
511                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
512                 return
513         }
514
515         // We have a properly formatted trash list sent from the data
516         // manager.  Report success and send the list to the trash work
517         // queue for further handling.
518         log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
519         resp.WriteHeader(http.StatusOK)
520         resp.Write([]byte(
521                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
522
523         tlist := list.New()
524         for _, t := range trash {
525                 tlist.PushBack(t)
526         }
527
528         if trashq == nil {
529                 trashq = NewWorkQueue()
530         }
531         trashq.ReplaceQueue(tlist)
532 }
533
534 // ==============================
535 // GetBlock and PutBlock implement lower-level code for handling
536 // blocks by rooting through volumes connected to the local machine.
537 // Once the handler has determined that system policy permits the
538 // request, it calls these methods to perform the actual operation.
539 //
540 // TODO(twp): this code would probably be better located in the
541 // VolumeManager interface. As an abstraction, the VolumeManager
542 // should be the only part of the code that cares about which volume a
543 // block is stored on, so it should be responsible for figuring out
544 // which volume to check for fetching blocks, storing blocks, etc.
545
546 // ==============================
547 // GetBlock fetches and returns the block identified by "hash".  If
548 // the update_timestamp argument is true, GetBlock also updates the
549 // block's file modification time (for the sake of PutBlock, which
550 // must update the file's timestamp when the block already exists).
551 //
552 // On success, GetBlock returns a byte slice with the block data, and
553 // a nil error.
554 //
555 // If the block cannot be found on any volume, returns NotFoundError.
556 //
557 // If the block found does not have the correct MD5 hash, returns
558 // DiskHashError.
559 //
560
561 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
562         // Attempt to read the requested hash from a keep volume.
563         error_to_caller := NotFoundError
564
565         for _, vol := range KeepVM.Volumes() {
566                 if buf, err := vol.Get(hash); err != nil {
567                         // IsNotExist is an expected error and may be ignored.
568                         // (If all volumes report IsNotExist, we return a NotFoundError)
569                         // All other errors should be logged but we continue trying to
570                         // read.
571                         switch {
572                         case os.IsNotExist(err):
573                                 continue
574                         default:
575                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
576                         }
577                 } else {
578                         // Double check the file checksum.
579                         //
580                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
581                         if filehash != hash {
582                                 // TODO(twp): this condition probably represents a bad disk and
583                                 // should raise major alarm bells for an administrator: e.g.
584                                 // they should be sent directly to an event manager at high
585                                 // priority or logged as urgent problems.
586                                 //
587                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
588                                         vol, hash, filehash)
589                                 error_to_caller = DiskHashError
590                         } else {
591                                 // Success!
592                                 if error_to_caller != NotFoundError {
593                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
594                                                 vol, hash)
595                                 }
596                                 // Update the timestamp if the caller requested.
597                                 // If we could not update the timestamp, continue looking on
598                                 // other volumes.
599                                 if update_timestamp {
600                                         if vol.Touch(hash) != nil {
601                                                 continue
602                                         }
603                                 }
604                                 return buf, nil
605                         }
606                 }
607         }
608
609         if error_to_caller != NotFoundError {
610                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
611         }
612         return nil, error_to_caller
613 }
614
615 /* PutBlock(block, hash)
616    Stores the BLOCK (identified by the content id HASH) in Keep.
617
618    The MD5 checksum of the block must be identical to the content id HASH.
619    If not, an error is returned.
620
621    PutBlock stores the BLOCK on the first Keep volume with free space.
622    A failure code is returned to the user only if all volumes fail.
623
624    On success, PutBlock returns nil.
625    On failure, it returns a KeepError with one of the following codes:
626
627    500 Collision
628           A different block with the same hash already exists on this
629           Keep server.
630    422 MD5Fail
631           The MD5 hash of the BLOCK does not match the argument HASH.
632    503 Full
633           There was not enough space left in any Keep volume to store
634           the object.
635    500 Fail
636           The object could not be stored for some other reason (e.g.
637           all writes failed). The text of the error message should
638           provide as much detail as possible.
639 */
640
641 func PutBlock(block []byte, hash string) error {
642         // Check that BLOCK's checksum matches HASH.
643         blockhash := fmt.Sprintf("%x", md5.Sum(block))
644         if blockhash != hash {
645                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
646                 return RequestHashError
647         }
648
649         // If we already have a block on disk under this identifier, return
650         // success (but check for MD5 collisions).  While fetching the block,
651         // update its timestamp.
652         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
653         // In either case, we want to write our new (good) block to disk,
654         // so there is nothing special to do if err != nil.
655         //
656         if oldblock, err := GetBlock(hash, true); err == nil {
657                 if bytes.Compare(block, oldblock) == 0 {
658                         // The block already exists; return success.
659                         return nil
660                 } else {
661                         return CollisionError
662                 }
663         }
664
665         // Choose a Keep volume to write to.
666         // If this volume fails, try all of the volumes in order.
667         vol := KeepVM.Choose()
668         if err := vol.Put(hash, block); err == nil {
669                 return nil // success!
670         } else {
671                 allFull := true
672                 for _, vol := range KeepVM.Volumes() {
673                         err := vol.Put(hash, block)
674                         if err == nil {
675                                 return nil // success!
676                         }
677                         if err != FullError {
678                                 // The volume is not full but the write did not succeed.
679                                 // Report the error and continue trying.
680                                 allFull = false
681                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
682                         }
683                 }
684
685                 if allFull {
686                         log.Printf("all Keep volumes full")
687                         return FullError
688                 } else {
689                         log.Printf("all Keep volumes failed")
690                         return GenericError
691                 }
692         }
693 }
694
695 // IsValidLocator
696 //     Return true if the specified string is a valid Keep locator.
697 //     When Keep is extended to support hash types other than MD5,
698 //     this should be updated to cover those as well.
699 //
700 func IsValidLocator(loc string) bool {
701         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
702         if err == nil {
703                 return match
704         }
705         log.Printf("IsValidLocator: %s\n", err)
706         return false
707 }
708
709 // GetApiToken returns the OAuth2 token from the Authorization
710 // header of a HTTP request, or an empty string if no matching
711 // token is found.
712 func GetApiToken(req *http.Request) string {
713         if auth, ok := req.Header["Authorization"]; ok {
714                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
715                         log.Println(err)
716                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
717                         return match[1]
718                 }
719         }
720         return ""
721 }
722
723 // IsExpired returns true if the given Unix timestamp (expressed as a
724 // hexadecimal string) is in the past, or if timestamp_hex cannot be
725 // parsed as a hexadecimal string.
726 func IsExpired(timestamp_hex string) bool {
727         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
728         if err != nil {
729                 log.Printf("IsExpired: %s\n", err)
730                 return true
731         }
732         return time.Unix(ts, 0).Before(time.Now())
733 }
734
735 // CanDelete returns true if the user identified by api_token is
736 // allowed to delete blocks.
737 func CanDelete(api_token string) bool {
738         if api_token == "" {
739                 return false
740         }
741         // Blocks may be deleted only when Keep has been configured with a
742         // data manager.
743         if IsDataManagerToken(api_token) {
744                 return true
745         }
746         // TODO(twp): look up api_token with the API server
747         // return true if is_admin is true and if the token
748         // has unlimited scope
749         return false
750 }
751
752 // IsDataManagerToken returns true if api_token represents the data
753 // manager's token.
754 func IsDataManagerToken(api_token string) bool {
755         return data_manager_token != "" && api_token == data_manager_token
756 }