1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "github.com/gorilla/mux"
25 "git.curoverse.com/arvados.git/sdk/go/arvados"
26 "git.curoverse.com/arvados.git/sdk/go/health"
27 "git.curoverse.com/arvados.git/sdk/go/httpserver"
32 limiter httpserver.RequestCounter
33 cluster *arvados.Cluster
34 remoteProxy remoteProxy
37 // MakeRESTRouter returns a new router that forwards all Keep requests
38 // to the appropriate handlers.
39 func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
41 Router: mux.NewRouter(),
46 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
48 `/{hash:[0-9a-f]{32}}+{hints}`,
49 rtr.handleGET).Methods("GET", "HEAD")
51 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
52 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
53 // List all blocks stored here. Privileged client only.
54 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
55 // List blocks stored here whose hash has the given prefix.
56 // Privileged client only.
57 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
59 // Internals/debugging info (runtime.MemStats)
60 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
62 // List volumes: path, device number, bytes used/avail.
63 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
65 // List mounts: UUID, readonly, tier, device ID, ...
66 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
67 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
68 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
70 // Replace the current pull queue.
71 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73 // Replace the current trash queue.
74 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
76 // Untrash moves blocks from trash back into store
77 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
79 rtr.Handle("/_health/{check}", &health.Handler{
80 Token: theConfig.ManagementToken,
84 // Any request which does not match any of these routes gets
86 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
88 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
90 instrumented := httpserver.Instrument(nil, nil,
91 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
92 return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
95 // BadRequestHandler is a HandleFunc to address bad requests.
96 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
97 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
100 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
101 ctx, cancel := contextForResponse(context.TODO(), resp)
104 locator := req.URL.Path[1:]
105 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
106 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
110 if theConfig.RequireSignatures {
111 locator := req.URL.Path[1:] // strip leading slash
112 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
113 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
118 // TODO: Probe volumes to check whether the block _might_
119 // exist. Some volumes/types could support a quick existence
120 // check without causing other operations to suffer. If all
121 // volumes support that, and assure us the block definitely
122 // isn't here, we can return 404 now instead of waiting for a
125 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
127 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
132 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
134 code := http.StatusInternalServerError
135 if err, ok := err.(*KeepError); ok {
138 http.Error(resp, err.Error(), code)
142 resp.Header().Set("Content-Length", strconv.Itoa(size))
143 resp.Header().Set("Content-Type", "application/octet-stream")
144 resp.Write(buf[:size])
147 // Return a new context that gets cancelled by resp's CloseNotifier.
148 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
149 ctx, cancel := context.WithCancel(parent)
150 if cn, ok := resp.(http.CloseNotifier); ok {
151 go func(c <-chan bool) {
154 theConfig.debugLogf("cancel context")
163 // Get a buffer from the pool -- but give up and return a non-nil
164 // error if ctx ends before we get a buffer.
165 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
166 bufReady := make(chan []byte)
168 bufReady <- bufs.Get(bufSize)
171 case buf := <-bufReady:
175 // Even if closeNotifier happened first, we
176 // need to keep waiting for our buf so we can
177 // return it to the pool.
180 return nil, ErrClientDisconnect
184 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
185 ctx, cancel := contextForResponse(context.TODO(), resp)
188 hash := mux.Vars(req)["hash"]
190 // Detect as many error conditions as possible before reading
191 // the body: avoid transmitting data that will not end up
192 // being written anyway.
194 if req.ContentLength == -1 {
195 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
199 if req.ContentLength > BlockSize {
200 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
204 if len(KeepVM.AllWritable()) == 0 {
205 http.Error(resp, FullError.Error(), FullError.HTTPCode)
209 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
211 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
215 _, err = io.ReadFull(req.Body, buf)
217 http.Error(resp, err.Error(), 500)
222 replication, err := PutBlock(ctx, buf, hash)
226 code := http.StatusInternalServerError
227 if err, ok := err.(*KeepError); ok {
230 http.Error(resp, err.Error(), code)
234 // Success; add a size hint, sign the locator if possible, and
235 // return it to the client.
236 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
237 apiToken := GetAPIToken(req)
238 if theConfig.blobSigningKey != nil && apiToken != "" {
239 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
240 returnHash = SignLocator(returnHash, apiToken, expiry)
242 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
243 resp.Write([]byte(returnHash + "\n"))
246 // IndexHandler responds to "/index", "/index/{prefix}", and
247 // "/mounts/{uuid}/blocks" requests.
248 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
249 if !IsSystemAuth(GetAPIToken(req)) {
250 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
254 prefix := mux.Vars(req)["prefix"]
257 prefix = req.Form.Get("prefix")
260 uuid := mux.Vars(req)["uuid"]
264 vols = KeepVM.AllReadable()
265 } else if v := KeepVM.Lookup(uuid, false); v == nil {
266 http.Error(resp, "mount not found", http.StatusNotFound)
272 for _, v := range vols {
273 if err := v.IndexTo(prefix, resp); err != nil {
274 // The only errors returned by IndexTo are
275 // write errors returned by resp.Write(),
276 // which probably means the client has
277 // disconnected and this error will never be
278 // reported to the client -- but it will
279 // appear in our own error log.
280 http.Error(resp, err.Error(), http.StatusInternalServerError)
284 // An empty line at EOF is the only way the client can be
285 // assured the entire index was received.
286 resp.Write([]byte{'\n'})
289 // MountsHandler responds to "GET /mounts" requests.
290 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
291 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
293 http.Error(resp, err.Error(), http.StatusInternalServerError)
298 type PoolStatus struct {
299 Alloc uint64 `json:"BytesAllocatedCumulative"`
300 Cap int `json:"BuffersMax"`
301 Len int `json:"BuffersInUse"`
304 type volumeStatusEnt struct {
306 Status *VolumeStatus `json:",omitempty"`
307 VolumeStats *ioStats `json:",omitempty"`
308 InternalStats interface{} `json:",omitempty"`
312 type NodeStatus struct {
313 Volumes []*volumeStatusEnt
314 BufferPool PoolStatus
315 PullQueue WorkQueueStatus
316 TrashQueue WorkQueueStatus
323 var stLock sync.Mutex
325 // DebugHandler addresses /debug.json requests.
326 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
327 type debugStats struct {
328 MemStats runtime.MemStats
331 runtime.ReadMemStats(&ds.MemStats)
332 err := json.NewEncoder(resp).Encode(&ds)
334 http.Error(resp, err.Error(), 500)
338 // StatusHandler addresses /status.json requests.
339 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
341 rtr.readNodeStatus(&st)
342 jstat, err := json.Marshal(&st)
347 log.Printf("json.Marshal: %s", err)
348 log.Printf("NodeStatus = %v", &st)
349 http.Error(resp, err.Error(), 500)
353 // populate the given NodeStatus struct with current values.
354 func (rtr *router) readNodeStatus(st *NodeStatus) {
356 vols := KeepVM.AllReadable()
357 if cap(st.Volumes) < len(vols) {
358 st.Volumes = make([]*volumeStatusEnt, len(vols))
360 st.Volumes = st.Volumes[:0]
361 for _, vol := range vols {
362 var internalStats interface{}
363 if vol, ok := vol.(InternalStatser); ok {
364 internalStats = vol.InternalStats()
366 st.Volumes = append(st.Volumes, &volumeStatusEnt{
368 Status: vol.Status(),
369 InternalStats: internalStats,
370 //VolumeStats: KeepVM.VolumeStats(vol),
373 st.BufferPool.Alloc = bufs.Alloc()
374 st.BufferPool.Cap = bufs.Cap()
375 st.BufferPool.Len = bufs.Len()
376 st.PullQueue = getWorkQueueStatus(pullq)
377 st.TrashQueue = getWorkQueueStatus(trashq)
378 if rtr.limiter != nil {
379 st.RequestsCurrent = rtr.limiter.Current()
380 st.RequestsMax = rtr.limiter.Max()
384 // return a WorkQueueStatus for the given queue. If q is nil (which
385 // should never happen except in test suites), return a zero status
386 // value instead of crashing.
387 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
389 // This should only happen during tests.
390 return WorkQueueStatus{}
395 // DeleteHandler processes DELETE requests.
397 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
398 // from all connected volumes.
400 // Only the Data Manager, or an Arvados admin with scope "all", are
401 // allowed to issue DELETE requests. If a DELETE request is not
402 // authenticated or is issued by a non-admin user, the server returns
403 // a PermissionError.
405 // Upon receiving a valid request from an authorized user,
406 // DeleteHandler deletes all copies of the specified block on local
411 // If the requested blocks was not found on any volume, the response
412 // code is HTTP 404 Not Found.
414 // Otherwise, the response code is 200 OK, with a response body
415 // consisting of the JSON message
417 // {"copies_deleted":d,"copies_failed":f}
419 // where d and f are integers representing the number of blocks that
420 // were successfully and unsuccessfully deleted.
422 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
423 hash := mux.Vars(req)["hash"]
425 // Confirm that this user is an admin and has a token with unlimited scope.
426 var tok = GetAPIToken(req)
427 if tok == "" || !CanDelete(tok) {
428 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
432 if !theConfig.EnableDelete {
433 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
437 // Delete copies of this block from all available volumes.
438 // Report how many blocks were successfully deleted, and how
439 // many were found on writable volumes but not deleted.
441 Deleted int `json:"copies_deleted"`
442 Failed int `json:"copies_failed"`
444 for _, vol := range KeepVM.AllWritable() {
445 if err := vol.Trash(hash); err == nil {
447 } else if os.IsNotExist(err) {
451 log.Println("DeleteHandler:", err)
457 if result.Deleted == 0 && result.Failed == 0 {
458 st = http.StatusNotFound
465 if st == http.StatusOK {
466 if body, err := json.Marshal(result); err == nil {
469 log.Printf("json.Marshal: %s (result = %v)", err, result)
470 http.Error(resp, err.Error(), 500)
475 /* PullHandler processes "PUT /pull" requests for the data manager.
476 The request body is a JSON message containing a list of pull
477 requests in the following format:
481 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
483 "keep0.qr1hi.arvadosapi.com:25107",
484 "keep1.qr1hi.arvadosapi.com:25108"
488 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
498 Each pull request in the list consists of a block locator string
499 and an ordered list of servers. Keepstore should try to fetch the
500 block from each server in turn.
502 If the request has not been sent by the Data Manager, return 401
505 If the JSON unmarshalling fails, return 400 Bad Request.
508 // PullRequest consists of a block locator and an ordered list of servers
509 type PullRequest struct {
510 Locator string `json:"locator"`
511 Servers []string `json:"servers"`
513 // Destination mount, or "" for "anywhere"
514 MountUUID string `json:"mount_uuid"`
517 // PullHandler processes "PUT /pull" requests for the data manager.
518 func PullHandler(resp http.ResponseWriter, req *http.Request) {
519 // Reject unauthorized requests.
520 if !IsSystemAuth(GetAPIToken(req)) {
521 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
525 // Parse the request body.
527 r := json.NewDecoder(req.Body)
528 if err := r.Decode(&pr); err != nil {
529 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
533 // We have a properly formatted pull list sent from the data
534 // manager. Report success and send the list to the pull list
535 // manager for further handling.
536 resp.WriteHeader(http.StatusOK)
538 fmt.Sprintf("Received %d pull requests\n", len(pr))))
541 for _, p := range pr {
544 pullq.ReplaceQueue(plist)
547 // TrashRequest consists of a block locator and its Mtime
548 type TrashRequest struct {
549 Locator string `json:"locator"`
550 BlockMtime int64 `json:"block_mtime"`
552 // Target mount, or "" for "everywhere"
553 MountUUID string `json:"mount_uuid"`
556 // TrashHandler processes /trash requests.
557 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
558 // Reject unauthorized requests.
559 if !IsSystemAuth(GetAPIToken(req)) {
560 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
564 // Parse the request body.
565 var trash []TrashRequest
566 r := json.NewDecoder(req.Body)
567 if err := r.Decode(&trash); err != nil {
568 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
572 // We have a properly formatted trash list sent from the data
573 // manager. Report success and send the list to the trash work
574 // queue for further handling.
575 resp.WriteHeader(http.StatusOK)
577 fmt.Sprintf("Received %d trash requests\n", len(trash))))
580 for _, t := range trash {
583 trashq.ReplaceQueue(tlist)
586 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
587 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
588 // Reject unauthorized requests.
589 if !IsSystemAuth(GetAPIToken(req)) {
590 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
594 hash := mux.Vars(req)["hash"]
596 if len(KeepVM.AllWritable()) == 0 {
597 http.Error(resp, "No writable volumes", http.StatusNotFound)
601 var untrashedOn, failedOn []string
603 for _, vol := range KeepVM.AllWritable() {
604 err := vol.Untrash(hash)
606 if os.IsNotExist(err) {
608 } else if err != nil {
609 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
610 failedOn = append(failedOn, vol.String())
612 log.Printf("Untrashed %v on volume %v", hash, vol.String())
613 untrashedOn = append(untrashedOn, vol.String())
617 if numNotFound == len(KeepVM.AllWritable()) {
618 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
622 if len(failedOn) == len(KeepVM.AllWritable()) {
623 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
625 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
626 if len(failedOn) > 0 {
627 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
629 resp.Write([]byte(respBody))
633 // GetBlock and PutBlock implement lower-level code for handling
634 // blocks by rooting through volumes connected to the local machine.
635 // Once the handler has determined that system policy permits the
636 // request, it calls these methods to perform the actual operation.
638 // TODO(twp): this code would probably be better located in the
639 // VolumeManager interface. As an abstraction, the VolumeManager
640 // should be the only part of the code that cares about which volume a
641 // block is stored on, so it should be responsible for figuring out
642 // which volume to check for fetching blocks, storing blocks, etc.
644 // GetBlock fetches the block identified by "hash" into the provided
645 // buf, and returns the data size.
647 // If the block cannot be found on any volume, returns NotFoundError.
649 // If the block found does not have the correct MD5 hash, returns
652 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
653 // Attempt to read the requested hash from a keep volume.
654 errorToCaller := NotFoundError
656 for _, vol := range KeepVM.AllReadable() {
657 size, err := vol.Get(ctx, hash, buf)
660 return 0, ErrClientDisconnect
664 // IsNotExist is an expected error and may be
665 // ignored. All other errors are logged. In
666 // any case we continue trying to read other
667 // volumes. If all volumes report IsNotExist,
668 // we return a NotFoundError.
669 if !os.IsNotExist(err) {
670 log.Printf("%s: Get(%s): %s", vol, hash, err)
674 // Check the file checksum.
676 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
677 if filehash != hash {
678 // TODO: Try harder to tell a sysadmin about
680 log.Printf("%s: checksum mismatch for request %s (actual %s)",
682 errorToCaller = DiskHashError
685 if errorToCaller == DiskHashError {
686 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
691 return 0, errorToCaller
694 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
696 // PutBlock(ctx, block, hash)
697 // Stores the BLOCK (identified by the content id HASH) in Keep.
699 // The MD5 checksum of the block must be identical to the content id HASH.
700 // If not, an error is returned.
702 // PutBlock stores the BLOCK on the first Keep volume with free space.
703 // A failure code is returned to the user only if all volumes fail.
705 // On success, PutBlock returns nil.
706 // On failure, it returns a KeepError with one of the following codes:
709 // A different block with the same hash already exists on this
712 // The MD5 hash of the BLOCK does not match the argument HASH.
714 // There was not enough space left in any Keep volume to store
717 // The object could not be stored for some other reason (e.g.
718 // all writes failed). The text of the error message should
719 // provide as much detail as possible.
721 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
722 // Check that BLOCK's checksum matches HASH.
723 blockhash := fmt.Sprintf("%x", md5.Sum(block))
724 if blockhash != hash {
725 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
726 return 0, RequestHashError
729 // If we already have this data, it's intact on disk, and we
730 // can update its timestamp, return success. If we have
731 // different data with the same hash, return failure.
732 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
734 } else if ctx.Err() != nil {
735 return 0, ErrClientDisconnect
738 // Choose a Keep volume to write to.
739 // If this volume fails, try all of the volumes in order.
740 if vol := KeepVM.NextWritable(); vol != nil {
741 if err := vol.Put(ctx, hash, block); err == nil {
742 return vol.Replication(), nil // success!
744 if ctx.Err() != nil {
745 return 0, ErrClientDisconnect
749 writables := KeepVM.AllWritable()
750 if len(writables) == 0 {
751 log.Print("No writable volumes.")
756 for _, vol := range writables {
757 err := vol.Put(ctx, hash, block)
758 if ctx.Err() != nil {
759 return 0, ErrClientDisconnect
762 return vol.Replication(), nil // success!
764 if err != FullError {
765 // The volume is not full but the
766 // write did not succeed. Report the
767 // error and continue trying.
769 log.Printf("%s: Write(%s): %s", vol, hash, err)
774 log.Print("All volumes are full.")
777 // Already logged the non-full errors.
778 return 0, GenericError
781 // CompareAndTouch returns the current replication level if one of the
782 // volumes already has the given content and it successfully updates
783 // the relevant block's modification time in order to protect it from
784 // premature garbage collection. Otherwise, it returns a non-nil
786 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
787 var bestErr error = NotFoundError
788 for _, vol := range KeepVM.AllWritable() {
789 err := vol.Compare(ctx, hash, buf)
790 if ctx.Err() != nil {
792 } else if err == CollisionError {
793 // Stop if we have a block with same hash but
794 // different content. (It will be impossible
795 // to tell which one is wanted if we have
796 // both, so there's no point writing it even
797 // on a different volume.)
798 log.Printf("%s: Compare(%s): %s", vol, hash, err)
800 } else if os.IsNotExist(err) {
801 // Block does not exist. This is the only
802 // "normal" error: we don't log anything.
804 } else if err != nil {
805 // Couldn't open file, data is corrupt on
806 // disk, etc.: log this abnormal condition,
807 // and try the next volume.
808 log.Printf("%s: Compare(%s): %s", vol, hash, err)
811 if err := vol.Touch(hash); err != nil {
812 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
816 // Compare and Touch both worked --> done.
817 return vol.Replication(), nil
822 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
824 // IsValidLocator returns true if the specified string is a valid Keep locator.
825 // When Keep is extended to support hash types other than MD5,
826 // this should be updated to cover those as well.
828 func IsValidLocator(loc string) bool {
829 return validLocatorRe.MatchString(loc)
832 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
834 // GetAPIToken returns the OAuth2 token from the Authorization
835 // header of a HTTP request, or an empty string if no matching
837 func GetAPIToken(req *http.Request) string {
838 if auth, ok := req.Header["Authorization"]; ok {
839 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
846 // IsExpired returns true if the given Unix timestamp (expressed as a
847 // hexadecimal string) is in the past, or if timestampHex cannot be
848 // parsed as a hexadecimal string.
849 func IsExpired(timestampHex string) bool {
850 ts, err := strconv.ParseInt(timestampHex, 16, 0)
852 log.Printf("IsExpired: %s", err)
855 return time.Unix(ts, 0).Before(time.Now())
858 // CanDelete returns true if the user identified by apiToken is
859 // allowed to delete blocks.
860 func CanDelete(apiToken string) bool {
864 // Blocks may be deleted only when Keep has been configured with a
866 if IsSystemAuth(apiToken) {
869 // TODO(twp): look up apiToken with the API server
870 // return true if is_admin is true and if the token
871 // has unlimited scope
875 // IsSystemAuth returns true if the given token is allowed to perform
876 // system level actions like deleting data.
877 func IsSystemAuth(token string) bool {
878 return token != "" && token == theConfig.systemAuthToken