8178: (for now) all volumes must return ErrNotImplemented if trash-lifetime != 0
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "strings"
24         "sync"
25         "time"
26 )
27
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
30 //
31 func MakeRESTRouter() *mux.Router {
32         rest := mux.NewRouter()
33
34         rest.HandleFunc(
35                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}+{hints}`,
38                 GetBlockHandler).Methods("GET", "HEAD")
39
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42         // List all blocks stored here. Privileged client only.
43         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44         // List blocks stored here whose hash has the given prefix.
45         // Privileged client only.
46         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47
48         // List volumes: path, device number, bytes used/avail.
49         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50
51         // Replace the current pull queue.
52         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53
54         // Replace the current trash queue.
55         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56
57         // Untrash moves blocks from trash back into store
58         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
59
60         // Any request which does not match any of these routes gets
61         // 400 Bad Request.
62         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63
64         return rest
65 }
66
67 // BadRequestHandler is a HandleFunc to address bad requests.
68 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
69         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
70 }
71
72 // GetBlockHandler is a HandleFunc to address Get block requests.
73 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
74         if enforcePermissions {
75                 locator := req.URL.Path[1:] // strip leading slash
76                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
77                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
78                         return
79                 }
80         }
81
82         block, err := GetBlock(mux.Vars(req)["hash"])
83         if err != nil {
84                 // This type assertion is safe because the only errors
85                 // GetBlock can return are DiskHashError or NotFoundError.
86                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
87                 return
88         }
89         defer bufs.Put(block)
90
91         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
92         resp.Header().Set("Content-Type", "application/octet-stream")
93         resp.Write(block)
94 }
95
96 // PutBlockHandler is a HandleFunc to address Put block requests.
97 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
98         hash := mux.Vars(req)["hash"]
99
100         // Detect as many error conditions as possible before reading
101         // the body: avoid transmitting data that will not end up
102         // being written anyway.
103
104         if req.ContentLength == -1 {
105                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
106                 return
107         }
108
109         if req.ContentLength > BlockSize {
110                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
111                 return
112         }
113
114         if len(KeepVM.AllWritable()) == 0 {
115                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
116                 return
117         }
118
119         buf := bufs.Get(int(req.ContentLength))
120         _, err := io.ReadFull(req.Body, buf)
121         if err != nil {
122                 http.Error(resp, err.Error(), 500)
123                 bufs.Put(buf)
124                 return
125         }
126
127         replication, err := PutBlock(buf, hash)
128         bufs.Put(buf)
129
130         if err != nil {
131                 ke := err.(*KeepError)
132                 http.Error(resp, ke.Error(), ke.HTTPCode)
133                 return
134         }
135
136         // Success; add a size hint, sign the locator if possible, and
137         // return it to the client.
138         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
139         apiToken := GetApiToken(req)
140         if PermissionSecret != nil && apiToken != "" {
141                 expiry := time.Now().Add(blobSignatureTTL)
142                 returnHash = SignLocator(returnHash, apiToken, expiry)
143         }
144         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
145         resp.Write([]byte(returnHash + "\n"))
146 }
147
148 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
149 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
150         // Reject unauthorized requests.
151         if !IsDataManagerToken(GetApiToken(req)) {
152                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
153                 return
154         }
155
156         prefix := mux.Vars(req)["prefix"]
157
158         for _, vol := range KeepVM.AllReadable() {
159                 if err := vol.IndexTo(prefix, resp); err != nil {
160                         // The only errors returned by IndexTo are
161                         // write errors returned by resp.Write(),
162                         // which probably means the client has
163                         // disconnected and this error will never be
164                         // reported to the client -- but it will
165                         // appear in our own error log.
166                         http.Error(resp, err.Error(), http.StatusInternalServerError)
167                         return
168                 }
169         }
170         // An empty line at EOF is the only way the client can be
171         // assured the entire index was received.
172         resp.Write([]byte{'\n'})
173 }
174
175 // StatusHandler
176 //     Responds to /status.json requests with the current node status,
177 //     described in a JSON structure.
178 //
179 //     The data given in a status.json response includes:
180 //        volumes - a list of Keep volumes currently in use by this server
181 //          each volume is an object with the following fields:
182 //            * mount_point
183 //            * device_num (an integer identifying the underlying filesystem)
184 //            * bytes_free
185 //            * bytes_used
186
187 // PoolStatus struct
188 type PoolStatus struct {
189         Alloc uint64 `json:"BytesAllocated"`
190         Cap   int    `json:"BuffersMax"`
191         Len   int    `json:"BuffersInUse"`
192 }
193
194 // NodeStatus struct
195 type NodeStatus struct {
196         Volumes    []*VolumeStatus `json:"volumes"`
197         BufferPool PoolStatus
198         PullQueue  WorkQueueStatus
199         TrashQueue WorkQueueStatus
200         Memory     runtime.MemStats
201 }
202
203 var st NodeStatus
204 var stLock sync.Mutex
205
206 // StatusHandler addresses /status.json requests.
207 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
208         stLock.Lock()
209         readNodeStatus(&st)
210         jstat, err := json.Marshal(&st)
211         stLock.Unlock()
212         if err == nil {
213                 resp.Write(jstat)
214         } else {
215                 log.Printf("json.Marshal: %s", err)
216                 log.Printf("NodeStatus = %v", &st)
217                 http.Error(resp, err.Error(), 500)
218         }
219 }
220
221 // populate the given NodeStatus struct with current values.
222 func readNodeStatus(st *NodeStatus) {
223         vols := KeepVM.AllReadable()
224         if cap(st.Volumes) < len(vols) {
225                 st.Volumes = make([]*VolumeStatus, len(vols))
226         }
227         st.Volumes = st.Volumes[:0]
228         for _, vol := range vols {
229                 if s := vol.Status(); s != nil {
230                         st.Volumes = append(st.Volumes, s)
231                 }
232         }
233         st.BufferPool.Alloc = bufs.Alloc()
234         st.BufferPool.Cap = bufs.Cap()
235         st.BufferPool.Len = bufs.Len()
236         st.PullQueue = getWorkQueueStatus(pullq)
237         st.TrashQueue = getWorkQueueStatus(trashq)
238         runtime.ReadMemStats(&st.Memory)
239 }
240
241 // return a WorkQueueStatus for the given queue. If q is nil (which
242 // should never happen except in test suites), return a zero status
243 // value instead of crashing.
244 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
245         if q == nil {
246                 // This should only happen during tests.
247                 return WorkQueueStatus{}
248         }
249         return q.Status()
250 }
251
252 // DeleteHandler processes DELETE requests.
253 //
254 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
255 // from all connected volumes.
256 //
257 // Only the Data Manager, or an Arvados admin with scope "all", are
258 // allowed to issue DELETE requests.  If a DELETE request is not
259 // authenticated or is issued by a non-admin user, the server returns
260 // a PermissionError.
261 //
262 // Upon receiving a valid request from an authorized user,
263 // DeleteHandler deletes all copies of the specified block on local
264 // writable volumes.
265 //
266 // Response format:
267 //
268 // If the requested blocks was not found on any volume, the response
269 // code is HTTP 404 Not Found.
270 //
271 // Otherwise, the response code is 200 OK, with a response body
272 // consisting of the JSON message
273 //
274 //    {"copies_deleted":d,"copies_failed":f}
275 //
276 // where d and f are integers representing the number of blocks that
277 // were successfully and unsuccessfully deleted.
278 //
279 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
280         hash := mux.Vars(req)["hash"]
281
282         // Confirm that this user is an admin and has a token with unlimited scope.
283         var tok = GetApiToken(req)
284         if tok == "" || !CanDelete(tok) {
285                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
286                 return
287         }
288
289         if neverDelete {
290                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
291                 return
292         }
293
294         // Delete copies of this block from all available volumes.
295         // Report how many blocks were successfully deleted, and how
296         // many were found on writable volumes but not deleted.
297         var result struct {
298                 Deleted int `json:"copies_deleted"`
299                 Failed  int `json:"copies_failed"`
300         }
301         for _, vol := range KeepVM.AllWritable() {
302                 if err := vol.Trash(hash); err == nil {
303                         result.Deleted++
304                 } else if os.IsNotExist(err) {
305                         continue
306                 } else {
307                         result.Failed++
308                         log.Println("DeleteHandler:", err)
309                 }
310         }
311
312         var st int
313
314         if result.Deleted == 0 && result.Failed == 0 {
315                 st = http.StatusNotFound
316         } else {
317                 st = http.StatusOK
318         }
319
320         resp.WriteHeader(st)
321
322         if st == http.StatusOK {
323                 if body, err := json.Marshal(result); err == nil {
324                         resp.Write(body)
325                 } else {
326                         log.Printf("json.Marshal: %s (result = %v)", err, result)
327                         http.Error(resp, err.Error(), 500)
328                 }
329         }
330 }
331
332 /* PullHandler processes "PUT /pull" requests for the data manager.
333    The request body is a JSON message containing a list of pull
334    requests in the following format:
335
336    [
337       {
338          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
339          "servers":[
340                         "keep0.qr1hi.arvadosapi.com:25107",
341                         "keep1.qr1hi.arvadosapi.com:25108"
342                  ]
343           },
344           {
345                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
346                  "servers":[
347                         "10.0.1.5:25107",
348                         "10.0.1.6:25107",
349                         "10.0.1.7:25108"
350                  ]
351           },
352           ...
353    ]
354
355    Each pull request in the list consists of a block locator string
356    and an ordered list of servers.  Keepstore should try to fetch the
357    block from each server in turn.
358
359    If the request has not been sent by the Data Manager, return 401
360    Unauthorized.
361
362    If the JSON unmarshalling fails, return 400 Bad Request.
363 */
364
365 // PullRequest consists of a block locator and an ordered list of servers
366 type PullRequest struct {
367         Locator string   `json:"locator"`
368         Servers []string `json:"servers"`
369 }
370
371 // PullHandler processes "PUT /pull" requests for the data manager.
372 func PullHandler(resp http.ResponseWriter, req *http.Request) {
373         // Reject unauthorized requests.
374         if !IsDataManagerToken(GetApiToken(req)) {
375                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
376                 return
377         }
378
379         // Parse the request body.
380         var pr []PullRequest
381         r := json.NewDecoder(req.Body)
382         if err := r.Decode(&pr); err != nil {
383                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
384                 return
385         }
386
387         // We have a properly formatted pull list sent from the data
388         // manager.  Report success and send the list to the pull list
389         // manager for further handling.
390         resp.WriteHeader(http.StatusOK)
391         resp.Write([]byte(
392                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
393
394         plist := list.New()
395         for _, p := range pr {
396                 plist.PushBack(p)
397         }
398         pullq.ReplaceQueue(plist)
399 }
400
401 // TrashRequest consists of a block locator and it's Mtime
402 type TrashRequest struct {
403         Locator    string `json:"locator"`
404         BlockMtime int64  `json:"block_mtime"`
405 }
406
407 // TrashHandler processes /trash requests.
408 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
409         // Reject unauthorized requests.
410         if !IsDataManagerToken(GetApiToken(req)) {
411                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
412                 return
413         }
414
415         // Parse the request body.
416         var trash []TrashRequest
417         r := json.NewDecoder(req.Body)
418         if err := r.Decode(&trash); err != nil {
419                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
420                 return
421         }
422
423         // We have a properly formatted trash list sent from the data
424         // manager.  Report success and send the list to the trash work
425         // queue for further handling.
426         resp.WriteHeader(http.StatusOK)
427         resp.Write([]byte(
428                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
429
430         tlist := list.New()
431         for _, t := range trash {
432                 tlist.PushBack(t)
433         }
434         trashq.ReplaceQueue(tlist)
435 }
436
437 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
438 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
439         // Reject unauthorized requests.
440         if !IsDataManagerToken(GetApiToken(req)) {
441                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
442                 return
443         }
444
445         hash := mux.Vars(req)["hash"]
446
447         if len(KeepVM.AllWritable()) == 0 {
448                 http.Error(resp, "No writable volumes", http.StatusNotFound)
449                 return
450         }
451
452         var untrashedOn, failedOn []string
453         var numNotFound int
454         for _, vol := range KeepVM.AllWritable() {
455                 err := vol.Untrash(hash)
456                 if err == nil || err == ErrNotImplemented {
457                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
458                         untrashedOn = append(untrashedOn, vol.String())
459                 } else {
460                         if os.IsNotExist(err) {
461                                 numNotFound++
462                         } else {
463                                 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
464                                 failedOn = append(failedOn, vol.String())
465                         }
466                 }
467         }
468
469         if numNotFound == len(KeepVM.AllWritable()) {
470                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
471                 return
472         }
473
474         if len(failedOn) == len(KeepVM.AllWritable()) {
475                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
476         } else {
477                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
478                 if len(failedOn) > 0 {
479                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
480                 }
481                 resp.Write([]byte(respBody))
482         }
483 }
484
485 // ==============================
486 // GetBlock and PutBlock implement lower-level code for handling
487 // blocks by rooting through volumes connected to the local machine.
488 // Once the handler has determined that system policy permits the
489 // request, it calls these methods to perform the actual operation.
490 //
491 // TODO(twp): this code would probably be better located in the
492 // VolumeManager interface. As an abstraction, the VolumeManager
493 // should be the only part of the code that cares about which volume a
494 // block is stored on, so it should be responsible for figuring out
495 // which volume to check for fetching blocks, storing blocks, etc.
496 // ==============================
497
498 // GetBlock fetches and returns the block identified by "hash".
499 //
500 // On success, GetBlock returns a byte slice with the block data, and
501 // a nil error.
502 //
503 // If the block cannot be found on any volume, returns NotFoundError.
504 //
505 // If the block found does not have the correct MD5 hash, returns
506 // DiskHashError.
507 //
508 func GetBlock(hash string) ([]byte, error) {
509         // Attempt to read the requested hash from a keep volume.
510         errorToCaller := NotFoundError
511
512         for _, vol := range KeepVM.AllReadable() {
513                 buf, err := vol.Get(hash)
514                 if err != nil {
515                         // IsNotExist is an expected error and may be
516                         // ignored. All other errors are logged. In
517                         // any case we continue trying to read other
518                         // volumes. If all volumes report IsNotExist,
519                         // we return a NotFoundError.
520                         if !os.IsNotExist(err) {
521                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
522                         }
523                         continue
524                 }
525                 // Check the file checksum.
526                 //
527                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
528                 if filehash != hash {
529                         // TODO: Try harder to tell a sysadmin about
530                         // this.
531                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
532                                 vol, hash, filehash)
533                         errorToCaller = DiskHashError
534                         bufs.Put(buf)
535                         continue
536                 }
537                 if errorToCaller == DiskHashError {
538                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
539                                 vol, hash)
540                 }
541                 return buf, nil
542         }
543         return nil, errorToCaller
544 }
545
546 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
547 //
548 // PutBlock(block, hash)
549 //   Stores the BLOCK (identified by the content id HASH) in Keep.
550 //
551 //   The MD5 checksum of the block must be identical to the content id HASH.
552 //   If not, an error is returned.
553 //
554 //   PutBlock stores the BLOCK on the first Keep volume with free space.
555 //   A failure code is returned to the user only if all volumes fail.
556 //
557 //   On success, PutBlock returns nil.
558 //   On failure, it returns a KeepError with one of the following codes:
559 //
560 //   500 Collision
561 //          A different block with the same hash already exists on this
562 //          Keep server.
563 //   422 MD5Fail
564 //          The MD5 hash of the BLOCK does not match the argument HASH.
565 //   503 Full
566 //          There was not enough space left in any Keep volume to store
567 //          the object.
568 //   500 Fail
569 //          The object could not be stored for some other reason (e.g.
570 //          all writes failed). The text of the error message should
571 //          provide as much detail as possible.
572 //
573 func PutBlock(block []byte, hash string) (int, error) {
574         // Check that BLOCK's checksum matches HASH.
575         blockhash := fmt.Sprintf("%x", md5.Sum(block))
576         if blockhash != hash {
577                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
578                 return 0, RequestHashError
579         }
580
581         // If we already have this data, it's intact on disk, and we
582         // can update its timestamp, return success. If we have
583         // different data with the same hash, return failure.
584         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
585                 return n, err
586         }
587
588         // Choose a Keep volume to write to.
589         // If this volume fails, try all of the volumes in order.
590         if vol := KeepVM.NextWritable(); vol != nil {
591                 if err := vol.Put(hash, block); err == nil {
592                         return vol.Replication(), nil // success!
593                 }
594         }
595
596         writables := KeepVM.AllWritable()
597         if len(writables) == 0 {
598                 log.Print("No writable volumes.")
599                 return 0, FullError
600         }
601
602         allFull := true
603         for _, vol := range writables {
604                 err := vol.Put(hash, block)
605                 if err == nil {
606                         return vol.Replication(), nil // success!
607                 }
608                 if err != FullError {
609                         // The volume is not full but the
610                         // write did not succeed.  Report the
611                         // error and continue trying.
612                         allFull = false
613                         log.Printf("%s: Write(%s): %s", vol, hash, err)
614                 }
615         }
616
617         if allFull {
618                 log.Print("All volumes are full.")
619                 return 0, FullError
620         }
621         // Already logged the non-full errors.
622         return 0, GenericError
623 }
624
625 // CompareAndTouch returns the current replication level if one of the
626 // volumes already has the given content and it successfully updates
627 // the relevant block's modification time in order to protect it from
628 // premature garbage collection. Otherwise, it returns a non-nil
629 // error.
630 func CompareAndTouch(hash string, buf []byte) (int, error) {
631         var bestErr error = NotFoundError
632         for _, vol := range KeepVM.AllWritable() {
633                 if err := vol.Compare(hash, buf); err == CollisionError {
634                         // Stop if we have a block with same hash but
635                         // different content. (It will be impossible
636                         // to tell which one is wanted if we have
637                         // both, so there's no point writing it even
638                         // on a different volume.)
639                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
640                         return 0, err
641                 } else if os.IsNotExist(err) {
642                         // Block does not exist. This is the only
643                         // "normal" error: we don't log anything.
644                         continue
645                 } else if err != nil {
646                         // Couldn't open file, data is corrupt on
647                         // disk, etc.: log this abnormal condition,
648                         // and try the next volume.
649                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
650                         continue
651                 }
652                 if err := vol.Touch(hash); err != nil {
653                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
654                         bestErr = err
655                         continue
656                 }
657                 // Compare and Touch both worked --> done.
658                 return vol.Replication(), nil
659         }
660         return 0, bestErr
661 }
662
663 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
664
665 // IsValidLocator returns true if the specified string is a valid Keep locator.
666 //   When Keep is extended to support hash types other than MD5,
667 //   this should be updated to cover those as well.
668 //
669 func IsValidLocator(loc string) bool {
670         return validLocatorRe.MatchString(loc)
671 }
672
673 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
674
675 // GetApiToken returns the OAuth2 token from the Authorization
676 // header of a HTTP request, or an empty string if no matching
677 // token is found.
678 func GetApiToken(req *http.Request) string {
679         if auth, ok := req.Header["Authorization"]; ok {
680                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
681                         return match[1]
682                 }
683         }
684         return ""
685 }
686
687 // IsExpired returns true if the given Unix timestamp (expressed as a
688 // hexadecimal string) is in the past, or if timestampHex cannot be
689 // parsed as a hexadecimal string.
690 func IsExpired(timestampHex string) bool {
691         ts, err := strconv.ParseInt(timestampHex, 16, 0)
692         if err != nil {
693                 log.Printf("IsExpired: %s", err)
694                 return true
695         }
696         return time.Unix(ts, 0).Before(time.Now())
697 }
698
699 // CanDelete returns true if the user identified by apiToken is
700 // allowed to delete blocks.
701 func CanDelete(apiToken string) bool {
702         if apiToken == "" {
703                 return false
704         }
705         // Blocks may be deleted only when Keep has been configured with a
706         // data manager.
707         if IsDataManagerToken(apiToken) {
708                 return true
709         }
710         // TODO(twp): look up apiToken with the API server
711         // return true if is_admin is true and if the token
712         // has unlimited scope
713         return false
714 }
715
716 // IsDataManagerToken returns true if apiToken represents the data
717 // manager's token.
718 func IsDataManagerToken(apiToken string) bool {
719         return dataManagerToken != "" && apiToken == dataManagerToken
720 }