1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // REST handlers for Keep are implemented here.
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler (GET /index, GET /index/prefix)
12 // StatusHandler (GET /status.json)
30 "github.com/gorilla/mux"
32 "git.curoverse.com/arvados.git/sdk/go/health"
33 "git.curoverse.com/arvados.git/sdk/go/httpserver"
38 limiter httpserver.RequestCounter
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() http.Handler {
44 rtr := &router{Router: mux.NewRouter()}
47 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
49 `/{hash:[0-9a-f]{32}}+{hints}`,
50 GetBlockHandler).Methods("GET", "HEAD")
52 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
53 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
54 // List all blocks stored here. Privileged client only.
55 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
56 // List blocks stored here whose hash has the given prefix.
57 // Privileged client only.
58 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60 // Internals/debugging info (runtime.MemStats)
61 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63 // List volumes: path, device number, bytes used/avail.
64 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66 // List mounts: UUID, readonly, tier, device ID, ...
67 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
68 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
69 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71 // Replace the current pull queue.
72 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74 // Replace the current trash queue.
75 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77 // Untrash moves blocks from trash back into store
78 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80 rtr.Handle("/_health/{check}", &health.Handler{
81 Token: theConfig.ManagementToken,
85 // Any request which does not match any of these routes gets
87 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
89 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
91 stack := httpserver.Instrument(nil, nil,
92 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
93 return stack.ServeAPI(stack)
96 // BadRequestHandler is a HandleFunc to address bad requests.
97 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
98 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
101 // GetBlockHandler is a HandleFunc to address Get block requests.
102 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
103 ctx, cancel := contextForResponse(context.TODO(), resp)
106 if theConfig.RequireSignatures {
107 locator := req.URL.Path[1:] // strip leading slash
108 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
109 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
114 // TODO: Probe volumes to check whether the block _might_
115 // exist. Some volumes/types could support a quick existence
116 // check without causing other operations to suffer. If all
117 // volumes support that, and assure us the block definitely
118 // isn't here, we can return 404 now instead of waiting for a
121 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
123 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
128 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
130 code := http.StatusInternalServerError
131 if err, ok := err.(*KeepError); ok {
134 http.Error(resp, err.Error(), code)
138 resp.Header().Set("Content-Length", strconv.Itoa(size))
139 resp.Header().Set("Content-Type", "application/octet-stream")
140 resp.Write(buf[:size])
143 // Return a new context that gets cancelled by resp's CloseNotifier.
144 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
145 ctx, cancel := context.WithCancel(parent)
146 if cn, ok := resp.(http.CloseNotifier); ok {
147 go func(c <-chan bool) {
150 theConfig.debugLogf("cancel context")
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162 bufReady := make(chan []byte)
164 bufReady <- bufs.Get(bufSize)
167 case buf := <-bufReady:
171 // Even if closeNotifier happened first, we
172 // need to keep waiting for our buf so we can
173 // return it to the pool.
176 return nil, ErrClientDisconnect
180 // PutBlockHandler is a HandleFunc to address Put block requests.
181 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
182 ctx, cancel := contextForResponse(context.TODO(), resp)
185 hash := mux.Vars(req)["hash"]
187 // Detect as many error conditions as possible before reading
188 // the body: avoid transmitting data that will not end up
189 // being written anyway.
191 if req.ContentLength == -1 {
192 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
196 if req.ContentLength > BlockSize {
197 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
201 if len(KeepVM.AllWritable()) == 0 {
202 http.Error(resp, FullError.Error(), FullError.HTTPCode)
206 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
208 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
212 _, err = io.ReadFull(req.Body, buf)
214 http.Error(resp, err.Error(), 500)
219 replication, err := PutBlock(ctx, buf, hash)
223 code := http.StatusInternalServerError
224 if err, ok := err.(*KeepError); ok {
227 http.Error(resp, err.Error(), code)
231 // Success; add a size hint, sign the locator if possible, and
232 // return it to the client.
233 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
234 apiToken := GetAPIToken(req)
235 if theConfig.blobSigningKey != nil && apiToken != "" {
236 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
237 returnHash = SignLocator(returnHash, apiToken, expiry)
239 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
240 resp.Write([]byte(returnHash + "\n"))
243 // IndexHandler responds to "/index", "/index/{prefix}", and
244 // "/mounts/{uuid}/blocks" requests.
245 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
246 if !IsSystemAuth(GetAPIToken(req)) {
247 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
251 prefix := mux.Vars(req)["prefix"]
254 prefix = req.Form.Get("prefix")
257 uuid := mux.Vars(req)["uuid"]
261 vols = KeepVM.AllReadable()
262 } else if v := KeepVM.Lookup(uuid, false); v == nil {
263 http.Error(resp, "mount not found", http.StatusNotFound)
269 for _, v := range vols {
270 if err := v.IndexTo(prefix, resp); err != nil {
271 // The only errors returned by IndexTo are
272 // write errors returned by resp.Write(),
273 // which probably means the client has
274 // disconnected and this error will never be
275 // reported to the client -- but it will
276 // appear in our own error log.
277 http.Error(resp, err.Error(), http.StatusInternalServerError)
281 // An empty line at EOF is the only way the client can be
282 // assured the entire index was received.
283 resp.Write([]byte{'\n'})
286 // MountsHandler responds to "GET /mounts" requests.
287 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
288 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
290 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus holds the buffer pool counters that are serialized as
// part of the node status. The JSON member names are fixed by the
// struct tags and differ from the Go field names.
type PoolStatus struct {
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	Cap   int    `json:"BuffersMax"`
	Len   int    `json:"BuffersInUse"`
}
301 type volumeStatusEnt struct {
303 Status *VolumeStatus `json:",omitempty"`
304 VolumeStats *ioStats `json:",omitempty"`
305 InternalStats interface{} `json:",omitempty"`
309 type NodeStatus struct {
310 Volumes []*volumeStatusEnt
311 BufferPool PoolStatus
312 PullQueue WorkQueueStatus
313 TrashQueue WorkQueueStatus
320 var stLock sync.Mutex
322 // DebugHandler addresses /debug.json requests.
323 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
324 type debugStats struct {
325 MemStats runtime.MemStats
328 runtime.ReadMemStats(&ds.MemStats)
329 err := json.NewEncoder(resp).Encode(&ds)
331 http.Error(resp, err.Error(), 500)
335 // StatusHandler addresses /status.json requests.
336 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
338 rtr.readNodeStatus(&st)
339 jstat, err := json.Marshal(&st)
344 log.Printf("json.Marshal: %s", err)
345 log.Printf("NodeStatus = %v", &st)
346 http.Error(resp, err.Error(), 500)
350 // populate the given NodeStatus struct with current values.
351 func (rtr *router) readNodeStatus(st *NodeStatus) {
353 vols := KeepVM.AllReadable()
354 if cap(st.Volumes) < len(vols) {
355 st.Volumes = make([]*volumeStatusEnt, len(vols))
357 st.Volumes = st.Volumes[:0]
358 for _, vol := range vols {
359 var internalStats interface{}
360 if vol, ok := vol.(InternalStatser); ok {
361 internalStats = vol.InternalStats()
363 st.Volumes = append(st.Volumes, &volumeStatusEnt{
365 Status: vol.Status(),
366 InternalStats: internalStats,
367 //VolumeStats: KeepVM.VolumeStats(vol),
370 st.BufferPool.Alloc = bufs.Alloc()
371 st.BufferPool.Cap = bufs.Cap()
372 st.BufferPool.Len = bufs.Len()
373 st.PullQueue = getWorkQueueStatus(pullq)
374 st.TrashQueue = getWorkQueueStatus(trashq)
375 if rtr.limiter != nil {
376 st.RequestsCurrent = rtr.limiter.Current()
377 st.RequestsMax = rtr.limiter.Max()
381 // return a WorkQueueStatus for the given queue. If q is nil (which
382 // should never happen except in test suites), return a zero status
383 // value instead of crashing.
384 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
386 // This should only happen during tests.
387 return WorkQueueStatus{}
392 // DeleteHandler processes DELETE requests.
394 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
395 // from all connected volumes.
397 // Only the Data Manager, or an Arvados admin with scope "all", are
398 // allowed to issue DELETE requests. If a DELETE request is not
399 // authenticated or is issued by a non-admin user, the server returns
400 // a PermissionError.
402 // Upon receiving a valid request from an authorized user,
403 // DeleteHandler deletes all copies of the specified block on local
408 // If the requested block was not found on any volume, the response
409 // code is HTTP 404 Not Found.
411 // Otherwise, the response code is 200 OK, with a response body
412 // consisting of the JSON message
414 // {"copies_deleted":d,"copies_failed":f}
416 // where d and f are integers representing the number of blocks that
417 // were successfully and unsuccessfully deleted.
419 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
420 hash := mux.Vars(req)["hash"]
422 // Confirm that this user is an admin and has a token with unlimited scope.
423 var tok = GetAPIToken(req)
424 if tok == "" || !CanDelete(tok) {
425 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
429 if !theConfig.EnableDelete {
430 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
434 // Delete copies of this block from all available volumes.
435 // Report how many blocks were successfully deleted, and how
436 // many were found on writable volumes but not deleted.
438 Deleted int `json:"copies_deleted"`
439 Failed int `json:"copies_failed"`
441 for _, vol := range KeepVM.AllWritable() {
442 if err := vol.Trash(hash); err == nil {
444 } else if os.IsNotExist(err) {
448 log.Println("DeleteHandler:", err)
454 if result.Deleted == 0 && result.Failed == 0 {
455 st = http.StatusNotFound
462 if st == http.StatusOK {
463 if body, err := json.Marshal(result); err == nil {
466 log.Printf("json.Marshal: %s (result = %v)", err, result)
467 http.Error(resp, err.Error(), 500)
472 /* PullHandler processes "PUT /pull" requests for the data manager.
473 The request body is a JSON message containing a list of pull
474 requests in the following format:
478 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
480 "keep0.qr1hi.arvadosapi.com:25107",
481 "keep1.qr1hi.arvadosapi.com:25108"
485 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
495 Each pull request in the list consists of a block locator string
496 and an ordered list of servers. Keepstore should try to fetch the
497 block from each server in turn.
499 If the request has not been sent by the Data Manager, return 401
502 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest consists of a block locator and an ordered list of servers
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`

	// Destination mount, or "" for "anywhere"
	MountUUID string `json:"mount_uuid"`
}
514 // PullHandler processes "PUT /pull" requests for the data manager.
515 func PullHandler(resp http.ResponseWriter, req *http.Request) {
516 // Reject unauthorized requests.
517 if !IsSystemAuth(GetAPIToken(req)) {
518 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
522 // Parse the request body.
524 r := json.NewDecoder(req.Body)
525 if err := r.Decode(&pr); err != nil {
526 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
530 // We have a properly formatted pull list sent from the data
531 // manager. Report success and send the list to the pull list
532 // manager for further handling.
533 resp.WriteHeader(http.StatusOK)
535 fmt.Sprintf("Received %d pull requests\n", len(pr))))
538 for _, p := range pr {
541 pullq.ReplaceQueue(plist)
// TrashRequest consists of a block locator and its Mtime
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`

	// Target mount, or "" for "everywhere"
	MountUUID string `json:"mount_uuid"`
}
553 // TrashHandler processes /trash requests.
554 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
555 // Reject unauthorized requests.
556 if !IsSystemAuth(GetAPIToken(req)) {
557 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
561 // Parse the request body.
562 var trash []TrashRequest
563 r := json.NewDecoder(req.Body)
564 if err := r.Decode(&trash); err != nil {
565 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
569 // We have a properly formatted trash list sent from the data
570 // manager. Report success and send the list to the trash work
571 // queue for further handling.
572 resp.WriteHeader(http.StatusOK)
574 fmt.Sprintf("Received %d trash requests\n", len(trash))))
577 for _, t := range trash {
580 trashq.ReplaceQueue(tlist)
583 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
584 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
585 // Reject unauthorized requests.
586 if !IsSystemAuth(GetAPIToken(req)) {
587 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
591 hash := mux.Vars(req)["hash"]
593 if len(KeepVM.AllWritable()) == 0 {
594 http.Error(resp, "No writable volumes", http.StatusNotFound)
598 var untrashedOn, failedOn []string
600 for _, vol := range KeepVM.AllWritable() {
601 err := vol.Untrash(hash)
603 if os.IsNotExist(err) {
605 } else if err != nil {
606 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
607 failedOn = append(failedOn, vol.String())
609 log.Printf("Untrashed %v on volume %v", hash, vol.String())
610 untrashedOn = append(untrashedOn, vol.String())
614 if numNotFound == len(KeepVM.AllWritable()) {
615 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
619 if len(failedOn) == len(KeepVM.AllWritable()) {
620 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
622 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
623 if len(failedOn) > 0 {
624 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
626 resp.Write([]byte(respBody))
630 // GetBlock and PutBlock implement lower-level code for handling
631 // blocks by rooting through volumes connected to the local machine.
632 // Once the handler has determined that system policy permits the
633 // request, it calls these methods to perform the actual operation.
635 // TODO(twp): this code would probably be better located in the
636 // VolumeManager interface. As an abstraction, the VolumeManager
637 // should be the only part of the code that cares about which volume a
638 // block is stored on, so it should be responsible for figuring out
639 // which volume to check for fetching blocks, storing blocks, etc.
641 // GetBlock fetches the block identified by "hash" into the provided
642 // buf, and returns the data size.
644 // If the block cannot be found on any volume, returns NotFoundError.
646 // If the block found does not have the correct MD5 hash, returns
649 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
650 // Attempt to read the requested hash from a keep volume.
651 errorToCaller := NotFoundError
653 for _, vol := range KeepVM.AllReadable() {
654 size, err := vol.Get(ctx, hash, buf)
657 return 0, ErrClientDisconnect
661 // IsNotExist is an expected error and may be
662 // ignored. All other errors are logged. In
663 // any case we continue trying to read other
664 // volumes. If all volumes report IsNotExist,
665 // we return a NotFoundError.
666 if !os.IsNotExist(err) {
667 log.Printf("%s: Get(%s): %s", vol, hash, err)
671 // Check the file checksum.
673 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
674 if filehash != hash {
675 // TODO: Try harder to tell a sysadmin about
677 log.Printf("%s: checksum mismatch for request %s (actual %s)",
679 errorToCaller = DiskHashError
682 if errorToCaller == DiskHashError {
683 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
688 return 0, errorToCaller
691 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
693 // PutBlock(ctx, block, hash)
694 // Stores the BLOCK (identified by the content id HASH) in Keep.
696 // The MD5 checksum of the block must be identical to the content id HASH.
697 // If not, an error is returned.
699 // PutBlock stores the BLOCK on the first Keep volume with free space.
700 // A failure code is returned to the user only if all volumes fail.
702 // On success, PutBlock returns nil.
703 // On failure, it returns a KeepError with one of the following codes:
706 // A different block with the same hash already exists on this
709 // The MD5 hash of the BLOCK does not match the argument HASH.
711 // There was not enough space left in any Keep volume to store
714 // The object could not be stored for some other reason (e.g.
715 // all writes failed). The text of the error message should
716 // provide as much detail as possible.
718 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
719 // Check that BLOCK's checksum matches HASH.
720 blockhash := fmt.Sprintf("%x", md5.Sum(block))
721 if blockhash != hash {
722 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
723 return 0, RequestHashError
726 // If we already have this data, it's intact on disk, and we
727 // can update its timestamp, return success. If we have
728 // different data with the same hash, return failure.
729 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
731 } else if ctx.Err() != nil {
732 return 0, ErrClientDisconnect
735 // Choose a Keep volume to write to.
736 // If this volume fails, try all of the volumes in order.
737 if vol := KeepVM.NextWritable(); vol != nil {
738 if err := vol.Put(ctx, hash, block); err == nil {
739 return vol.Replication(), nil // success!
741 if ctx.Err() != nil {
742 return 0, ErrClientDisconnect
746 writables := KeepVM.AllWritable()
747 if len(writables) == 0 {
748 log.Print("No writable volumes.")
753 for _, vol := range writables {
754 err := vol.Put(ctx, hash, block)
755 if ctx.Err() != nil {
756 return 0, ErrClientDisconnect
759 return vol.Replication(), nil // success!
761 if err != FullError {
762 // The volume is not full but the
763 // write did not succeed. Report the
764 // error and continue trying.
766 log.Printf("%s: Write(%s): %s", vol, hash, err)
771 log.Print("All volumes are full.")
774 // Already logged the non-full errors.
775 return 0, GenericError
778 // CompareAndTouch returns the current replication level if one of the
779 // volumes already has the given content and it successfully updates
780 // the relevant block's modification time in order to protect it from
781 // premature garbage collection. Otherwise, it returns a non-nil
783 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
784 var bestErr error = NotFoundError
785 for _, vol := range KeepVM.AllWritable() {
786 err := vol.Compare(ctx, hash, buf)
787 if ctx.Err() != nil {
789 } else if err == CollisionError {
790 // Stop if we have a block with same hash but
791 // different content. (It will be impossible
792 // to tell which one is wanted if we have
793 // both, so there's no point writing it even
794 // on a different volume.)
795 log.Printf("%s: Compare(%s): %s", vol, hash, err)
797 } else if os.IsNotExist(err) {
798 // Block does not exist. This is the only
799 // "normal" error: we don't log anything.
801 } else if err != nil {
802 // Couldn't open file, data is corrupt on
803 // disk, etc.: log this abnormal condition,
804 // and try the next volume.
805 log.Printf("%s: Compare(%s): %s", vol, hash, err)
808 if err := vol.Touch(hash); err != nil {
809 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
813 // Compare and Touch both worked --> done.
814 return vol.Replication(), nil
// validLocatorRe matches exactly 32 lowercase hexadecimal digits and
// nothing else (no size or signature hints).
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
// authRe captures everything that follows the "OAuth2" scheme and its
// separating whitespace in an Authorization header value.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" both when the header is absent and when
	// the key is present with an empty value list, so no slice is
	// indexed here. Only the first Authorization value is examined.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[1]
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string.
func IsExpired(timestampHex string) bool {
	// Parse at 64 bits regardless of the platform's int size, so a
	// timestamp that does not fit in 32 bits is still accepted on
	// 32-bit builds instead of being reported as expired.
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
855 // CanDelete returns true if the user identified by apiToken is
856 // allowed to delete blocks.
857 func CanDelete(apiToken string) bool {
861 // Blocks may be deleted only when Keep has been configured with a
863 if IsSystemAuth(apiToken) {
866 // TODO(twp): look up apiToken with the API server
867 // return true if is_admin is true and if the token
868 // has unlimited scope
872 // IsSystemAuth returns true if the given token is allowed to perform
873 // system level actions like deleting data.
874 func IsSystemAuth(token string) bool {
875 return token != "" && token == theConfig.systemAuthToken