1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // REST handlers for Keep are implemented here.
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler (GET /index, GET /index/prefix)
12 // StatusHandler (GET /status.json)
30 "github.com/gorilla/mux"
32 "git.curoverse.com/arvados.git/sdk/go/health"
33 "git.curoverse.com/arvados.git/sdk/go/httpserver"
34 arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version"
35 log "github.com/Sirupsen/logrus"
40 limiter httpserver.RequestCounter
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter() *router {
46 rest := mux.NewRouter()
47 rtr := &router{Router: rest}
50 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
52 `/{hash:[0-9a-f]{32}}+{hints}`,
53 GetBlockHandler).Methods("GET", "HEAD")
55 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
56 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
57 // List all blocks stored here. Privileged client only.
58 rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
59 // List blocks stored here whose hash has the given prefix.
60 // Privileged client only.
61 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
63 // Internals/debugging info (runtime.MemStats)
64 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
66 // List volumes: path, device number, bytes used/avail.
67 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
69 // List mounts: UUID, readonly, tier, device ID, ...
70 rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
71 rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
72 rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
74 // Replace the current pull queue.
75 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
77 // Replace the current trash queue.
78 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
80 // Untrash moves blocks from trash back into store
81 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
83 rest.Handle("/_health/{check}", &health.Handler{
84 Token: theConfig.ManagementToken,
88 // Any request which does not match any of these routes gets
90 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
95 // BadRequestHandler is a HandleFunc to address bad requests.
96 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
97 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
100 // GetBlockHandler is a HandleFunc to address Get block requests.
101 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
102 ctx, cancel := contextForResponse(context.TODO(), resp)
105 if theConfig.RequireSignatures {
106 locator := req.URL.Path[1:] // strip leading slash
107 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
108 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
113 // TODO: Probe volumes to check whether the block _might_
114 // exist. Some volumes/types could support a quick existence
115 // check without causing other operations to suffer. If all
116 // volumes support that, and assure us the block definitely
117 // isn't here, we can return 404 now instead of waiting for a
120 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
122 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
127 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
129 code := http.StatusInternalServerError
130 if err, ok := err.(*KeepError); ok {
133 http.Error(resp, err.Error(), code)
137 resp.Header().Set("Content-Length", strconv.Itoa(size))
138 resp.Header().Set("Content-Type", "application/octet-stream")
139 resp.Write(buf[:size])
142 // Return a new context that gets cancelled by resp's CloseNotifier.
143 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
144 ctx, cancel := context.WithCancel(parent)
145 if cn, ok := resp.(http.CloseNotifier); ok {
146 go func(c <-chan bool) {
149 theConfig.debugLogf("cancel context")
158 // Get a buffer from the pool -- but give up and return a non-nil
159 // error if ctx ends before we get a buffer.
160 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
161 bufReady := make(chan []byte)
163 bufReady <- bufs.Get(bufSize)
166 case buf := <-bufReady:
170 // Even if closeNotifier happened first, we
171 // need to keep waiting for our buf so we can
172 // return it to the pool.
175 return nil, ErrClientDisconnect
179 // PutBlockHandler is a HandleFunc to address Put block requests.
180 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
181 ctx, cancel := contextForResponse(context.TODO(), resp)
184 hash := mux.Vars(req)["hash"]
186 // Detect as many error conditions as possible before reading
187 // the body: avoid transmitting data that will not end up
188 // being written anyway.
190 if req.ContentLength == -1 {
191 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
195 if req.ContentLength > BlockSize {
196 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
200 if len(KeepVM.AllWritable()) == 0 {
201 http.Error(resp, FullError.Error(), FullError.HTTPCode)
205 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
207 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
211 _, err = io.ReadFull(req.Body, buf)
213 http.Error(resp, err.Error(), 500)
218 replication, err := PutBlock(ctx, buf, hash)
222 code := http.StatusInternalServerError
223 if err, ok := err.(*KeepError); ok {
226 http.Error(resp, err.Error(), code)
230 // Success; add a size hint, sign the locator if possible, and
231 // return it to the client.
232 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
233 apiToken := GetAPIToken(req)
234 if theConfig.blobSigningKey != nil && apiToken != "" {
235 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
236 returnHash = SignLocator(returnHash, apiToken, expiry)
238 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
239 resp.Write([]byte(returnHash + "\n"))
242 // IndexHandler responds to "/index", "/index/{prefix}", and
243 // "/mounts/{uuid}/blocks" requests.
244 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
245 if !IsSystemAuth(GetAPIToken(req)) {
246 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
250 prefix := mux.Vars(req)["prefix"]
253 prefix = req.Form.Get("prefix")
256 uuid := mux.Vars(req)["uuid"]
260 vols = KeepVM.AllReadable()
261 } else if v := KeepVM.Lookup(uuid, false); v == nil {
262 http.Error(resp, "mount not found", http.StatusNotFound)
268 for _, v := range vols {
269 if err := v.IndexTo(prefix, resp); err != nil {
270 // The only errors returned by IndexTo are
271 // write errors returned by resp.Write(),
272 // which probably means the client has
273 // disconnected and this error will never be
274 // reported to the client -- but it will
275 // appear in our own error log.
276 http.Error(resp, err.Error(), http.StatusInternalServerError)
280 // An empty line at EOF is the only way the client can be
281 // assured the entire index was received.
282 resp.Write([]byte{'\n'})
285 // MountsHandler responds to "GET /mounts" requests.
286 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
287 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
289 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus is the buffer pool section of a status report. Each
// field is serialized under the JSON name given in its tag.
type PoolStatus struct {
	// Alloc is emitted as "BytesAllocatedCumulative".
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	// Cap is emitted as "BuffersMax".
	Cap int `json:"BuffersMax"`
	// Len is emitted as "BuffersInUse".
	Len int `json:"BuffersInUse"`
}
300 type volumeStatusEnt struct {
302 Status *VolumeStatus `json:",omitempty"`
303 VolumeStats *ioStats `json:",omitempty"`
304 InternalStats interface{} `json:",omitempty"`
308 type NodeStatus struct {
309 Volumes []*volumeStatusEnt
310 BufferPool PoolStatus
311 PullQueue WorkQueueStatus
312 TrashQueue WorkQueueStatus
319 var stLock sync.Mutex
321 // DebugHandler addresses /debug.json requests.
322 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
323 type debugStats struct {
324 MemStats runtime.MemStats
327 runtime.ReadMemStats(&ds.MemStats)
328 err := json.NewEncoder(resp).Encode(&ds)
330 http.Error(resp, err.Error(), 500)
334 // StatusHandler addresses /status.json requests.
335 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
337 rtr.readNodeStatus(&st)
338 jstat, err := json.Marshal(&st)
343 log.Printf("json.Marshal: %s", err)
344 log.Printf("NodeStatus = %v", &st)
345 http.Error(resp, err.Error(), 500)
349 // populate the given NodeStatus struct with current values.
350 func (rtr *router) readNodeStatus(st *NodeStatus) {
351 st.Version = arvadosVersion.GetVersion()
352 vols := KeepVM.AllReadable()
353 if cap(st.Volumes) < len(vols) {
354 st.Volumes = make([]*volumeStatusEnt, len(vols))
356 st.Volumes = st.Volumes[:0]
357 for _, vol := range vols {
358 var internalStats interface{}
359 if vol, ok := vol.(InternalStatser); ok {
360 internalStats = vol.InternalStats()
362 st.Volumes = append(st.Volumes, &volumeStatusEnt{
364 Status: vol.Status(),
365 InternalStats: internalStats,
366 //VolumeStats: KeepVM.VolumeStats(vol),
369 st.BufferPool.Alloc = bufs.Alloc()
370 st.BufferPool.Cap = bufs.Cap()
371 st.BufferPool.Len = bufs.Len()
372 st.PullQueue = getWorkQueueStatus(pullq)
373 st.TrashQueue = getWorkQueueStatus(trashq)
374 if rtr.limiter != nil {
375 st.RequestsCurrent = rtr.limiter.Current()
376 st.RequestsMax = rtr.limiter.Max()
380 // return a WorkQueueStatus for the given queue. If q is nil (which
381 // should never happen except in test suites), return a zero status
382 // value instead of crashing.
383 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
385 // This should only happen during tests.
386 return WorkQueueStatus{}
391 // DeleteHandler processes DELETE requests.
393 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
394 // from all connected volumes.
396 // Only the Data Manager, or an Arvados admin with scope "all", are
397 // allowed to issue DELETE requests. If a DELETE request is not
398 // authenticated or is issued by a non-admin user, the server returns
399 // a PermissionError.
401 // Upon receiving a valid request from an authorized user,
402 // DeleteHandler deletes all copies of the specified block on local
407 // If the requested block was not found on any volume, the response
408 // code is HTTP 404 Not Found.
410 // Otherwise, the response code is 200 OK, with a response body
411 // consisting of the JSON message
413 // {"copies_deleted":d,"copies_failed":f}
415 // where d and f are integers representing the number of blocks that
416 // were successfully and unsuccessfully deleted.
418 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
419 hash := mux.Vars(req)["hash"]
421 // Confirm that this user is an admin and has a token with unlimited scope.
422 var tok = GetAPIToken(req)
423 if tok == "" || !CanDelete(tok) {
424 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
428 if !theConfig.EnableDelete {
429 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
433 // Delete copies of this block from all available volumes.
434 // Report how many blocks were successfully deleted, and how
435 // many were found on writable volumes but not deleted.
437 Deleted int `json:"copies_deleted"`
438 Failed int `json:"copies_failed"`
440 for _, vol := range KeepVM.AllWritable() {
441 if err := vol.Trash(hash); err == nil {
443 } else if os.IsNotExist(err) {
447 log.Println("DeleteHandler:", err)
453 if result.Deleted == 0 && result.Failed == 0 {
454 st = http.StatusNotFound
461 if st == http.StatusOK {
462 if body, err := json.Marshal(result); err == nil {
465 log.Printf("json.Marshal: %s (result = %v)", err, result)
466 http.Error(resp, err.Error(), 500)
471 /* PullHandler processes "PUT /pull" requests for the data manager.
472 The request body is a JSON message containing a list of pull
473 requests in the following format:
477 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
479 "keep0.qr1hi.arvadosapi.com:25107",
480 "keep1.qr1hi.arvadosapi.com:25108"
484 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
494 Each pull request in the list consists of a block locator string
495 and an ordered list of servers. Keepstore should try to fetch the
496 block from each server in turn.
498 If the request has not been sent by the Data Manager, return 401
501 If the JSON unmarshalling fails, return 400 Bad Request.
504 // PullRequest consists of a block locator and an ordered list of servers
505 type PullRequest struct {
506 Locator string `json:"locator"`
507 Servers []string `json:"servers"`
509 // Destination mount, or "" for "anywhere"
513 // PullHandler processes "PUT /pull" requests for the data manager.
514 func PullHandler(resp http.ResponseWriter, req *http.Request) {
515 // Reject unauthorized requests.
516 if !IsSystemAuth(GetAPIToken(req)) {
517 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
521 // Parse the request body.
523 r := json.NewDecoder(req.Body)
524 if err := r.Decode(&pr); err != nil {
525 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
529 // We have a properly formatted pull list sent from the data
530 // manager. Report success and send the list to the pull list
531 // manager for further handling.
532 resp.WriteHeader(http.StatusOK)
534 fmt.Sprintf("Received %d pull requests\n", len(pr))))
537 for _, p := range pr {
540 pullq.ReplaceQueue(plist)
543 // TrashRequest consists of a block locator and its Mtime
544 type TrashRequest struct {
545 Locator string `json:"locator"`
546 BlockMtime int64 `json:"block_mtime"`
548 // Target mount, or "" for "everywhere"
552 // TrashHandler processes /trash requests.
553 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
554 // Reject unauthorized requests.
555 if !IsSystemAuth(GetAPIToken(req)) {
556 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
560 // Parse the request body.
561 var trash []TrashRequest
562 r := json.NewDecoder(req.Body)
563 if err := r.Decode(&trash); err != nil {
564 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
568 // We have a properly formatted trash list sent from the data
569 // manager. Report success and send the list to the trash work
570 // queue for further handling.
571 resp.WriteHeader(http.StatusOK)
573 fmt.Sprintf("Received %d trash requests\n", len(trash))))
576 for _, t := range trash {
579 trashq.ReplaceQueue(tlist)
582 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
583 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
584 // Reject unauthorized requests.
585 if !IsSystemAuth(GetAPIToken(req)) {
586 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
590 hash := mux.Vars(req)["hash"]
592 if len(KeepVM.AllWritable()) == 0 {
593 http.Error(resp, "No writable volumes", http.StatusNotFound)
597 var untrashedOn, failedOn []string
599 for _, vol := range KeepVM.AllWritable() {
600 err := vol.Untrash(hash)
602 if os.IsNotExist(err) {
604 } else if err != nil {
605 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
606 failedOn = append(failedOn, vol.String())
608 log.Printf("Untrashed %v on volume %v", hash, vol.String())
609 untrashedOn = append(untrashedOn, vol.String())
613 if numNotFound == len(KeepVM.AllWritable()) {
614 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
618 if len(failedOn) == len(KeepVM.AllWritable()) {
619 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
621 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
622 if len(failedOn) > 0 {
623 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
625 resp.Write([]byte(respBody))
629 // GetBlock and PutBlock implement lower-level code for handling
630 // blocks by rooting through volumes connected to the local machine.
631 // Once the handler has determined that system policy permits the
632 // request, it calls these methods to perform the actual operation.
634 // TODO(twp): this code would probably be better located in the
635 // VolumeManager interface. As an abstraction, the VolumeManager
636 // should be the only part of the code that cares about which volume a
637 // block is stored on, so it should be responsible for figuring out
638 // which volume to check for fetching blocks, storing blocks, etc.
640 // GetBlock fetches the block identified by "hash" into the provided
641 // buf, and returns the data size.
643 // If the block cannot be found on any volume, returns NotFoundError.
645 // If the block found does not have the correct MD5 hash, returns
648 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
649 // Attempt to read the requested hash from a keep volume.
650 errorToCaller := NotFoundError
652 for _, vol := range KeepVM.AllReadable() {
653 size, err := vol.Get(ctx, hash, buf)
656 return 0, ErrClientDisconnect
660 // IsNotExist is an expected error and may be
661 // ignored. All other errors are logged. In
662 // any case we continue trying to read other
663 // volumes. If all volumes report IsNotExist,
664 // we return a NotFoundError.
665 if !os.IsNotExist(err) {
666 log.Printf("%s: Get(%s): %s", vol, hash, err)
670 // Check the file checksum.
672 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
673 if filehash != hash {
674 // TODO: Try harder to tell a sysadmin about
676 log.Printf("%s: checksum mismatch for request %s (actual %s)",
678 errorToCaller = DiskHashError
681 if errorToCaller == DiskHashError {
682 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
687 return 0, errorToCaller
690 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
692 // PutBlock(ctx, block, hash)
693 // Stores the BLOCK (identified by the content id HASH) in Keep.
695 // The MD5 checksum of the block must be identical to the content id HASH.
696 // If not, an error is returned.
698 // PutBlock stores the BLOCK on the first Keep volume with free space.
699 // A failure code is returned to the user only if all volumes fail.
701 // On success, PutBlock returns nil.
702 // On failure, it returns a KeepError with one of the following codes:
705 // A different block with the same hash already exists on this
708 // The MD5 hash of the BLOCK does not match the argument HASH.
710 // There was not enough space left in any Keep volume to store
713 // The object could not be stored for some other reason (e.g.
714 // all writes failed). The text of the error message should
715 // provide as much detail as possible.
717 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
718 // Check that BLOCK's checksum matches HASH.
719 blockhash := fmt.Sprintf("%x", md5.Sum(block))
720 if blockhash != hash {
721 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
722 return 0, RequestHashError
725 // If we already have this data, it's intact on disk, and we
726 // can update its timestamp, return success. If we have
727 // different data with the same hash, return failure.
728 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
730 } else if ctx.Err() != nil {
731 return 0, ErrClientDisconnect
734 // Choose a Keep volume to write to.
735 // If this volume fails, try all of the volumes in order.
736 if vol := KeepVM.NextWritable(); vol != nil {
737 if err := vol.Put(ctx, hash, block); err == nil {
738 return vol.Replication(), nil // success!
740 if ctx.Err() != nil {
741 return 0, ErrClientDisconnect
745 writables := KeepVM.AllWritable()
746 if len(writables) == 0 {
747 log.Print("No writable volumes.")
752 for _, vol := range writables {
753 err := vol.Put(ctx, hash, block)
754 if ctx.Err() != nil {
755 return 0, ErrClientDisconnect
758 return vol.Replication(), nil // success!
760 if err != FullError {
761 // The volume is not full but the
762 // write did not succeed. Report the
763 // error and continue trying.
765 log.Printf("%s: Write(%s): %s", vol, hash, err)
770 log.Print("All volumes are full.")
773 // Already logged the non-full errors.
774 return 0, GenericError
777 // CompareAndTouch returns the current replication level if one of the
778 // volumes already has the given content and it successfully updates
779 // the relevant block's modification time in order to protect it from
780 // premature garbage collection. Otherwise, it returns a non-nil
782 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
783 var bestErr error = NotFoundError
784 for _, vol := range KeepVM.AllWritable() {
785 err := vol.Compare(ctx, hash, buf)
786 if ctx.Err() != nil {
788 } else if err == CollisionError {
789 // Stop if we have a block with same hash but
790 // different content. (It will be impossible
791 // to tell which one is wanted if we have
792 // both, so there's no point writing it even
793 // on a different volume.)
794 log.Printf("%s: Compare(%s): %s", vol, hash, err)
796 } else if os.IsNotExist(err) {
797 // Block does not exist. This is the only
798 // "normal" error: we don't log anything.
800 } else if err != nil {
801 // Couldn't open file, data is corrupt on
802 // disk, etc.: log this abnormal condition,
803 // and try the next volume.
804 log.Printf("%s: Compare(%s): %s", vol, hash, err)
807 if err := vol.Touch(hash); err != nil {
808 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
812 // Compare and Touch both worked --> done.
813 return vol.Replication(), nil
// validLocatorRe matches a string made of exactly 32 lowercase
// hexadecimal digits and nothing else.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e.
// whether it matches validLocatorRe.
//
// Only MD5-style locators are recognized today; if Keep gains support
// for other hash types, this check must be extended to accept them.
func IsValidLocator(loc string) bool {
	ok := validLocatorRe.MatchString(loc)
	return ok
}
// authRe captures everything that follows the "OAuth2" scheme and its
// trailing whitespace in an Authorization header value.
var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of an HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is examined.
func GetAPIToken(req *http.Request) string {
	// Header.Get returns "" when the header is absent or is present
	// with no values, so a header entry holding an empty slice cannot
	// trigger an out-of-range index here.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[1]
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string. Parse failures are logged.
func IsExpired(timestampHex string) bool {
	// Ask for a 64-bit result explicitly. A bitSize of 0 means "fits
	// in int", which is only 32 bits wide on 32-bit platforms and
	// would make timestamps after January 2038 fail to parse (and
	// therefore be reported as expired).
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
854 // CanDelete returns true if the user identified by apiToken is
855 // allowed to delete blocks.
856 func CanDelete(apiToken string) bool {
860 // Blocks may be deleted only when Keep has been configured with a
862 if IsSystemAuth(apiToken) {
865 // TODO(twp): look up apiToken with the API server
866 // return true if is_admin is true and if the token
867 // has unlimited scope
871 // IsSystemAuth returns true if the given token is allowed to perform
872 // system level actions like deleting data.
873 func IsSystemAuth(token string) bool {
874 return token != "" && token == theConfig.systemAuthToken