Merge branch 'master' into 3889-functional-testing
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63         // /trash" requests from Data Manager.  These requests instruct
64         // Keep to replicate or delete blocks; see
65         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
66         // for more details.
67         //
68         // Each handler parses the JSON list of block management requests
69         // in the message body, and replaces any existing pull queue or
70         // trash queue with their contentes.
71         //
72         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74
75         // Any request which does not match any of these routes gets
76         // 400 Bad Request.
77         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
78
79         return rest
80 }
81
82 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
83         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
84 }
85
86 // FindKeepVolumes scans all mounted volumes on the system for Keep
87 // volumes, and returns a list of matching paths.
88 //
89 // A device is assumed to be a Keep volume if it is a normal or tmpfs
90 // volume and has a "/keep" directory directly underneath the mount
91 // point.
92 //
93 func FindKeepVolumes() []string {
94         vols := make([]string, 0)
95
96         if f, err := os.Open(PROC_MOUNTS); err != nil {
97                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
98         } else {
99                 scanner := bufio.NewScanner(f)
100                 for scanner.Scan() {
101                         args := strings.Fields(scanner.Text())
102                         dev, mount := args[0], args[1]
103                         if mount != "/" &&
104                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
105                                 keep := mount + "/keep"
106                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
107                                         vols = append(vols, keep)
108                                 }
109                         }
110                 }
111                 if err := scanner.Err(); err != nil {
112                         log.Fatal(err)
113                 }
114         }
115         return vols
116 }
117
118 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
119         hash := mux.Vars(req)["hash"]
120
121         hints := mux.Vars(req)["hints"]
122
123         // Parse the locator string and hints from the request.
124         // TODO(twp): implement a Locator type.
125         var signature, timestamp string
126         if hints != "" {
127                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
128                 for _, hint := range strings.Split(hints, "+") {
129                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
130                                 // Server ignores size hints
131                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
132                                 signature = m[1]
133                                 timestamp = m[2]
134                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
135                                 // Any unknown hint that starts with an uppercase letter is
136                                 // presumed to be valid and ignored, to permit forward compatibility.
137                         } else {
138                                 // Unknown format; not a valid locator.
139                                 log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
140                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
141                                 return
142                         }
143                 }
144         }
145
146         // If permission checking is in effect, verify this
147         // request's permission signature.
148         if enforce_permissions {
149                 if signature == "" || timestamp == "" {
150                         log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
151                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
152                         return
153                 } else if IsExpired(timestamp) {
154                         log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
155                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
156                         return
157                 } else {
158                         req_locator := req.URL.Path[1:] // strip leading slash
159                         if !VerifySignature(req_locator, GetApiToken(req)) {
160                                 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
161                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
162                                 return
163                         }
164                 }
165         }
166
167         block, err := GetBlock(hash, false)
168
169         // Garbage collect after each GET. Fixes #2865.
170         // TODO(twp): review Keep memory usage and see if there's
171         // a better way to do this than blindly garbage collecting
172         // after every block.
173         defer runtime.GC()
174
175         if err != nil {
176                 // This type assertion is safe because the only errors
177                 // GetBlock can return are DiskHashError or NotFoundError.
178                 if err == NotFoundError {
179                         log.Printf("%s: not found, giving up\n", hash)
180                 }
181                 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
182                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
183                 return
184         }
185
186         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
187
188         _, err = resp.Write(block)
189         if err != nil {
190                 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
191         } else {
192                 log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
193         }
194
195         return
196 }
197
198 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
199         // Garbage collect after each PUT. Fixes #2865.
200         // See also GetBlockHandler.
201         defer runtime.GC()
202
203         hash := mux.Vars(req)["hash"]
204
205         // Read the block data to be stored.
206         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
207         //
208         if req.ContentLength > BLOCKSIZE {
209                 log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
210                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
211                 return
212         }
213
214         buf := make([]byte, req.ContentLength)
215         nread, err := io.ReadFull(req.Body, buf)
216         if err != nil {
217                 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
218                 http.Error(resp, err.Error(), 500)
219         } else if int64(nread) < req.ContentLength {
220                 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
221                 http.Error(resp, "request truncated", 500)
222         } else {
223                 if err := PutBlock(buf, hash); err == nil {
224                         // Success; add a size hint, sign the locator if
225                         // possible, and return it to the client.
226                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
227                         api_token := GetApiToken(req)
228                         if PermissionSecret != nil && api_token != "" {
229                                 expiry := time.Now().Add(permission_ttl)
230                                 return_hash = SignLocator(return_hash, api_token, expiry)
231                         }
232                         log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
233                         resp.Write([]byte(return_hash + "\n"))
234                 } else {
235                         ke := err.(*KeepError)
236                         log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
237                         http.Error(resp, ke.Error(), ke.HTTPCode)
238                 }
239         }
240         return
241 }
242
243 // IndexHandler
244 //     A HandleFunc to address /index and /index/{prefix} requests.
245 //
246 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
247         prefix := mux.Vars(req)["prefix"]
248
249         // Only the data manager may issue /index requests,
250         // and only if enforce_permissions is enabled.
251         // All other requests return 403 Forbidden.
252         api_token := GetApiToken(req)
253         if !enforce_permissions ||
254                 api_token == "" ||
255                 data_manager_token != api_token {
256                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
257                 return
258         }
259         var index string
260         for _, vol := range KeepVM.Volumes() {
261                 index = index + vol.Index(prefix)
262         }
263         resp.Write([]byte(index))
264 }
265
266 // StatusHandler
267 //     Responds to /status.json requests with the current node status,
268 //     described in a JSON structure.
269 //
270 //     The data given in a status.json response includes:
271 //        volumes - a list of Keep volumes currently in use by this server
272 //          each volume is an object with the following fields:
273 //            * mount_point
274 //            * device_num (an integer identifying the underlying filesystem)
275 //            * bytes_free
276 //            * bytes_used
277 //
278 type VolumeStatus struct {
279         MountPoint string `json:"mount_point"`
280         DeviceNum  uint64 `json:"device_num"`
281         BytesFree  uint64 `json:"bytes_free"`
282         BytesUsed  uint64 `json:"bytes_used"`
283 }
284
285 type NodeStatus struct {
286         Volumes []*VolumeStatus `json:"volumes"`
287 }
288
289 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
290         st := GetNodeStatus()
291         if jstat, err := json.Marshal(st); err == nil {
292                 resp.Write(jstat)
293         } else {
294                 log.Printf("json.Marshal: %s\n", err)
295                 log.Printf("NodeStatus = %v\n", st)
296                 http.Error(resp, err.Error(), 500)
297         }
298 }
299
300 // GetNodeStatus
301 //     Returns a NodeStatus struct describing this Keep
302 //     node's current status.
303 //
304 func GetNodeStatus() *NodeStatus {
305         st := new(NodeStatus)
306
307         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
308         for i, vol := range KeepVM.Volumes() {
309                 st.Volumes[i] = vol.Status()
310         }
311         return st
312 }
313
314 // GetVolumeStatus
315 //     Returns a VolumeStatus describing the requested volume.
316 //
317 func GetVolumeStatus(volume string) *VolumeStatus {
318         var fs syscall.Statfs_t
319         var devnum uint64
320
321         if fi, err := os.Stat(volume); err == nil {
322                 devnum = fi.Sys().(*syscall.Stat_t).Dev
323         } else {
324                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
325                 return nil
326         }
327
328         err := syscall.Statfs(volume, &fs)
329         if err != nil {
330                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
331                 return nil
332         }
333         // These calculations match the way df calculates disk usage:
334         // "free" space is measured by fs.Bavail, but "used" space
335         // uses fs.Blocks - fs.Bfree.
336         free := fs.Bavail * uint64(fs.Bsize)
337         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
338         return &VolumeStatus{volume, devnum, free, used}
339 }
340
341 // DeleteHandler processes DELETE requests.
342 //
343 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
344 // from all connected volumes.
345 //
346 // Only the Data Manager, or an Arvados admin with scope "all", are
347 // allowed to issue DELETE requests.  If a DELETE request is not
348 // authenticated or is issued by a non-admin user, the server returns
349 // a PermissionError.
350 //
351 // Upon receiving a valid request from an authorized user,
352 // DeleteHandler deletes all copies of the specified block on local
353 // writable volumes.
354 //
355 // Response format:
356 //
357 // If the requested blocks was not found on any volume, the response
358 // code is HTTP 404 Not Found.
359 //
360 // Otherwise, the response code is 200 OK, with a response body
361 // consisting of the JSON message
362 //
363 //    {"copies_deleted":d,"copies_failed":f}
364 //
365 // where d and f are integers representing the number of blocks that
366 // were successfully and unsuccessfully deleted.
367 //
368 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
369         hash := mux.Vars(req)["hash"]
370         log.Printf("%s %s", req.Method, hash)
371
372         // Confirm that this user is an admin and has a token with unlimited scope.
373         var tok = GetApiToken(req)
374         if tok == "" || !CanDelete(tok) {
375                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
376                 return
377         }
378
379         if never_delete {
380                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
381                 return
382         }
383
384         // Delete copies of this block from all available volumes.  Report
385         // how many blocks were successfully and unsuccessfully
386         // deleted.
387         var result struct {
388                 Deleted int `json:"copies_deleted"`
389                 Failed  int `json:"copies_failed"`
390         }
391         for _, vol := range KeepVM.Volumes() {
392                 if err := vol.Delete(hash); err == nil {
393                         result.Deleted++
394                 } else if os.IsNotExist(err) {
395                         continue
396                 } else {
397                         result.Failed++
398                         log.Println("DeleteHandler:", err)
399                 }
400         }
401
402         var st int
403
404         if result.Deleted == 0 && result.Failed == 0 {
405                 st = http.StatusNotFound
406         } else {
407                 st = http.StatusOK
408         }
409
410         resp.WriteHeader(st)
411
412         if st == http.StatusOK {
413                 if body, err := json.Marshal(result); err == nil {
414                         resp.Write(body)
415                 } else {
416                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
417                         http.Error(resp, err.Error(), 500)
418                 }
419         }
420 }
421
422 /* PullHandler processes "PUT /pull" requests for the data manager.
423    The request body is a JSON message containing a list of pull
424    requests in the following format:
425
426    [
427       {
428          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
429          "servers":[
430                         "keep0.qr1hi.arvadosapi.com:25107",
431                         "keep1.qr1hi.arvadosapi.com:25108"
432                  ]
433           },
434           {
435                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
436                  "servers":[
437                         "10.0.1.5:25107",
438                         "10.0.1.6:25107",
439                         "10.0.1.7:25108"
440                  ]
441           },
442           ...
443    ]
444
445    Each pull request in the list consists of a block locator string
446    and an ordered list of servers.  Keepstore should try to fetch the
447    block from each server in turn.
448
449    If the request has not been sent by the Data Manager, return 401
450    Unauthorized.
451
452    If the JSON unmarshalling fails, return 400 Bad Request.
453 */
454
455 type PullRequest struct {
456         Locator string   `json:"locator"`
457         Servers []string `json:"servers"`
458 }
459
460 func PullHandler(resp http.ResponseWriter, req *http.Request) {
461         // Reject unauthorized requests.
462         if !IsDataManagerToken(GetApiToken(req)) {
463                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
464                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
465                 return
466         }
467
468         // Parse the request body.
469         var pr []PullRequest
470         r := json.NewDecoder(req.Body)
471         if err := r.Decode(&pr); err != nil {
472                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
473                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
474                 return
475         }
476
477         // We have a properly formatted pull list sent from the data
478         // manager.  Report success and send the list to the pull list
479         // manager for further handling.
480         log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
481         resp.WriteHeader(http.StatusOK)
482         resp.Write([]byte(
483                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
484
485         plist := list.New()
486         for _, p := range pr {
487                 plist.PushBack(p)
488         }
489
490         if pullq == nil {
491                 pullq = NewWorkQueue()
492         }
493         pullq.ReplaceQueue(plist)
494 }
495
496 type TrashRequest struct {
497         Locator    string `json:"locator"`
498         BlockMtime int64  `json:"block_mtime"`
499 }
500
501 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
502         // Reject unauthorized requests.
503         if !IsDataManagerToken(GetApiToken(req)) {
504                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
505                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
506                 return
507         }
508
509         // Parse the request body.
510         var trash []TrashRequest
511         r := json.NewDecoder(req.Body)
512         if err := r.Decode(&trash); err != nil {
513                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
514                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
515                 return
516         }
517
518         // We have a properly formatted trash list sent from the data
519         // manager.  Report success and send the list to the trash work
520         // queue for further handling.
521         log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
522         resp.WriteHeader(http.StatusOK)
523         resp.Write([]byte(
524                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
525
526         tlist := list.New()
527         for _, t := range trash {
528                 tlist.PushBack(t)
529         }
530
531         if trashq == nil {
532                 trashq = NewWorkQueue()
533         }
534         trashq.ReplaceQueue(tlist)
535 }
536
537 // ==============================
538 // GetBlock and PutBlock implement lower-level code for handling
539 // blocks by rooting through volumes connected to the local machine.
540 // Once the handler has determined that system policy permits the
541 // request, it calls these methods to perform the actual operation.
542 //
543 // TODO(twp): this code would probably be better located in the
544 // VolumeManager interface. As an abstraction, the VolumeManager
545 // should be the only part of the code that cares about which volume a
546 // block is stored on, so it should be responsible for figuring out
547 // which volume to check for fetching blocks, storing blocks, etc.
548
549 // ==============================
550 // GetBlock fetches and returns the block identified by "hash".  If
551 // the update_timestamp argument is true, GetBlock also updates the
552 // block's file modification time (for the sake of PutBlock, which
553 // must update the file's timestamp when the block already exists).
554 //
555 // On success, GetBlock returns a byte slice with the block data, and
556 // a nil error.
557 //
558 // If the block cannot be found on any volume, returns NotFoundError.
559 //
560 // If the block found does not have the correct MD5 hash, returns
561 // DiskHashError.
562 //
563
564 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
565         // Attempt to read the requested hash from a keep volume.
566         error_to_caller := NotFoundError
567
568         for _, vol := range KeepVM.Volumes() {
569                 if buf, err := vol.Get(hash); err != nil {
570                         // IsNotExist is an expected error and may be ignored.
571                         // (If all volumes report IsNotExist, we return a NotFoundError)
572                         // All other errors should be logged but we continue trying to
573                         // read.
574                         switch {
575                         case os.IsNotExist(err):
576                                 continue
577                         default:
578                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
579                         }
580                 } else {
581                         // Double check the file checksum.
582                         //
583                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
584                         if filehash != hash {
585                                 // TODO(twp): this condition probably represents a bad disk and
586                                 // should raise major alarm bells for an administrator: e.g.
587                                 // they should be sent directly to an event manager at high
588                                 // priority or logged as urgent problems.
589                                 //
590                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
591                                         vol, hash, filehash)
592                                 error_to_caller = DiskHashError
593                         } else {
594                                 // Success!
595                                 if error_to_caller != NotFoundError {
596                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
597                                                 vol, hash)
598                                 }
599                                 // Update the timestamp if the caller requested.
600                                 // If we could not update the timestamp, continue looking on
601                                 // other volumes.
602                                 if update_timestamp {
603                                         if vol.Touch(hash) != nil {
604                                                 continue
605                                         }
606                                 }
607                                 return buf, nil
608                         }
609                 }
610         }
611
612         if error_to_caller != NotFoundError {
613                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
614         }
615         return nil, error_to_caller
616 }
617
618 /* PutBlock(block, hash)
619    Stores the BLOCK (identified by the content id HASH) in Keep.
620
621    The MD5 checksum of the block must be identical to the content id HASH.
622    If not, an error is returned.
623
624    PutBlock stores the BLOCK on the first Keep volume with free space.
625    A failure code is returned to the user only if all volumes fail.
626
627    On success, PutBlock returns nil.
628    On failure, it returns a KeepError with one of the following codes:
629
630    500 Collision
631           A different block with the same hash already exists on this
632           Keep server.
633    422 MD5Fail
634           The MD5 hash of the BLOCK does not match the argument HASH.
635    503 Full
636           There was not enough space left in any Keep volume to store
637           the object.
638    500 Fail
639           The object could not be stored for some other reason (e.g.
640           all writes failed). The text of the error message should
641           provide as much detail as possible.
642 */
643
644 func PutBlock(block []byte, hash string) error {
645         // Check that BLOCK's checksum matches HASH.
646         blockhash := fmt.Sprintf("%x", md5.Sum(block))
647         if blockhash != hash {
648                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
649                 return RequestHashError
650         }
651
652         // If we already have a block on disk under this identifier, return
653         // success (but check for MD5 collisions).  While fetching the block,
654         // update its timestamp.
655         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
656         // In either case, we want to write our new (good) block to disk,
657         // so there is nothing special to do if err != nil.
658         //
659         if oldblock, err := GetBlock(hash, true); err == nil {
660                 if bytes.Compare(block, oldblock) == 0 {
661                         // The block already exists; return success.
662                         return nil
663                 } else {
664                         return CollisionError
665                 }
666         }
667
668         // Choose a Keep volume to write to.
669         // If this volume fails, try all of the volumes in order.
670         vol := KeepVM.Choose()
671         if err := vol.Put(hash, block); err == nil {
672                 return nil // success!
673         } else {
674                 allFull := true
675                 for _, vol := range KeepVM.Volumes() {
676                         err := vol.Put(hash, block)
677                         if err == nil {
678                                 return nil // success!
679                         }
680                         if err != FullError {
681                                 // The volume is not full but the write did not succeed.
682                                 // Report the error and continue trying.
683                                 allFull = false
684                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
685                         }
686                 }
687
688                 if allFull {
689                         log.Printf("all Keep volumes full")
690                         return FullError
691                 } else {
692                         log.Printf("all Keep volumes failed")
693                         return GenericError
694                 }
695         }
696 }
697
698 // IsValidLocator
699 //     Return true if the specified string is a valid Keep locator.
700 //     When Keep is extended to support hash types other than MD5,
701 //     this should be updated to cover those as well.
702 //
703 func IsValidLocator(loc string) bool {
704         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
705         if err == nil {
706                 return match
707         }
708         log.Printf("IsValidLocator: %s\n", err)
709         return false
710 }
711
712 // GetApiToken returns the OAuth2 token from the Authorization
713 // header of a HTTP request, or an empty string if no matching
714 // token is found.
715 func GetApiToken(req *http.Request) string {
716         if auth, ok := req.Header["Authorization"]; ok {
717                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
718                         log.Println(err)
719                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
720                         return match[1]
721                 }
722         }
723         return ""
724 }
725
726 // IsExpired returns true if the given Unix timestamp (expressed as a
727 // hexadecimal string) is in the past, or if timestamp_hex cannot be
728 // parsed as a hexadecimal string.
729 func IsExpired(timestamp_hex string) bool {
730         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
731         if err != nil {
732                 log.Printf("IsExpired: %s\n", err)
733                 return true
734         }
735         return time.Unix(ts, 0).Before(time.Now())
736 }
737
738 // CanDelete returns true if the user identified by api_token is
739 // allowed to delete blocks.
740 func CanDelete(api_token string) bool {
741         if api_token == "" {
742                 return false
743         }
744         // Blocks may be deleted only when Keep has been configured with a
745         // data manager.
746         if IsDataManagerToken(api_token) {
747                 return true
748         }
749         // TODO(twp): look up api_token with the API server
750         // return true if is_admin is true and if the token
751         // has unlimited scope
752         return false
753 }
754
755 // IsDataManagerToken returns true if api_token represents the data
756 // manager's token.
757 func IsDataManagerToken(api_token string) bool {
758         return data_manager_token != "" && api_token == data_manager_token
759 }