3413: update for code review
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63         // /trash" requests from Data Manager.  These requests instruct
64         // Keep to replicate or delete blocks; see
65         // https://arvados.org/projects/orvos-private/wiki/Keep_Design_Doc
66         // for more details.
67         //
68         // Each handler parses the JSON list of block management requests
69         // in the message body, and delivers them to the pull queue or
70         // trash queue, respectively.
71         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
73
74         // Any request which does not match any of these routes gets
75         // 400 Bad Request.
76         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
77
78         return rest
79 }
80
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
83 }
84
85 // FindKeepVolumes scans all mounted volumes on the system for Keep
86 // volumes, and returns a list of matching paths.
87 //
88 // A device is assumed to be a Keep volume if it is a normal or tmpfs
89 // volume and has a "/keep" directory directly underneath the mount
90 // point.
91 //
92 func FindKeepVolumes() []string {
93         vols := make([]string, 0)
94
95         if f, err := os.Open(PROC_MOUNTS); err != nil {
96                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
97         } else {
98                 scanner := bufio.NewScanner(f)
99                 for scanner.Scan() {
100                         args := strings.Fields(scanner.Text())
101                         dev, mount := args[0], args[1]
102                         if mount != "/" &&
103                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
104                                 keep := mount + "/keep"
105                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
106                                         vols = append(vols, keep)
107                                 }
108                         }
109                 }
110                 if err := scanner.Err(); err != nil {
111                         log.Fatal(err)
112                 }
113         }
114         return vols
115 }
116
117 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
118         hash := mux.Vars(req)["hash"]
119
120         log.Printf("%s %s", req.Method, hash)
121
122         hints := mux.Vars(req)["hints"]
123
124         // Parse the locator string and hints from the request.
125         // TODO(twp): implement a Locator type.
126         var signature, timestamp string
127         if hints != "" {
128                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
129                 for _, hint := range strings.Split(hints, "+") {
130                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
131                                 // Server ignores size hints
132                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
133                                 signature = m[1]
134                                 timestamp = m[2]
135                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
136                                 // Any unknown hint that starts with an uppercase letter is
137                                 // presumed to be valid and ignored, to permit forward compatibility.
138                         } else {
139                                 // Unknown format; not a valid locator.
140                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
141                                 return
142                         }
143                 }
144         }
145
146         // If permission checking is in effect, verify this
147         // request's permission signature.
148         if enforce_permissions {
149                 if signature == "" || timestamp == "" {
150                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
151                         return
152                 } else if IsExpired(timestamp) {
153                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
154                         return
155                 } else {
156                         req_locator := req.URL.Path[1:] // strip leading slash
157                         if !VerifySignature(req_locator, GetApiToken(req)) {
158                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
159                                 return
160                         }
161                 }
162         }
163
164         block, err := GetBlock(hash, false)
165
166         // Garbage collect after each GET. Fixes #2865.
167         // TODO(twp): review Keep memory usage and see if there's
168         // a better way to do this than blindly garbage collecting
169         // after every block.
170         defer runtime.GC()
171
172         if err != nil {
173                 // This type assertion is safe because the only errors
174                 // GetBlock can return are DiskHashError or NotFoundError.
175                 if err == NotFoundError {
176                         log.Printf("%s: not found, giving up\n", hash)
177                 }
178                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
179                 return
180         }
181
182         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
183
184         _, err = resp.Write(block)
185         if err != nil {
186                 log.Printf("GetBlockHandler: writing response: %s", err)
187         }
188
189         return
190 }
191
192 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
193         // Garbage collect after each PUT. Fixes #2865.
194         // See also GetBlockHandler.
195         defer runtime.GC()
196
197         hash := mux.Vars(req)["hash"]
198
199         log.Printf("%s %s", req.Method, hash)
200
201         // Read the block data to be stored.
202         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
203         //
204         if req.ContentLength > BLOCKSIZE {
205                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
206                 return
207         }
208
209         buf := make([]byte, req.ContentLength)
210         nread, err := io.ReadFull(req.Body, buf)
211         if err != nil {
212                 http.Error(resp, err.Error(), 500)
213         } else if int64(nread) < req.ContentLength {
214                 http.Error(resp, "request truncated", 500)
215         } else {
216                 if err := PutBlock(buf, hash); err == nil {
217                         // Success; add a size hint, sign the locator if
218                         // possible, and return it to the client.
219                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
220                         api_token := GetApiToken(req)
221                         if PermissionSecret != nil && api_token != "" {
222                                 expiry := time.Now().Add(permission_ttl)
223                                 return_hash = SignLocator(return_hash, api_token, expiry)
224                         }
225                         resp.Write([]byte(return_hash + "\n"))
226                 } else {
227                         ke := err.(*KeepError)
228                         http.Error(resp, ke.Error(), ke.HTTPCode)
229                 }
230         }
231         return
232 }
233
234 // IndexHandler
235 //     A HandleFunc to address /index and /index/{prefix} requests.
236 //
237 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
238         prefix := mux.Vars(req)["prefix"]
239
240         // Only the data manager may issue /index requests,
241         // and only if enforce_permissions is enabled.
242         // All other requests return 403 Forbidden.
243         api_token := GetApiToken(req)
244         if !enforce_permissions ||
245                 api_token == "" ||
246                 data_manager_token != api_token {
247                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
248                 return
249         }
250         var index string
251         for _, vol := range KeepVM.Volumes() {
252                 index = index + vol.Index(prefix)
253         }
254         resp.Write([]byte(index))
255 }
256
257 // StatusHandler
258 //     Responds to /status.json requests with the current node status,
259 //     described in a JSON structure.
260 //
261 //     The data given in a status.json response includes:
262 //        volumes - a list of Keep volumes currently in use by this server
263 //          each volume is an object with the following fields:
264 //            * mount_point
265 //            * device_num (an integer identifying the underlying filesystem)
266 //            * bytes_free
267 //            * bytes_used
268 //
269 type VolumeStatus struct {
270         MountPoint string `json:"mount_point"`
271         DeviceNum  uint64 `json:"device_num"`
272         BytesFree  uint64 `json:"bytes_free"`
273         BytesUsed  uint64 `json:"bytes_used"`
274 }
275
276 type NodeStatus struct {
277         Volumes []*VolumeStatus `json:"volumes"`
278 }
279
280 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
281         st := GetNodeStatus()
282         if jstat, err := json.Marshal(st); err == nil {
283                 resp.Write(jstat)
284         } else {
285                 log.Printf("json.Marshal: %s\n", err)
286                 log.Printf("NodeStatus = %v\n", st)
287                 http.Error(resp, err.Error(), 500)
288         }
289 }
290
291 // GetNodeStatus
292 //     Returns a NodeStatus struct describing this Keep
293 //     node's current status.
294 //
295 func GetNodeStatus() *NodeStatus {
296         st := new(NodeStatus)
297
298         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
299         for i, vol := range KeepVM.Volumes() {
300                 st.Volumes[i] = vol.Status()
301         }
302         return st
303 }
304
305 // GetVolumeStatus
306 //     Returns a VolumeStatus describing the requested volume.
307 //
308 func GetVolumeStatus(volume string) *VolumeStatus {
309         var fs syscall.Statfs_t
310         var devnum uint64
311
312         if fi, err := os.Stat(volume); err == nil {
313                 devnum = fi.Sys().(*syscall.Stat_t).Dev
314         } else {
315                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
316                 return nil
317         }
318
319         err := syscall.Statfs(volume, &fs)
320         if err != nil {
321                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
322                 return nil
323         }
324         // These calculations match the way df calculates disk usage:
325         // "free" space is measured by fs.Bavail, but "used" space
326         // uses fs.Blocks - fs.Bfree.
327         free := fs.Bavail * uint64(fs.Bsize)
328         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
329         return &VolumeStatus{volume, devnum, free, used}
330 }
331
332 // DeleteHandler processes DELETE requests.
333 //
334 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
335 // from all connected volumes.
336 //
337 // Only the Data Manager, or an Arvados admin with scope "all", are
338 // allowed to issue DELETE requests.  If a DELETE request is not
339 // authenticated or is issued by a non-admin user, the server returns
340 // a PermissionError.
341 //
342 // Upon receiving a valid request from an authorized user,
343 // DeleteHandler deletes all copies of the specified block on local
344 // writable volumes.
345 //
346 // Response format:
347 //
348 // If the requested blocks was not found on any volume, the response
349 // code is HTTP 404 Not Found.
350 //
351 // Otherwise, the response code is 200 OK, with a response body
352 // consisting of the JSON message
353 //
354 //    {"copies_deleted":d,"copies_failed":f}
355 //
356 // where d and f are integers representing the number of blocks that
357 // were successfully and unsuccessfully deleted.
358 //
359 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
360         hash := mux.Vars(req)["hash"]
361         log.Printf("%s %s", req.Method, hash)
362
363         // Confirm that this user is an admin and has a token with unlimited scope.
364         var tok = GetApiToken(req)
365         if tok == "" || !CanDelete(tok) {
366                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
367                 return
368         }
369
370         if never_delete {
371                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
372                 return
373         }
374
375         // Delete copies of this block from all available volumes.  Report
376         // how many blocks were successfully and unsuccessfully
377         // deleted.
378         var result struct {
379                 Deleted int `json:"copies_deleted"`
380                 Failed  int `json:"copies_failed"`
381         }
382         for _, vol := range KeepVM.Volumes() {
383                 if err := vol.Delete(hash); err == nil {
384                         result.Deleted++
385                 } else if os.IsNotExist(err) {
386                         continue
387                 } else {
388                         result.Failed++
389                         log.Println("DeleteHandler:", err)
390                 }
391         }
392
393         var st int
394
395         if result.Deleted == 0 && result.Failed == 0 {
396                 st = http.StatusNotFound
397         } else {
398                 st = http.StatusOK
399         }
400
401         resp.WriteHeader(st)
402
403         if st == http.StatusOK {
404                 if body, err := json.Marshal(result); err == nil {
405                         resp.Write(body)
406                 } else {
407                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
408                         http.Error(resp, err.Error(), 500)
409                 }
410         }
411 }
412
413 /* PullHandler processes "PUT /pull" requests for the data manager.
414    The request body is a JSON message containing a list of pull
415    requests in the following format:
416
417    [
418       {
419          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
420          "servers":[
421                         "keep0.qr1hi.arvadosapi.com:25107",
422                         "keep1.qr1hi.arvadosapi.com:25108"
423                  ]
424           },
425           {
426                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
427                  "servers":[
428                         "10.0.1.5:25107",
429                         "10.0.1.6:25107",
430                         "10.0.1.7:25108"
431                  ]
432           },
433           ...
434    ]
435
436    Each pull request in the list consists of a block locator string
437    and an ordered list of servers.  Keepstore should try to fetch the
438    block from each server in turn.
439
440    If the request has not been sent by the Data Manager, return 401
441    Unauthorized.
442
443    If the JSON unmarshalling fails, return 400 Bad Request.
444 */
445
446 type PullRequest struct {
447         Locator string   `json:"locator"`
448         Servers []string `json:"servers"`
449 }
450
451 func PullHandler(resp http.ResponseWriter, req *http.Request) {
452         // Reject unauthorized requests.
453         api_token := GetApiToken(req)
454         if !IsDataManagerToken(api_token) {
455                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
456                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
457                 return
458         }
459
460         // Parse the request body.
461         var pr []PullRequest
462         r := json.NewDecoder(req.Body)
463         if err := r.Decode(&pr); err != nil {
464                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
465                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
466                 return
467         }
468
469         // We have a properly formatted pull list sent from the data
470         // manager.  Report success and send the list to the pull list
471         // manager for further handling.
472         log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
473         resp.WriteHeader(http.StatusOK)
474         resp.Write([]byte(
475                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
476
477         plist := list.New()
478         for _, p := range pr {
479                 plist.PushBack(p)
480         }
481
482         if pullq == nil {
483                 pullq = NewWorkQueue()
484         }
485         pullq.ReplaceQueue(plist)
486 }
487
488 type TrashRequest struct {
489         Locator    string `json:"locator"`
490         BlockMtime int64  `json:"block_mtime"`
491 }
492
493 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
494         // Reject unauthorized requests.
495         api_token := GetApiToken(req)
496         if !IsDataManagerToken(api_token) {
497                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
498                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
499                 return
500         }
501
502         // Parse the request body.
503         var trash []TrashRequest
504         r := json.NewDecoder(req.Body)
505         if err := r.Decode(&trash); err != nil {
506                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
507                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
508                 return
509         }
510
511         // We have a properly formatted trash list sent from the data
512         // manager.  Report success and send the list to the trash work
513         // queue for further handling.
514         log.Printf("%s %s: received %v\n", req.Method, req.URL, trash)
515         resp.WriteHeader(http.StatusOK)
516         resp.Write([]byte(
517                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
518
519         tlist := list.New()
520         for _, t := range trash {
521                 tlist.PushBack(t)
522         }
523
524         if trashq == nil {
525                 trashq = NewWorkQueue()
526         }
527         trashq.ReplaceQueue(tlist)
528 }
529
530 // ==============================
531 // GetBlock and PutBlock implement lower-level code for handling
532 // blocks by rooting through volumes connected to the local machine.
533 // Once the handler has determined that system policy permits the
534 // request, it calls these methods to perform the actual operation.
535 //
536 // TODO(twp): this code would probably be better located in the
537 // VolumeManager interface. As an abstraction, the VolumeManager
538 // should be the only part of the code that cares about which volume a
539 // block is stored on, so it should be responsible for figuring out
540 // which volume to check for fetching blocks, storing blocks, etc.
541
542 // ==============================
543 // GetBlock fetches and returns the block identified by "hash".  If
544 // the update_timestamp argument is true, GetBlock also updates the
545 // block's file modification time (for the sake of PutBlock, which
546 // must update the file's timestamp when the block already exists).
547 //
548 // On success, GetBlock returns a byte slice with the block data, and
549 // a nil error.
550 //
551 // If the block cannot be found on any volume, returns NotFoundError.
552 //
553 // If the block found does not have the correct MD5 hash, returns
554 // DiskHashError.
555 //
556
557 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
558         // Attempt to read the requested hash from a keep volume.
559         error_to_caller := NotFoundError
560
561         for _, vol := range KeepVM.Volumes() {
562                 if buf, err := vol.Get(hash); err != nil {
563                         // IsNotExist is an expected error and may be ignored.
564                         // (If all volumes report IsNotExist, we return a NotFoundError)
565                         // All other errors should be logged but we continue trying to
566                         // read.
567                         switch {
568                         case os.IsNotExist(err):
569                                 continue
570                         default:
571                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
572                         }
573                 } else {
574                         // Double check the file checksum.
575                         //
576                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
577                         if filehash != hash {
578                                 // TODO(twp): this condition probably represents a bad disk and
579                                 // should raise major alarm bells for an administrator: e.g.
580                                 // they should be sent directly to an event manager at high
581                                 // priority or logged as urgent problems.
582                                 //
583                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
584                                         vol, hash, filehash)
585                                 error_to_caller = DiskHashError
586                         } else {
587                                 // Success!
588                                 if error_to_caller != NotFoundError {
589                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
590                                                 vol, hash)
591                                 }
592                                 // Update the timestamp if the caller requested.
593                                 // If we could not update the timestamp, continue looking on
594                                 // other volumes.
595                                 if update_timestamp {
596                                         if vol.Touch(hash) != nil {
597                                                 continue
598                                         }
599                                 }
600                                 return buf, nil
601                         }
602                 }
603         }
604
605         if error_to_caller != NotFoundError {
606                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
607         }
608         return nil, error_to_caller
609 }
610
611 /* PutBlock(block, hash)
612    Stores the BLOCK (identified by the content id HASH) in Keep.
613
614    The MD5 checksum of the block must be identical to the content id HASH.
615    If not, an error is returned.
616
617    PutBlock stores the BLOCK on the first Keep volume with free space.
618    A failure code is returned to the user only if all volumes fail.
619
620    On success, PutBlock returns nil.
621    On failure, it returns a KeepError with one of the following codes:
622
623    500 Collision
624           A different block with the same hash already exists on this
625           Keep server.
626    422 MD5Fail
627           The MD5 hash of the BLOCK does not match the argument HASH.
628    503 Full
629           There was not enough space left in any Keep volume to store
630           the object.
631    500 Fail
632           The object could not be stored for some other reason (e.g.
633           all writes failed). The text of the error message should
634           provide as much detail as possible.
635 */
636
637 func PutBlock(block []byte, hash string) error {
638         // Check that BLOCK's checksum matches HASH.
639         blockhash := fmt.Sprintf("%x", md5.Sum(block))
640         if blockhash != hash {
641                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
642                 return RequestHashError
643         }
644
645         // If we already have a block on disk under this identifier, return
646         // success (but check for MD5 collisions).  While fetching the block,
647         // update its timestamp.
648         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
649         // In either case, we want to write our new (good) block to disk,
650         // so there is nothing special to do if err != nil.
651         //
652         if oldblock, err := GetBlock(hash, true); err == nil {
653                 if bytes.Compare(block, oldblock) == 0 {
654                         // The block already exists; return success.
655                         return nil
656                 } else {
657                         return CollisionError
658                 }
659         }
660
661         // Choose a Keep volume to write to.
662         // If this volume fails, try all of the volumes in order.
663         vol := KeepVM.Choose()
664         if err := vol.Put(hash, block); err == nil {
665                 return nil // success!
666         } else {
667                 allFull := true
668                 for _, vol := range KeepVM.Volumes() {
669                         err := vol.Put(hash, block)
670                         if err == nil {
671                                 return nil // success!
672                         }
673                         if err != FullError {
674                                 // The volume is not full but the write did not succeed.
675                                 // Report the error and continue trying.
676                                 allFull = false
677                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
678                         }
679                 }
680
681                 if allFull {
682                         log.Printf("all Keep volumes full")
683                         return FullError
684                 } else {
685                         log.Printf("all Keep volumes failed")
686                         return GenericError
687                 }
688         }
689 }
690
691 // IsValidLocator
692 //     Return true if the specified string is a valid Keep locator.
693 //     When Keep is extended to support hash types other than MD5,
694 //     this should be updated to cover those as well.
695 //
696 func IsValidLocator(loc string) bool {
697         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
698         if err == nil {
699                 return match
700         }
701         log.Printf("IsValidLocator: %s\n", err)
702         return false
703 }
704
705 // GetApiToken returns the OAuth2 token from the Authorization
706 // header of a HTTP request, or an empty string if no matching
707 // token is found.
708 func GetApiToken(req *http.Request) string {
709         if auth, ok := req.Header["Authorization"]; ok {
710                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
711                         log.Println(err)
712                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
713                         return match[1]
714                 }
715         }
716         return ""
717 }
718
719 // IsExpired returns true if the given Unix timestamp (expressed as a
720 // hexadecimal string) is in the past, or if timestamp_hex cannot be
721 // parsed as a hexadecimal string.
722 func IsExpired(timestamp_hex string) bool {
723         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
724         if err != nil {
725                 log.Printf("IsExpired: %s\n", err)
726                 return true
727         }
728         return time.Unix(ts, 0).Before(time.Now())
729 }
730
731 // CanDelete returns true if the user identified by api_token is
732 // allowed to delete blocks.
733 func CanDelete(api_token string) bool {
734         if api_token == "" {
735                 return false
736         }
737         // Blocks may be deleted only when Keep has been configured with a
738         // data manager.
739         if IsDataManagerToken(api_token) {
740                 return true
741         }
742         // TODO(twp): look up api_token with the API server
743         // return true if is_admin is true and if the token
744         // has unlimited scope
745         return false
746 }
747
748 // IsDataManagerToken returns true if api_token represents the data
749 // manager's token.
750 func IsDataManagerToken(api_token string) bool {
751         return data_manager_token != "" && api_token == data_manager_token
752 }