4869: Keepstore now returns Content-Length headers, and logs the error message
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
63         // /trash" requests from Data Manager.  These requests instruct
64         // Keep to replicate or delete blocks; see
65         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
66         // for more details.
67         //
68         // Each handler parses the JSON list of block management requests
69         // in the message body, and replaces any existing pull queue or
70         // trash queue with their contentes.
71         //
72         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74
75         // Any request which does not match any of these routes gets
76         // 400 Bad Request.
77         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
78
79         return rest
80 }
81
82 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
83         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
84 }
85
86 // FindKeepVolumes scans all mounted volumes on the system for Keep
87 // volumes, and returns a list of matching paths.
88 //
89 // A device is assumed to be a Keep volume if it is a normal or tmpfs
90 // volume and has a "/keep" directory directly underneath the mount
91 // point.
92 //
93 func FindKeepVolumes() []string {
94         vols := make([]string, 0)
95
96         if f, err := os.Open(PROC_MOUNTS); err != nil {
97                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
98         } else {
99                 scanner := bufio.NewScanner(f)
100                 for scanner.Scan() {
101                         args := strings.Fields(scanner.Text())
102                         dev, mount := args[0], args[1]
103                         if mount != "/" &&
104                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
105                                 keep := mount + "/keep"
106                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
107                                         vols = append(vols, keep)
108                                 }
109                         }
110                 }
111                 if err := scanner.Err(); err != nil {
112                         log.Fatal(err)
113                 }
114         }
115         return vols
116 }
117
118 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
119         hash := mux.Vars(req)["hash"]
120
121         hints := mux.Vars(req)["hints"]
122
123         // Parse the locator string and hints from the request.
124         // TODO(twp): implement a Locator type.
125         var signature, timestamp string
126         if hints != "" {
127                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
128                 for _, hint := range strings.Split(hints, "+") {
129                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
130                                 // Server ignores size hints
131                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
132                                 signature = m[1]
133                                 timestamp = m[2]
134                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
135                                 // Any unknown hint that starts with an uppercase letter is
136                                 // presumed to be valid and ignored, to permit forward compatibility.
137                         } else {
138                                 // Unknown format; not a valid locator.
139                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
140                                 return
141                         }
142                 }
143         }
144
145         // If permission checking is in effect, verify this
146         // request's permission signature.
147         if enforce_permissions {
148                 if signature == "" || timestamp == "" {
149                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
150                         return
151                 } else if IsExpired(timestamp) {
152                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
153                         return
154                 } else {
155                         req_locator := req.URL.Path[1:] // strip leading slash
156                         if !VerifySignature(req_locator, GetApiToken(req)) {
157                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
158                                 return
159                         }
160                 }
161         }
162
163         block, err := GetBlock(hash, false)
164
165         // Garbage collect after each GET. Fixes #2865.
166         // TODO(twp): review Keep memory usage and see if there's
167         // a better way to do this than blindly garbage collecting
168         // after every block.
169         defer runtime.GC()
170
171         if err != nil {
172                 // This type assertion is safe because the only errors
173                 // GetBlock can return are DiskHashError or NotFoundError.
174                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
175                 return
176         }
177
178         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
179
180         // If/when we support HTTP Range header (#3734), then Content-Length
181         // could be smaller than Block size
182         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
183
184         _, err = resp.Write(block)
185
186         return
187 }
188
189 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
190         // Garbage collect after each PUT. Fixes #2865.
191         // See also GetBlockHandler.
192         defer runtime.GC()
193
194         hash := mux.Vars(req)["hash"]
195
196         // Read the block data to be stored.
197         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
198         //
199         if req.ContentLength > BLOCKSIZE {
200                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
201                 return
202         }
203
204         buf := make([]byte, req.ContentLength)
205         nread, err := io.ReadFull(req.Body, buf)
206         if err != nil {
207                 http.Error(resp, err.Error(), 500)
208         } else if int64(nread) < req.ContentLength {
209                 http.Error(resp, "request truncated", 500)
210         } else {
211                 if err := PutBlock(buf, hash); err == nil {
212                         // Success; add a size hint, sign the locator if
213                         // possible, and return it to the client.
214                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
215                         api_token := GetApiToken(req)
216                         if PermissionSecret != nil && api_token != "" {
217                                 expiry := time.Now().Add(permission_ttl)
218                                 return_hash = SignLocator(return_hash, api_token, expiry)
219                         }
220                         resp.Write([]byte(return_hash + "\n"))
221                 } else {
222                         ke := err.(*KeepError)
223                         http.Error(resp, ke.Error(), ke.HTTPCode)
224                 }
225         }
226         return
227 }
228
229 // IndexHandler
230 //     A HandleFunc to address /index and /index/{prefix} requests.
231 //
232 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
233         // Reject unauthorized requests.
234         if !IsDataManagerToken(GetApiToken(req)) {
235                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
236                 return
237         }
238
239         prefix := mux.Vars(req)["prefix"]
240
241         var index string
242         for _, vol := range KeepVM.Volumes() {
243                 index = index + vol.Index(prefix)
244         }
245         resp.Write([]byte(index))
246 }
247
248 // StatusHandler
249 //     Responds to /status.json requests with the current node status,
250 //     described in a JSON structure.
251 //
252 //     The data given in a status.json response includes:
253 //        volumes - a list of Keep volumes currently in use by this server
254 //          each volume is an object with the following fields:
255 //            * mount_point
256 //            * device_num (an integer identifying the underlying filesystem)
257 //            * bytes_free
258 //            * bytes_used
259 //
260 type VolumeStatus struct {
261         MountPoint string `json:"mount_point"`
262         DeviceNum  uint64 `json:"device_num"`
263         BytesFree  uint64 `json:"bytes_free"`
264         BytesUsed  uint64 `json:"bytes_used"`
265 }
266
267 type NodeStatus struct {
268         Volumes []*VolumeStatus `json:"volumes"`
269 }
270
271 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
272         st := GetNodeStatus()
273         if jstat, err := json.Marshal(st); err == nil {
274                 resp.Write(jstat)
275         } else {
276                 log.Printf("json.Marshal: %s\n", err)
277                 log.Printf("NodeStatus = %v\n", st)
278                 http.Error(resp, err.Error(), 500)
279         }
280 }
281
282 // GetNodeStatus
283 //     Returns a NodeStatus struct describing this Keep
284 //     node's current status.
285 //
286 func GetNodeStatus() *NodeStatus {
287         st := new(NodeStatus)
288
289         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
290         for i, vol := range KeepVM.Volumes() {
291                 st.Volumes[i] = vol.Status()
292         }
293         return st
294 }
295
296 // GetVolumeStatus
297 //     Returns a VolumeStatus describing the requested volume.
298 //
299 func GetVolumeStatus(volume string) *VolumeStatus {
300         var fs syscall.Statfs_t
301         var devnum uint64
302
303         if fi, err := os.Stat(volume); err == nil {
304                 devnum = fi.Sys().(*syscall.Stat_t).Dev
305         } else {
306                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
307                 return nil
308         }
309
310         err := syscall.Statfs(volume, &fs)
311         if err != nil {
312                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
313                 return nil
314         }
315         // These calculations match the way df calculates disk usage:
316         // "free" space is measured by fs.Bavail, but "used" space
317         // uses fs.Blocks - fs.Bfree.
318         free := fs.Bavail * uint64(fs.Bsize)
319         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
320         return &VolumeStatus{volume, devnum, free, used}
321 }
322
323 // DeleteHandler processes DELETE requests.
324 //
325 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
326 // from all connected volumes.
327 //
328 // Only the Data Manager, or an Arvados admin with scope "all", are
329 // allowed to issue DELETE requests.  If a DELETE request is not
330 // authenticated or is issued by a non-admin user, the server returns
331 // a PermissionError.
332 //
333 // Upon receiving a valid request from an authorized user,
334 // DeleteHandler deletes all copies of the specified block on local
335 // writable volumes.
336 //
337 // Response format:
338 //
339 // If the requested blocks was not found on any volume, the response
340 // code is HTTP 404 Not Found.
341 //
342 // Otherwise, the response code is 200 OK, with a response body
343 // consisting of the JSON message
344 //
345 //    {"copies_deleted":d,"copies_failed":f}
346 //
347 // where d and f are integers representing the number of blocks that
348 // were successfully and unsuccessfully deleted.
349 //
350 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
351         hash := mux.Vars(req)["hash"]
352
353         // Confirm that this user is an admin and has a token with unlimited scope.
354         var tok = GetApiToken(req)
355         if tok == "" || !CanDelete(tok) {
356                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
357                 return
358         }
359
360         if never_delete {
361                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
362                 return
363         }
364
365         // Delete copies of this block from all available volumes.  Report
366         // how many blocks were successfully and unsuccessfully
367         // deleted.
368         var result struct {
369                 Deleted int `json:"copies_deleted"`
370                 Failed  int `json:"copies_failed"`
371         }
372         for _, vol := range KeepVM.Volumes() {
373                 if err := vol.Delete(hash); err == nil {
374                         result.Deleted++
375                 } else if os.IsNotExist(err) {
376                         continue
377                 } else {
378                         result.Failed++
379                         log.Println("DeleteHandler:", err)
380                 }
381         }
382
383         var st int
384
385         if result.Deleted == 0 && result.Failed == 0 {
386                 st = http.StatusNotFound
387         } else {
388                 st = http.StatusOK
389         }
390
391         resp.WriteHeader(st)
392
393         if st == http.StatusOK {
394                 if body, err := json.Marshal(result); err == nil {
395                         resp.Write(body)
396                 } else {
397                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
398                         http.Error(resp, err.Error(), 500)
399                 }
400         }
401 }
402
403 /* PullHandler processes "PUT /pull" requests for the data manager.
404    The request body is a JSON message containing a list of pull
405    requests in the following format:
406
407    [
408       {
409          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
410          "servers":[
411                         "keep0.qr1hi.arvadosapi.com:25107",
412                         "keep1.qr1hi.arvadosapi.com:25108"
413                  ]
414           },
415           {
416                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
417                  "servers":[
418                         "10.0.1.5:25107",
419                         "10.0.1.6:25107",
420                         "10.0.1.7:25108"
421                  ]
422           },
423           ...
424    ]
425
426    Each pull request in the list consists of a block locator string
427    and an ordered list of servers.  Keepstore should try to fetch the
428    block from each server in turn.
429
430    If the request has not been sent by the Data Manager, return 401
431    Unauthorized.
432
433    If the JSON unmarshalling fails, return 400 Bad Request.
434 */
435
436 type PullRequest struct {
437         Locator string   `json:"locator"`
438         Servers []string `json:"servers"`
439 }
440
441 func PullHandler(resp http.ResponseWriter, req *http.Request) {
442         // Reject unauthorized requests.
443         if !IsDataManagerToken(GetApiToken(req)) {
444                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
445                 return
446         }
447
448         // Parse the request body.
449         var pr []PullRequest
450         r := json.NewDecoder(req.Body)
451         if err := r.Decode(&pr); err != nil {
452                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
453                 return
454         }
455
456         // We have a properly formatted pull list sent from the data
457         // manager.  Report success and send the list to the pull list
458         // manager for further handling.
459         resp.WriteHeader(http.StatusOK)
460         resp.Write([]byte(
461                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
462
463         plist := list.New()
464         for _, p := range pr {
465                 plist.PushBack(p)
466         }
467
468         if pullq == nil {
469                 pullq = NewWorkQueue()
470         }
471         pullq.ReplaceQueue(plist)
472 }
473
474 type TrashRequest struct {
475         Locator    string `json:"locator"`
476         BlockMtime int64  `json:"block_mtime"`
477 }
478
479 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
480         // Reject unauthorized requests.
481         if !IsDataManagerToken(GetApiToken(req)) {
482                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
483                 return
484         }
485
486         // Parse the request body.
487         var trash []TrashRequest
488         r := json.NewDecoder(req.Body)
489         if err := r.Decode(&trash); err != nil {
490                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
491                 return
492         }
493
494         // We have a properly formatted trash list sent from the data
495         // manager.  Report success and send the list to the trash work
496         // queue for further handling.
497         resp.WriteHeader(http.StatusOK)
498         resp.Write([]byte(
499                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
500
501         tlist := list.New()
502         for _, t := range trash {
503                 tlist.PushBack(t)
504         }
505
506         if trashq == nil {
507                 trashq = NewWorkQueue()
508         }
509         trashq.ReplaceQueue(tlist)
510 }
511
512 // ==============================
513 // GetBlock and PutBlock implement lower-level code for handling
514 // blocks by rooting through volumes connected to the local machine.
515 // Once the handler has determined that system policy permits the
516 // request, it calls these methods to perform the actual operation.
517 //
518 // TODO(twp): this code would probably be better located in the
519 // VolumeManager interface. As an abstraction, the VolumeManager
520 // should be the only part of the code that cares about which volume a
521 // block is stored on, so it should be responsible for figuring out
522 // which volume to check for fetching blocks, storing blocks, etc.
523
524 // ==============================
525 // GetBlock fetches and returns the block identified by "hash".  If
526 // the update_timestamp argument is true, GetBlock also updates the
527 // block's file modification time (for the sake of PutBlock, which
528 // must update the file's timestamp when the block already exists).
529 //
530 // On success, GetBlock returns a byte slice with the block data, and
531 // a nil error.
532 //
533 // If the block cannot be found on any volume, returns NotFoundError.
534 //
535 // If the block found does not have the correct MD5 hash, returns
536 // DiskHashError.
537 //
538
539 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
540         // Attempt to read the requested hash from a keep volume.
541         error_to_caller := NotFoundError
542
543         for _, vol := range KeepVM.Volumes() {
544                 if buf, err := vol.Get(hash); err != nil {
545                         // IsNotExist is an expected error and may be ignored.
546                         // (If all volumes report IsNotExist, we return a NotFoundError)
547                         // All other errors should be logged but we continue trying to
548                         // read.
549                         switch {
550                         case os.IsNotExist(err):
551                                 continue
552                         default:
553                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
554                         }
555                 } else {
556                         // Double check the file checksum.
557                         //
558                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
559                         if filehash != hash {
560                                 // TODO(twp): this condition probably represents a bad disk and
561                                 // should raise major alarm bells for an administrator: e.g.
562                                 // they should be sent directly to an event manager at high
563                                 // priority or logged as urgent problems.
564                                 //
565                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
566                                         vol, hash, filehash)
567                                 error_to_caller = DiskHashError
568                         } else {
569                                 // Success!
570                                 if error_to_caller != NotFoundError {
571                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
572                                                 vol, hash)
573                                 }
574                                 // Update the timestamp if the caller requested.
575                                 // If we could not update the timestamp, continue looking on
576                                 // other volumes.
577                                 if update_timestamp {
578                                         if vol.Touch(hash) != nil {
579                                                 continue
580                                         }
581                                 }
582                                 return buf, nil
583                         }
584                 }
585         }
586
587         if error_to_caller != NotFoundError {
588                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
589         }
590         return nil, error_to_caller
591 }
592
593 /* PutBlock(block, hash)
594    Stores the BLOCK (identified by the content id HASH) in Keep.
595
596    The MD5 checksum of the block must be identical to the content id HASH.
597    If not, an error is returned.
598
599    PutBlock stores the BLOCK on the first Keep volume with free space.
600    A failure code is returned to the user only if all volumes fail.
601
602    On success, PutBlock returns nil.
603    On failure, it returns a KeepError with one of the following codes:
604
605    500 Collision
606           A different block with the same hash already exists on this
607           Keep server.
608    422 MD5Fail
609           The MD5 hash of the BLOCK does not match the argument HASH.
610    503 Full
611           There was not enough space left in any Keep volume to store
612           the object.
613    500 Fail
614           The object could not be stored for some other reason (e.g.
615           all writes failed). The text of the error message should
616           provide as much detail as possible.
617 */
618
619 func PutBlock(block []byte, hash string) error {
620         // Check that BLOCK's checksum matches HASH.
621         blockhash := fmt.Sprintf("%x", md5.Sum(block))
622         if blockhash != hash {
623                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
624                 return RequestHashError
625         }
626
627         // If we already have a block on disk under this identifier, return
628         // success (but check for MD5 collisions).  While fetching the block,
629         // update its timestamp.
630         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
631         // In either case, we want to write our new (good) block to disk,
632         // so there is nothing special to do if err != nil.
633         //
634         if oldblock, err := GetBlock(hash, true); err == nil {
635                 if bytes.Compare(block, oldblock) == 0 {
636                         // The block already exists; return success.
637                         return nil
638                 } else {
639                         return CollisionError
640                 }
641         }
642
643         // Choose a Keep volume to write to.
644         // If this volume fails, try all of the volumes in order.
645         vol := KeepVM.Choose()
646         if err := vol.Put(hash, block); err == nil {
647                 return nil // success!
648         } else {
649                 allFull := true
650                 for _, vol := range KeepVM.Volumes() {
651                         err := vol.Put(hash, block)
652                         if err == nil {
653                                 return nil // success!
654                         }
655                         if err != FullError {
656                                 // The volume is not full but the write did not succeed.
657                                 // Report the error and continue trying.
658                                 allFull = false
659                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
660                         }
661                 }
662
663                 if allFull {
664                         log.Printf("all Keep volumes full")
665                         return FullError
666                 } else {
667                         log.Printf("all Keep volumes failed")
668                         return GenericError
669                 }
670         }
671 }
672
673 // IsValidLocator
674 //     Return true if the specified string is a valid Keep locator.
675 //     When Keep is extended to support hash types other than MD5,
676 //     this should be updated to cover those as well.
677 //
678 func IsValidLocator(loc string) bool {
679         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
680         if err == nil {
681                 return match
682         }
683         log.Printf("IsValidLocator: %s\n", err)
684         return false
685 }
686
687 // GetApiToken returns the OAuth2 token from the Authorization
688 // header of a HTTP request, or an empty string if no matching
689 // token is found.
690 func GetApiToken(req *http.Request) string {
691         if auth, ok := req.Header["Authorization"]; ok {
692                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
693                         log.Println(err)
694                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
695                         return match[1]
696                 }
697         }
698         return ""
699 }
700
701 // IsExpired returns true if the given Unix timestamp (expressed as a
702 // hexadecimal string) is in the past, or if timestamp_hex cannot be
703 // parsed as a hexadecimal string.
704 func IsExpired(timestamp_hex string) bool {
705         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
706         if err != nil {
707                 log.Printf("IsExpired: %s\n", err)
708                 return true
709         }
710         return time.Unix(ts, 0).Before(time.Now())
711 }
712
713 // CanDelete returns true if the user identified by api_token is
714 // allowed to delete blocks.
715 func CanDelete(api_token string) bool {
716         if api_token == "" {
717                 return false
718         }
719         // Blocks may be deleted only when Keep has been configured with a
720         // data manager.
721         if IsDataManagerToken(api_token) {
722                 return true
723         }
724         // TODO(twp): look up api_token with the API server
725         // return true if is_admin is true and if the token
726         // has unlimited scope
727         return false
728 }
729
730 // IsDataManagerToken returns true if api_token represents the data
731 // manager's token.
732 func IsDataManagerToken(api_token string) bool {
733         return data_manager_token != "" && api_token == data_manager_token
734 }