5043: Accept long stderr lines from crunch tasks.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63         // /trash" requests from Data Manager.  These requests instruct
64         // Keep to replicate or delete blocks; see
65         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
66         // for more details.
67         //
68         // Each handler parses the JSON list of block management requests
69         // in the message body, and replaces any existing pull queue or
70         // trash queue with their contentes.
71         //
72         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74
75         // Any request which does not match any of these routes gets
76         // 400 Bad Request.
77         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
78
79         return rest
80 }
81
82 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
83         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
84 }
85
86 // FindKeepVolumes scans all mounted volumes on the system for Keep
87 // volumes, and returns a list of matching paths.
88 //
89 // A device is assumed to be a Keep volume if it is a normal or tmpfs
90 // volume and has a "/keep" directory directly underneath the mount
91 // point.
92 //
93 func FindKeepVolumes() []string {
94         vols := make([]string, 0)
95
96         if f, err := os.Open(PROC_MOUNTS); err != nil {
97                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
98         } else {
99                 scanner := bufio.NewScanner(f)
100                 for scanner.Scan() {
101                         args := strings.Fields(scanner.Text())
102                         dev, mount := args[0], args[1]
103                         if mount != "/" &&
104                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
105                                 keep := mount + "/keep"
106                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
107                                         vols = append(vols, keep)
108                                 }
109                         }
110                 }
111                 if err := scanner.Err(); err != nil {
112                         log.Fatal(err)
113                 }
114         }
115         return vols
116 }
117
118 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
119         hash := mux.Vars(req)["hash"]
120
121         hints := mux.Vars(req)["hints"]
122
123         // Parse the locator string and hints from the request.
124         // TODO(twp): implement a Locator type.
125         var signature, timestamp string
126         if hints != "" {
127                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
128                 for _, hint := range strings.Split(hints, "+") {
129                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
130                                 // Server ignores size hints
131                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
132                                 signature = m[1]
133                                 timestamp = m[2]
134                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
135                                 // Any unknown hint that starts with an uppercase letter is
136                                 // presumed to be valid and ignored, to permit forward compatibility.
137                         } else {
138                                 // Unknown format; not a valid locator.
139                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
140                                 return
141                         }
142                 }
143         }
144
145         // If permission checking is in effect, verify this
146         // request's permission signature.
147         if enforce_permissions {
148                 if signature == "" || timestamp == "" {
149                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
150                         return
151                 } else if IsExpired(timestamp) {
152                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
153                         return
154                 } else {
155                         req_locator := req.URL.Path[1:] // strip leading slash
156                         if !VerifySignature(req_locator, GetApiToken(req)) {
157                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
158                                 return
159                         }
160                 }
161         }
162
163         block, err := GetBlock(hash, false)
164
165         // Garbage collect after each GET. Fixes #2865.
166         // TODO(twp): review Keep memory usage and see if there's
167         // a better way to do this than blindly garbage collecting
168         // after every block.
169         defer runtime.GC()
170
171         if err != nil {
172                 // This type assertion is safe because the only errors
173                 // GetBlock can return are DiskHashError or NotFoundError.
174                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
175                 return
176         }
177
178         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
179
180         _, err = resp.Write(block)
181
182         return
183 }
184
185 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
186         // Garbage collect after each PUT. Fixes #2865.
187         // See also GetBlockHandler.
188         defer runtime.GC()
189
190         hash := mux.Vars(req)["hash"]
191
192         // Read the block data to be stored.
193         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
194         //
195         if req.ContentLength > BLOCKSIZE {
196                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
197                 return
198         }
199
200         buf := make([]byte, req.ContentLength)
201         nread, err := io.ReadFull(req.Body, buf)
202         if err != nil {
203                 http.Error(resp, err.Error(), 500)
204         } else if int64(nread) < req.ContentLength {
205                 http.Error(resp, "request truncated", 500)
206         } else {
207                 if err := PutBlock(buf, hash); err == nil {
208                         // Success; add a size hint, sign the locator if
209                         // possible, and return it to the client.
210                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
211                         api_token := GetApiToken(req)
212                         if PermissionSecret != nil && api_token != "" {
213                                 expiry := time.Now().Add(permission_ttl)
214                                 return_hash = SignLocator(return_hash, api_token, expiry)
215                         }
216                         resp.Write([]byte(return_hash + "\n"))
217                 } else {
218                         ke := err.(*KeepError)
219                         http.Error(resp, ke.Error(), ke.HTTPCode)
220                 }
221         }
222         return
223 }
224
225 // IndexHandler
226 //     A HandleFunc to address /index and /index/{prefix} requests.
227 //
228 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
229         // Reject unauthorized requests.
230         if !IsDataManagerToken(GetApiToken(req)) {
231                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
232                 return
233         }
234
235         prefix := mux.Vars(req)["prefix"]
236
237         var index string
238         for _, vol := range KeepVM.Volumes() {
239                 index = index + vol.Index(prefix)
240         }
241         resp.Write([]byte(index))
242 }
243
244 // StatusHandler
245 //     Responds to /status.json requests with the current node status,
246 //     described in a JSON structure.
247 //
248 //     The data given in a status.json response includes:
249 //        volumes - a list of Keep volumes currently in use by this server
250 //          each volume is an object with the following fields:
251 //            * mount_point
252 //            * device_num (an integer identifying the underlying filesystem)
253 //            * bytes_free
254 //            * bytes_used
255 //
256 type VolumeStatus struct {
257         MountPoint string `json:"mount_point"`
258         DeviceNum  uint64 `json:"device_num"`
259         BytesFree  uint64 `json:"bytes_free"`
260         BytesUsed  uint64 `json:"bytes_used"`
261 }
262
263 type NodeStatus struct {
264         Volumes []*VolumeStatus `json:"volumes"`
265 }
266
267 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
268         st := GetNodeStatus()
269         if jstat, err := json.Marshal(st); err == nil {
270                 resp.Write(jstat)
271         } else {
272                 log.Printf("json.Marshal: %s\n", err)
273                 log.Printf("NodeStatus = %v\n", st)
274                 http.Error(resp, err.Error(), 500)
275         }
276 }
277
278 // GetNodeStatus
279 //     Returns a NodeStatus struct describing this Keep
280 //     node's current status.
281 //
282 func GetNodeStatus() *NodeStatus {
283         st := new(NodeStatus)
284
285         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
286         for i, vol := range KeepVM.Volumes() {
287                 st.Volumes[i] = vol.Status()
288         }
289         return st
290 }
291
292 // GetVolumeStatus
293 //     Returns a VolumeStatus describing the requested volume.
294 //
295 func GetVolumeStatus(volume string) *VolumeStatus {
296         var fs syscall.Statfs_t
297         var devnum uint64
298
299         if fi, err := os.Stat(volume); err == nil {
300                 devnum = fi.Sys().(*syscall.Stat_t).Dev
301         } else {
302                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
303                 return nil
304         }
305
306         err := syscall.Statfs(volume, &fs)
307         if err != nil {
308                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
309                 return nil
310         }
311         // These calculations match the way df calculates disk usage:
312         // "free" space is measured by fs.Bavail, but "used" space
313         // uses fs.Blocks - fs.Bfree.
314         free := fs.Bavail * uint64(fs.Bsize)
315         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
316         return &VolumeStatus{volume, devnum, free, used}
317 }
318
319 // DeleteHandler processes DELETE requests.
320 //
321 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
322 // from all connected volumes.
323 //
324 // Only the Data Manager, or an Arvados admin with scope "all", are
325 // allowed to issue DELETE requests.  If a DELETE request is not
326 // authenticated or is issued by a non-admin user, the server returns
327 // a PermissionError.
328 //
329 // Upon receiving a valid request from an authorized user,
330 // DeleteHandler deletes all copies of the specified block on local
331 // writable volumes.
332 //
333 // Response format:
334 //
335 // If the requested blocks was not found on any volume, the response
336 // code is HTTP 404 Not Found.
337 //
338 // Otherwise, the response code is 200 OK, with a response body
339 // consisting of the JSON message
340 //
341 //    {"copies_deleted":d,"copies_failed":f}
342 //
343 // where d and f are integers representing the number of blocks that
344 // were successfully and unsuccessfully deleted.
345 //
346 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
347         hash := mux.Vars(req)["hash"]
348
349         // Confirm that this user is an admin and has a token with unlimited scope.
350         var tok = GetApiToken(req)
351         if tok == "" || !CanDelete(tok) {
352                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
353                 return
354         }
355
356         if never_delete {
357                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
358                 return
359         }
360
361         // Delete copies of this block from all available volumes.  Report
362         // how many blocks were successfully and unsuccessfully
363         // deleted.
364         var result struct {
365                 Deleted int `json:"copies_deleted"`
366                 Failed  int `json:"copies_failed"`
367         }
368         for _, vol := range KeepVM.Volumes() {
369                 if err := vol.Delete(hash); err == nil {
370                         result.Deleted++
371                 } else if os.IsNotExist(err) {
372                         continue
373                 } else {
374                         result.Failed++
375                         log.Println("DeleteHandler:", err)
376                 }
377         }
378
379         var st int
380
381         if result.Deleted == 0 && result.Failed == 0 {
382                 st = http.StatusNotFound
383         } else {
384                 st = http.StatusOK
385         }
386
387         resp.WriteHeader(st)
388
389         if st == http.StatusOK {
390                 if body, err := json.Marshal(result); err == nil {
391                         resp.Write(body)
392                 } else {
393                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
394                         http.Error(resp, err.Error(), 500)
395                 }
396         }
397 }
398
399 /* PullHandler processes "PUT /pull" requests for the data manager.
400    The request body is a JSON message containing a list of pull
401    requests in the following format:
402
403    [
404       {
405          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
406          "servers":[
407                         "keep0.qr1hi.arvadosapi.com:25107",
408                         "keep1.qr1hi.arvadosapi.com:25108"
409                  ]
410           },
411           {
412                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
413                  "servers":[
414                         "10.0.1.5:25107",
415                         "10.0.1.6:25107",
416                         "10.0.1.7:25108"
417                  ]
418           },
419           ...
420    ]
421
422    Each pull request in the list consists of a block locator string
423    and an ordered list of servers.  Keepstore should try to fetch the
424    block from each server in turn.
425
426    If the request has not been sent by the Data Manager, return 401
427    Unauthorized.
428
429    If the JSON unmarshalling fails, return 400 Bad Request.
430 */
431
432 type PullRequest struct {
433         Locator string   `json:"locator"`
434         Servers []string `json:"servers"`
435 }
436
437 func PullHandler(resp http.ResponseWriter, req *http.Request) {
438         // Reject unauthorized requests.
439         if !IsDataManagerToken(GetApiToken(req)) {
440                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
441                 return
442         }
443
444         // Parse the request body.
445         var pr []PullRequest
446         r := json.NewDecoder(req.Body)
447         if err := r.Decode(&pr); err != nil {
448                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
449                 return
450         }
451
452         // We have a properly formatted pull list sent from the data
453         // manager.  Report success and send the list to the pull list
454         // manager for further handling.
455         resp.WriteHeader(http.StatusOK)
456         resp.Write([]byte(
457                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
458
459         plist := list.New()
460         for _, p := range pr {
461                 plist.PushBack(p)
462         }
463
464         if pullq == nil {
465                 pullq = NewWorkQueue()
466         }
467         pullq.ReplaceQueue(plist)
468 }
469
470 type TrashRequest struct {
471         Locator    string `json:"locator"`
472         BlockMtime int64  `json:"block_mtime"`
473 }
474
475 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
476         // Reject unauthorized requests.
477         if !IsDataManagerToken(GetApiToken(req)) {
478                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
479                 return
480         }
481
482         // Parse the request body.
483         var trash []TrashRequest
484         r := json.NewDecoder(req.Body)
485         if err := r.Decode(&trash); err != nil {
486                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
487                 return
488         }
489
490         // We have a properly formatted trash list sent from the data
491         // manager.  Report success and send the list to the trash work
492         // queue for further handling.
493         resp.WriteHeader(http.StatusOK)
494         resp.Write([]byte(
495                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
496
497         tlist := list.New()
498         for _, t := range trash {
499                 tlist.PushBack(t)
500         }
501
502         if trashq == nil {
503                 trashq = NewWorkQueue()
504         }
505         trashq.ReplaceQueue(tlist)
506 }
507
508 // ==============================
509 // GetBlock and PutBlock implement lower-level code for handling
510 // blocks by rooting through volumes connected to the local machine.
511 // Once the handler has determined that system policy permits the
512 // request, it calls these methods to perform the actual operation.
513 //
514 // TODO(twp): this code would probably be better located in the
515 // VolumeManager interface. As an abstraction, the VolumeManager
516 // should be the only part of the code that cares about which volume a
517 // block is stored on, so it should be responsible for figuring out
518 // which volume to check for fetching blocks, storing blocks, etc.
519
520 // ==============================
521 // GetBlock fetches and returns the block identified by "hash".  If
522 // the update_timestamp argument is true, GetBlock also updates the
523 // block's file modification time (for the sake of PutBlock, which
524 // must update the file's timestamp when the block already exists).
525 //
526 // On success, GetBlock returns a byte slice with the block data, and
527 // a nil error.
528 //
529 // If the block cannot be found on any volume, returns NotFoundError.
530 //
531 // If the block found does not have the correct MD5 hash, returns
532 // DiskHashError.
533 //
534
535 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
536         // Attempt to read the requested hash from a keep volume.
537         error_to_caller := NotFoundError
538
539         for _, vol := range KeepVM.Volumes() {
540                 if buf, err := vol.Get(hash); err != nil {
541                         // IsNotExist is an expected error and may be ignored.
542                         // (If all volumes report IsNotExist, we return a NotFoundError)
543                         // All other errors should be logged but we continue trying to
544                         // read.
545                         switch {
546                         case os.IsNotExist(err):
547                                 continue
548                         default:
549                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
550                         }
551                 } else {
552                         // Double check the file checksum.
553                         //
554                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
555                         if filehash != hash {
556                                 // TODO(twp): this condition probably represents a bad disk and
557                                 // should raise major alarm bells for an administrator: e.g.
558                                 // they should be sent directly to an event manager at high
559                                 // priority or logged as urgent problems.
560                                 //
561                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
562                                         vol, hash, filehash)
563                                 error_to_caller = DiskHashError
564                         } else {
565                                 // Success!
566                                 if error_to_caller != NotFoundError {
567                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
568                                                 vol, hash)
569                                 }
570                                 // Update the timestamp if the caller requested.
571                                 // If we could not update the timestamp, continue looking on
572                                 // other volumes.
573                                 if update_timestamp {
574                                         if vol.Touch(hash) != nil {
575                                                 continue
576                                         }
577                                 }
578                                 return buf, nil
579                         }
580                 }
581         }
582
583         if error_to_caller != NotFoundError {
584                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
585         }
586         return nil, error_to_caller
587 }
588
589 /* PutBlock(block, hash)
590    Stores the BLOCK (identified by the content id HASH) in Keep.
591
592    The MD5 checksum of the block must be identical to the content id HASH.
593    If not, an error is returned.
594
595    PutBlock stores the BLOCK on the first Keep volume with free space.
596    A failure code is returned to the user only if all volumes fail.
597
598    On success, PutBlock returns nil.
599    On failure, it returns a KeepError with one of the following codes:
600
601    500 Collision
602           A different block with the same hash already exists on this
603           Keep server.
604    422 MD5Fail
605           The MD5 hash of the BLOCK does not match the argument HASH.
606    503 Full
607           There was not enough space left in any Keep volume to store
608           the object.
609    500 Fail
610           The object could not be stored for some other reason (e.g.
611           all writes failed). The text of the error message should
612           provide as much detail as possible.
613 */
614
615 func PutBlock(block []byte, hash string) error {
616         // Check that BLOCK's checksum matches HASH.
617         blockhash := fmt.Sprintf("%x", md5.Sum(block))
618         if blockhash != hash {
619                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
620                 return RequestHashError
621         }
622
623         // If we already have a block on disk under this identifier, return
624         // success (but check for MD5 collisions).  While fetching the block,
625         // update its timestamp.
626         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
627         // In either case, we want to write our new (good) block to disk,
628         // so there is nothing special to do if err != nil.
629         //
630         if oldblock, err := GetBlock(hash, true); err == nil {
631                 if bytes.Compare(block, oldblock) == 0 {
632                         // The block already exists; return success.
633                         return nil
634                 } else {
635                         return CollisionError
636                 }
637         }
638
639         // Choose a Keep volume to write to.
640         // If this volume fails, try all of the volumes in order.
641         vol := KeepVM.Choose()
642         if err := vol.Put(hash, block); err == nil {
643                 return nil // success!
644         } else {
645                 allFull := true
646                 for _, vol := range KeepVM.Volumes() {
647                         err := vol.Put(hash, block)
648                         if err == nil {
649                                 return nil // success!
650                         }
651                         if err != FullError {
652                                 // The volume is not full but the write did not succeed.
653                                 // Report the error and continue trying.
654                                 allFull = false
655                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
656                         }
657                 }
658
659                 if allFull {
660                         log.Printf("all Keep volumes full")
661                         return FullError
662                 } else {
663                         log.Printf("all Keep volumes failed")
664                         return GenericError
665                 }
666         }
667 }
668
669 // IsValidLocator
670 //     Return true if the specified string is a valid Keep locator.
671 //     When Keep is extended to support hash types other than MD5,
672 //     this should be updated to cover those as well.
673 //
674 func IsValidLocator(loc string) bool {
675         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
676         if err == nil {
677                 return match
678         }
679         log.Printf("IsValidLocator: %s\n", err)
680         return false
681 }
682
683 // GetApiToken returns the OAuth2 token from the Authorization
684 // header of a HTTP request, or an empty string if no matching
685 // token is found.
686 func GetApiToken(req *http.Request) string {
687         if auth, ok := req.Header["Authorization"]; ok {
688                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
689                         log.Println(err)
690                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
691                         return match[1]
692                 }
693         }
694         return ""
695 }
696
697 // IsExpired returns true if the given Unix timestamp (expressed as a
698 // hexadecimal string) is in the past, or if timestamp_hex cannot be
699 // parsed as a hexadecimal string.
700 func IsExpired(timestamp_hex string) bool {
701         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
702         if err != nil {
703                 log.Printf("IsExpired: %s\n", err)
704                 return true
705         }
706         return time.Unix(ts, 0).Before(time.Now())
707 }
708
709 // CanDelete returns true if the user identified by api_token is
710 // allowed to delete blocks.
711 func CanDelete(api_token string) bool {
712         if api_token == "" {
713                 return false
714         }
715         // Blocks may be deleted only when Keep has been configured with a
716         // data manager.
717         if IsDataManagerToken(api_token) {
718                 return true
719         }
720         // TODO(twp): look up api_token with the API server
721         // return true if is_admin is true and if the token
722         // has unlimited scope
723         return false
724 }
725
726 // IsDataManagerToken returns true if api_token represents the data
727 // manager's token.
728 func IsDataManagerToken(api_token string) bool {
729         return data_manager_token != "" && api_token == data_manager_token
730 }