Merge branch '3967-improve-keepstore-logging'
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "container/list"
14         "crypto/md5"
15         "encoding/json"
16         "fmt"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler processes "PUT /pull" commands from Data Manager.
63         // It parses the JSON list of pull requests in the request body, and
64         // delivers them to the pull list manager for replication.
65         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
66
67         // Any request which does not match any of these routes gets
68         // 400 Bad Request.
69         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
70
71         return rest
72 }
73
74 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
75         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
76 }
77
78 // FindKeepVolumes scans all mounted volumes on the system for Keep
79 // volumes, and returns a list of matching paths.
80 //
81 // A device is assumed to be a Keep volume if it is a normal or tmpfs
82 // volume and has a "/keep" directory directly underneath the mount
83 // point.
84 //
85 func FindKeepVolumes() []string {
86         vols := make([]string, 0)
87
88         if f, err := os.Open(PROC_MOUNTS); err != nil {
89                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
90         } else {
91                 scanner := bufio.NewScanner(f)
92                 for scanner.Scan() {
93                         args := strings.Fields(scanner.Text())
94                         dev, mount := args[0], args[1]
95                         if mount != "/" &&
96                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
97                                 keep := mount + "/keep"
98                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99                                         vols = append(vols, keep)
100                                 }
101                         }
102                 }
103                 if err := scanner.Err(); err != nil {
104                         log.Fatal(err)
105                 }
106         }
107         return vols
108 }
109
110 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
111         hash := mux.Vars(req)["hash"]
112
113         hints := mux.Vars(req)["hints"]
114
115         // Parse the locator string and hints from the request.
116         // TODO(twp): implement a Locator type.
117         var signature, timestamp string
118         if hints != "" {
119                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
120                 for _, hint := range strings.Split(hints, "+") {
121                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
122                                 // Server ignores size hints
123                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
124                                 signature = m[1]
125                                 timestamp = m[2]
126                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
127                                 // Any unknown hint that starts with an uppercase letter is
128                                 // presumed to be valid and ignored, to permit forward compatibility.
129                         } else {
130                                 // Unknown format; not a valid locator.
131                                 log.Printf("%s %s %d %s", req.Method, hash, BadRequestError.HTTPCode, "-")
132                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
133                                 return
134                         }
135                 }
136         }
137
138         // If permission checking is in effect, verify this
139         // request's permission signature.
140         if enforce_permissions {
141                 if signature == "" || timestamp == "" {
142                         log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
143                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
144                         return
145                 } else if IsExpired(timestamp) {
146                         log.Printf("%s %s %d %s", req.Method, hash, ExpiredError.HTTPCode, "-")
147                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
148                         return
149                 } else {
150                         req_locator := req.URL.Path[1:] // strip leading slash
151                         if !VerifySignature(req_locator, GetApiToken(req)) {
152                                 log.Printf("%s %s %d %s", req.Method, hash, PermissionError.HTTPCode, "-")
153                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
154                                 return
155                         }
156                 }
157         }
158
159         block, err := GetBlock(hash, false)
160
161         // Garbage collect after each GET. Fixes #2865.
162         // TODO(twp): review Keep memory usage and see if there's
163         // a better way to do this than blindly garbage collecting
164         // after every block.
165         defer runtime.GC()
166
167         if err != nil {
168                 // This type assertion is safe because the only errors
169                 // GetBlock can return are DiskHashError or NotFoundError.
170                 if err == NotFoundError {
171                         log.Printf("%s: not found, giving up\n", hash)
172                 }
173                 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, "-")
174                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
175                 return
176         }
177
178         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
179
180         _, err = resp.Write(block)
181         if err != nil {
182                 log.Printf("%s %s %d %s", req.Method, hash, err.(*KeepError).HTTPCode, len(block), "-")
183         } else {
184                 log.Printf("%s %s %d %d", req.Method, hash, 200, len(block))
185         }
186
187         return
188 }
189
190 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
191         // Garbage collect after each PUT. Fixes #2865.
192         // See also GetBlockHandler.
193         defer runtime.GC()
194
195         hash := mux.Vars(req)["hash"]
196
197         // Read the block data to be stored.
198         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
199         //
200         if req.ContentLength > BLOCKSIZE {
201                 log.Printf("%s %s %d %d", req.Method, hash, TooLongError.HTTPCode, req.ContentLength)
202                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
203                 return
204         }
205
206         buf := make([]byte, req.ContentLength)
207         nread, err := io.ReadFull(req.Body, buf)
208         if err != nil {
209                 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
210                 http.Error(resp, err.Error(), 500)
211         } else if int64(nread) < req.ContentLength {
212                 log.Printf("%s %s %d %d", req.Method, hash, 500, req.ContentLength)
213                 http.Error(resp, "request truncated", 500)
214         } else {
215                 if err := PutBlock(buf, hash); err == nil {
216                         // Success; add a size hint, sign the locator if
217                         // possible, and return it to the client.
218                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
219                         api_token := GetApiToken(req)
220                         if PermissionSecret != nil && api_token != "" {
221                                 expiry := time.Now().Add(permission_ttl)
222                                 return_hash = SignLocator(return_hash, api_token, expiry)
223                         }
224                         log.Printf("%s %s %d %d", req.Method, hash, 200, req.ContentLength)
225                         resp.Write([]byte(return_hash + "\n"))
226                 } else {
227                         ke := err.(*KeepError)
228                         log.Printf("%s %s %d %d", req.Method, hash, ke.HTTPCode, req.ContentLength)
229                         http.Error(resp, ke.Error(), ke.HTTPCode)
230                 }
231         }
232         return
233 }
234
235 // IndexHandler
236 //     A HandleFunc to address /index and /index/{prefix} requests.
237 //
238 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
239         prefix := mux.Vars(req)["prefix"]
240
241         // Only the data manager may issue /index requests,
242         // and only if enforce_permissions is enabled.
243         // All other requests return 403 Forbidden.
244         api_token := GetApiToken(req)
245         if !enforce_permissions ||
246                 api_token == "" ||
247                 data_manager_token != api_token {
248                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
249                 return
250         }
251         var index string
252         for _, vol := range KeepVM.Volumes() {
253                 index = index + vol.Index(prefix)
254         }
255         resp.Write([]byte(index))
256 }
257
258 // StatusHandler
259 //     Responds to /status.json requests with the current node status,
260 //     described in a JSON structure.
261 //
262 //     The data given in a status.json response includes:
263 //        volumes - a list of Keep volumes currently in use by this server
264 //          each volume is an object with the following fields:
265 //            * mount_point
266 //            * device_num (an integer identifying the underlying filesystem)
267 //            * bytes_free
268 //            * bytes_used
269 //
270 type VolumeStatus struct {
271         MountPoint string `json:"mount_point"`
272         DeviceNum  uint64 `json:"device_num"`
273         BytesFree  uint64 `json:"bytes_free"`
274         BytesUsed  uint64 `json:"bytes_used"`
275 }
276
277 type NodeStatus struct {
278         Volumes []*VolumeStatus `json:"volumes"`
279 }
280
281 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
282         st := GetNodeStatus()
283         if jstat, err := json.Marshal(st); err == nil {
284                 resp.Write(jstat)
285         } else {
286                 log.Printf("json.Marshal: %s\n", err)
287                 log.Printf("NodeStatus = %v\n", st)
288                 http.Error(resp, err.Error(), 500)
289         }
290 }
291
292 // GetNodeStatus
293 //     Returns a NodeStatus struct describing this Keep
294 //     node's current status.
295 //
296 func GetNodeStatus() *NodeStatus {
297         st := new(NodeStatus)
298
299         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
300         for i, vol := range KeepVM.Volumes() {
301                 st.Volumes[i] = vol.Status()
302         }
303         return st
304 }
305
306 // GetVolumeStatus
307 //     Returns a VolumeStatus describing the requested volume.
308 //
309 func GetVolumeStatus(volume string) *VolumeStatus {
310         var fs syscall.Statfs_t
311         var devnum uint64
312
313         if fi, err := os.Stat(volume); err == nil {
314                 devnum = fi.Sys().(*syscall.Stat_t).Dev
315         } else {
316                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
317                 return nil
318         }
319
320         err := syscall.Statfs(volume, &fs)
321         if err != nil {
322                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
323                 return nil
324         }
325         // These calculations match the way df calculates disk usage:
326         // "free" space is measured by fs.Bavail, but "used" space
327         // uses fs.Blocks - fs.Bfree.
328         free := fs.Bavail * uint64(fs.Bsize)
329         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
330         return &VolumeStatus{volume, devnum, free, used}
331 }
332
333 // DeleteHandler processes DELETE requests.
334 //
335 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
336 // from all connected volumes.
337 //
338 // Only the Data Manager, or an Arvados admin with scope "all", are
339 // allowed to issue DELETE requests.  If a DELETE request is not
340 // authenticated or is issued by a non-admin user, the server returns
341 // a PermissionError.
342 //
343 // Upon receiving a valid request from an authorized user,
344 // DeleteHandler deletes all copies of the specified block on local
345 // writable volumes.
346 //
347 // Response format:
348 //
349 // If the requested blocks was not found on any volume, the response
350 // code is HTTP 404 Not Found.
351 //
352 // Otherwise, the response code is 200 OK, with a response body
353 // consisting of the JSON message
354 //
355 //    {"copies_deleted":d,"copies_failed":f}
356 //
357 // where d and f are integers representing the number of blocks that
358 // were successfully and unsuccessfully deleted.
359 //
360 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
361         hash := mux.Vars(req)["hash"]
362         log.Printf("%s %s", req.Method, hash)
363
364         // Confirm that this user is an admin and has a token with unlimited scope.
365         var tok = GetApiToken(req)
366         if tok == "" || !CanDelete(tok) {
367                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
368                 return
369         }
370
371         if never_delete {
372                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
373                 return
374         }
375
376         // Delete copies of this block from all available volumes.  Report
377         // how many blocks were successfully and unsuccessfully
378         // deleted.
379         var result struct {
380                 Deleted int `json:"copies_deleted"`
381                 Failed  int `json:"copies_failed"`
382         }
383         for _, vol := range KeepVM.Volumes() {
384                 if err := vol.Delete(hash); err == nil {
385                         result.Deleted++
386                 } else if os.IsNotExist(err) {
387                         continue
388                 } else {
389                         result.Failed++
390                         log.Println("DeleteHandler:", err)
391                 }
392         }
393
394         var st int
395
396         if result.Deleted == 0 && result.Failed == 0 {
397                 st = http.StatusNotFound
398         } else {
399                 st = http.StatusOK
400         }
401
402         resp.WriteHeader(st)
403
404         if st == http.StatusOK {
405                 if body, err := json.Marshal(result); err == nil {
406                         resp.Write(body)
407                 } else {
408                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
409                         http.Error(resp, err.Error(), 500)
410                 }
411         }
412 }
413
414 /* PullHandler processes "PUT /pull" requests for the data manager.
415    The request body is a JSON message containing a list of pull
416    requests in the following format:
417
418    [
419       {
420          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
421          "servers":[
422                         "keep0.qr1hi.arvadosapi.com:25107",
423                         "keep1.qr1hi.arvadosapi.com:25108"
424                  ]
425           },
426           {
427                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
428                  "servers":[
429                         "10.0.1.5:25107",
430                         "10.0.1.6:25107",
431                         "10.0.1.7:25108"
432                  ]
433           },
434           ...
435    ]
436
437    Each pull request in the list consists of a block locator string
438    and an ordered list of servers.  Keepstore should try to fetch the
439    block from each server in turn.
440
441    If the request has not been sent by the Data Manager, return 401
442    Unauthorized.
443
444    If the JSON unmarshalling fails, return 400 Bad Request.
445 */
446
447 type PullRequest struct {
448         Locator string   `json:"locator"`
449         Servers []string `json:"servers"`
450 }
451
452 func PullHandler(resp http.ResponseWriter, req *http.Request) {
453         // Reject unauthorized requests.
454         api_token := GetApiToken(req)
455         if !IsDataManagerToken(api_token) {
456                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
457                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
458                 return
459         }
460
461         // Parse the request body.
462         var pr []PullRequest
463         r := json.NewDecoder(req.Body)
464         if err := r.Decode(&pr); err != nil {
465                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
466                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
467                 return
468         }
469
470         // We have a properly formatted pull list sent from the data
471         // manager.  Report success and send the list to the pull list
472         // manager for further handling.
473         log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
474         resp.WriteHeader(http.StatusOK)
475         resp.Write([]byte(
476                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
477
478         plist := list.New()
479         for _, p := range pr {
480                 plist.PushBack(p)
481         }
482
483         if pullq == nil {
484                 pullq = NewWorkQueue()
485         }
486         pullq.ReplaceQueue(plist)
487 }
488
489 // ==============================
490 // GetBlock and PutBlock implement lower-level code for handling
491 // blocks by rooting through volumes connected to the local machine.
492 // Once the handler has determined that system policy permits the
493 // request, it calls these methods to perform the actual operation.
494 //
495 // TODO(twp): this code would probably be better located in the
496 // VolumeManager interface. As an abstraction, the VolumeManager
497 // should be the only part of the code that cares about which volume a
498 // block is stored on, so it should be responsible for figuring out
499 // which volume to check for fetching blocks, storing blocks, etc.
500
501 // ==============================
502 // GetBlock fetches and returns the block identified by "hash".  If
503 // the update_timestamp argument is true, GetBlock also updates the
504 // block's file modification time (for the sake of PutBlock, which
505 // must update the file's timestamp when the block already exists).
506 //
507 // On success, GetBlock returns a byte slice with the block data, and
508 // a nil error.
509 //
510 // If the block cannot be found on any volume, returns NotFoundError.
511 //
512 // If the block found does not have the correct MD5 hash, returns
513 // DiskHashError.
514 //
515
516 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
517         // Attempt to read the requested hash from a keep volume.
518         error_to_caller := NotFoundError
519
520         for _, vol := range KeepVM.Volumes() {
521                 if buf, err := vol.Get(hash); err != nil {
522                         // IsNotExist is an expected error and may be ignored.
523                         // (If all volumes report IsNotExist, we return a NotFoundError)
524                         // All other errors should be logged but we continue trying to
525                         // read.
526                         switch {
527                         case os.IsNotExist(err):
528                                 continue
529                         default:
530                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
531                         }
532                 } else {
533                         // Double check the file checksum.
534                         //
535                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
536                         if filehash != hash {
537                                 // TODO(twp): this condition probably represents a bad disk and
538                                 // should raise major alarm bells for an administrator: e.g.
539                                 // they should be sent directly to an event manager at high
540                                 // priority or logged as urgent problems.
541                                 //
542                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
543                                         vol, hash, filehash)
544                                 error_to_caller = DiskHashError
545                         } else {
546                                 // Success!
547                                 if error_to_caller != NotFoundError {
548                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
549                                                 vol, hash)
550                                 }
551                                 // Update the timestamp if the caller requested.
552                                 // If we could not update the timestamp, continue looking on
553                                 // other volumes.
554                                 if update_timestamp {
555                                         if vol.Touch(hash) != nil {
556                                                 continue
557                                         }
558                                 }
559                                 return buf, nil
560                         }
561                 }
562         }
563
564         if error_to_caller != NotFoundError {
565                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
566         }
567         return nil, error_to_caller
568 }
569
570 /* PutBlock(block, hash)
571    Stores the BLOCK (identified by the content id HASH) in Keep.
572
573    The MD5 checksum of the block must be identical to the content id HASH.
574    If not, an error is returned.
575
576    PutBlock stores the BLOCK on the first Keep volume with free space.
577    A failure code is returned to the user only if all volumes fail.
578
579    On success, PutBlock returns nil.
580    On failure, it returns a KeepError with one of the following codes:
581
582    500 Collision
583           A different block with the same hash already exists on this
584           Keep server.
585    422 MD5Fail
586           The MD5 hash of the BLOCK does not match the argument HASH.
587    503 Full
588           There was not enough space left in any Keep volume to store
589           the object.
590    500 Fail
591           The object could not be stored for some other reason (e.g.
592           all writes failed). The text of the error message should
593           provide as much detail as possible.
594 */
595
596 func PutBlock(block []byte, hash string) error {
597         // Check that BLOCK's checksum matches HASH.
598         blockhash := fmt.Sprintf("%x", md5.Sum(block))
599         if blockhash != hash {
600                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
601                 return RequestHashError
602         }
603
604         // If we already have a block on disk under this identifier, return
605         // success (but check for MD5 collisions).  While fetching the block,
606         // update its timestamp.
607         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
608         // In either case, we want to write our new (good) block to disk,
609         // so there is nothing special to do if err != nil.
610         //
611         if oldblock, err := GetBlock(hash, true); err == nil {
612                 if bytes.Compare(block, oldblock) == 0 {
613                         // The block already exists; return success.
614                         return nil
615                 } else {
616                         return CollisionError
617                 }
618         }
619
620         // Choose a Keep volume to write to.
621         // If this volume fails, try all of the volumes in order.
622         vol := KeepVM.Choose()
623         if err := vol.Put(hash, block); err == nil {
624                 return nil // success!
625         } else {
626                 allFull := true
627                 for _, vol := range KeepVM.Volumes() {
628                         err := vol.Put(hash, block)
629                         if err == nil {
630                                 return nil // success!
631                         }
632                         if err != FullError {
633                                 // The volume is not full but the write did not succeed.
634                                 // Report the error and continue trying.
635                                 allFull = false
636                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
637                         }
638                 }
639
640                 if allFull {
641                         log.Printf("all Keep volumes full")
642                         return FullError
643                 } else {
644                         log.Printf("all Keep volumes failed")
645                         return GenericError
646                 }
647         }
648 }
649
650 // IsValidLocator
651 //     Return true if the specified string is a valid Keep locator.
652 //     When Keep is extended to support hash types other than MD5,
653 //     this should be updated to cover those as well.
654 //
655 func IsValidLocator(loc string) bool {
656         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
657         if err == nil {
658                 return match
659         }
660         log.Printf("IsValidLocator: %s\n", err)
661         return false
662 }
663
664 // GetApiToken returns the OAuth2 token from the Authorization
665 // header of a HTTP request, or an empty string if no matching
666 // token is found.
667 func GetApiToken(req *http.Request) string {
668         if auth, ok := req.Header["Authorization"]; ok {
669                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
670                         log.Println(err)
671                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
672                         return match[1]
673                 }
674         }
675         return ""
676 }
677
678 // IsExpired returns true if the given Unix timestamp (expressed as a
679 // hexadecimal string) is in the past, or if timestamp_hex cannot be
680 // parsed as a hexadecimal string.
681 func IsExpired(timestamp_hex string) bool {
682         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
683         if err != nil {
684                 log.Printf("IsExpired: %s\n", err)
685                 return true
686         }
687         return time.Unix(ts, 0).Before(time.Now())
688 }
689
690 // CanDelete returns true if the user identified by api_token is
691 // allowed to delete blocks.
692 func CanDelete(api_token string) bool {
693         if api_token == "" {
694                 return false
695         }
696         // Blocks may be deleted only when Keep has been configured with a
697         // data manager.
698         if IsDataManagerToken(api_token) {
699                 return true
700         }
701         // TODO(twp): look up api_token with the API server
702         // return true if is_admin is true and if the token
703         // has unlimited scope
704         return false
705 }
706
707 // IsDataManagerToken returns true if api_token represents the data
708 // manager's token.
709 func IsDataManagerToken(api_token string) bool {
710         return data_manager_token != "" && api_token == data_manager_token
711 }