1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // REST handlers for Keep are implemented here.
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler (GET /index, GET /index/prefix)
12 // StatusHandler (GET /status.json)
30 "github.com/gorilla/mux"
32 "git.curoverse.com/arvados.git/sdk/go/health"
33 "git.curoverse.com/arvados.git/sdk/go/httpserver"
38 limiter httpserver.RequestCounter
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() http.Handler {
44 rtr := &router{Router: mux.NewRouter()}
47 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
49 `/{hash:[0-9a-f]{32}}+{hints}`,
50 GetBlockHandler).Methods("GET", "HEAD")
52 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
53 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
54 // List all blocks stored here. Privileged client only.
55 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
56 // List blocks stored here whose hash has the given prefix.
57 // Privileged client only.
58 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60 // Internals/debugging info (runtime.MemStats)
61 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63 // List volumes: path, device number, bytes used/avail.
64 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66 // List mounts: UUID, readonly, tier, device ID, ...
67 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
68 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
69 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71 // Replace the current pull queue.
72 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74 // Replace the current trash queue.
75 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77 // Untrash moves blocks from trash back into store
78 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80 rtr.Handle("/_health/{check}", &health.Handler{
81 Token: theConfig.ManagementToken,
85 // Any request which does not match any of these routes gets
87 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
89 theConfig.metrics.setup()
91 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
93 mux := http.NewServeMux()
94 mux.Handle("/", theConfig.metrics.Instrument(
95 httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
96 mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
97 mux.Handle("/metrics", theConfig.metrics.exportProm)
102 // BadRequestHandler is a HandleFunc to address bad requests.
103 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
104 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
107 // GetBlockHandler is a HandleFunc to address Get block requests.
108 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
109 ctx, cancel := contextForResponse(context.TODO(), resp)
112 if theConfig.RequireSignatures {
113 locator := req.URL.Path[1:] // strip leading slash
114 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
115 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
120 // TODO: Probe volumes to check whether the block _might_
121 // exist. Some volumes/types could support a quick existence
122 // check without causing other operations to suffer. If all
123 // volumes support that, and assure us the block definitely
124 // isn't here, we can return 404 now instead of waiting for a
127 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
129 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
134 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
136 code := http.StatusInternalServerError
137 if err, ok := err.(*KeepError); ok {
140 http.Error(resp, err.Error(), code)
144 resp.Header().Set("Content-Length", strconv.Itoa(size))
145 resp.Header().Set("Content-Type", "application/octet-stream")
146 resp.Write(buf[:size])
149 // Return a new context that gets cancelled by resp's CloseNotifier.
150 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
151 ctx, cancel := context.WithCancel(parent)
152 if cn, ok := resp.(http.CloseNotifier); ok {
153 go func(c <-chan bool) {
156 theConfig.debugLogf("cancel context")
165 // Get a buffer from the pool -- but give up and return a non-nil
166 // error if ctx ends before we get a buffer.
167 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
168 bufReady := make(chan []byte)
170 bufReady <- bufs.Get(bufSize)
173 case buf := <-bufReady:
177 // Even if closeNotifier happened first, we
178 // need to keep waiting for our buf so we can
179 // return it to the pool.
182 return nil, ErrClientDisconnect
186 // PutBlockHandler is a HandleFunc to address Put block requests.
187 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
188 ctx, cancel := contextForResponse(context.TODO(), resp)
191 hash := mux.Vars(req)["hash"]
193 // Detect as many error conditions as possible before reading
194 // the body: avoid transmitting data that will not end up
195 // being written anyway.
197 if req.ContentLength == -1 {
198 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
202 if req.ContentLength > BlockSize {
203 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
207 if len(KeepVM.AllWritable()) == 0 {
208 http.Error(resp, FullError.Error(), FullError.HTTPCode)
212 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
214 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
218 _, err = io.ReadFull(req.Body, buf)
220 http.Error(resp, err.Error(), 500)
225 replication, err := PutBlock(ctx, buf, hash)
229 code := http.StatusInternalServerError
230 if err, ok := err.(*KeepError); ok {
233 http.Error(resp, err.Error(), code)
237 // Success; add a size hint, sign the locator if possible, and
238 // return it to the client.
239 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
240 apiToken := GetAPIToken(req)
241 if theConfig.blobSigningKey != nil && apiToken != "" {
242 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
243 returnHash = SignLocator(returnHash, apiToken, expiry)
245 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
246 resp.Write([]byte(returnHash + "\n"))
249 // IndexHandler responds to "/index", "/index/{prefix}", and
250 // "/mounts/{uuid}/blocks" requests.
251 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
252 if !IsSystemAuth(GetAPIToken(req)) {
253 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
257 prefix := mux.Vars(req)["prefix"]
260 prefix = req.Form.Get("prefix")
263 uuid := mux.Vars(req)["uuid"]
267 vols = KeepVM.AllReadable()
268 } else if v := KeepVM.Lookup(uuid, false); v == nil {
269 http.Error(resp, "mount not found", http.StatusNotFound)
275 for _, v := range vols {
276 if err := v.IndexTo(prefix, resp); err != nil {
277 // The only errors returned by IndexTo are
278 // write errors returned by resp.Write(),
279 // which probably means the client has
280 // disconnected and this error will never be
281 // reported to the client -- but it will
282 // appear in our own error log.
283 http.Error(resp, err.Error(), http.StatusInternalServerError)
287 // An empty line at EOF is the only way the client can be
288 // assured the entire index was received.
289 resp.Write([]byte{'\n'})
292 // MountsHandler responds to "GET /mounts" requests.
293 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
294 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
296 http.Error(resp, err.Error(), http.StatusInternalServerError)
301 type PoolStatus struct {
302 Alloc uint64 `json:"BytesAllocatedCumulative"`
303 Cap int `json:"BuffersMax"`
304 Len int `json:"BuffersInUse"`
307 type volumeStatusEnt struct {
309 Status *VolumeStatus `json:",omitempty"`
310 VolumeStats *ioStats `json:",omitempty"`
311 InternalStats interface{} `json:",omitempty"`
315 type NodeStatus struct {
316 Volumes []*volumeStatusEnt
317 BufferPool PoolStatus
318 PullQueue WorkQueueStatus
319 TrashQueue WorkQueueStatus
326 var stLock sync.Mutex
328 // DebugHandler addresses /debug.json requests.
329 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
330 type debugStats struct {
331 MemStats runtime.MemStats
334 runtime.ReadMemStats(&ds.MemStats)
335 err := json.NewEncoder(resp).Encode(&ds)
337 http.Error(resp, err.Error(), 500)
341 // StatusHandler addresses /status.json requests.
342 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
344 rtr.readNodeStatus(&st)
345 jstat, err := json.Marshal(&st)
350 log.Printf("json.Marshal: %s", err)
351 log.Printf("NodeStatus = %v", &st)
352 http.Error(resp, err.Error(), 500)
356 // populate the given NodeStatus struct with current values.
357 func (rtr *router) readNodeStatus(st *NodeStatus) {
359 vols := KeepVM.AllReadable()
360 if cap(st.Volumes) < len(vols) {
361 st.Volumes = make([]*volumeStatusEnt, len(vols))
363 st.Volumes = st.Volumes[:0]
364 for _, vol := range vols {
365 var internalStats interface{}
366 if vol, ok := vol.(InternalStatser); ok {
367 internalStats = vol.InternalStats()
369 st.Volumes = append(st.Volumes, &volumeStatusEnt{
371 Status: vol.Status(),
372 InternalStats: internalStats,
373 //VolumeStats: KeepVM.VolumeStats(vol),
376 st.BufferPool.Alloc = bufs.Alloc()
377 st.BufferPool.Cap = bufs.Cap()
378 st.BufferPool.Len = bufs.Len()
379 st.PullQueue = getWorkQueueStatus(pullq)
380 st.TrashQueue = getWorkQueueStatus(trashq)
381 if rtr.limiter != nil {
382 st.RequestsCurrent = rtr.limiter.Current()
383 st.RequestsMax = rtr.limiter.Max()
387 // return a WorkQueueStatus for the given queue. If q is nil (which
388 // should never happen except in test suites), return a zero status
389 // value instead of crashing.
390 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
392 // This should only happen during tests.
393 return WorkQueueStatus{}
398 // DeleteHandler processes DELETE requests.
400 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
401 // from all connected volumes.
403 // Only the Data Manager, or an Arvados admin with scope "all", are
404 // allowed to issue DELETE requests. If a DELETE request is not
405 // authenticated or is issued by a non-admin user, the server returns
406 // a PermissionError.
408 // Upon receiving a valid request from an authorized user,
409 // DeleteHandler deletes all copies of the specified block on local
414 // If the requested blocks was not found on any volume, the response
415 // code is HTTP 404 Not Found.
417 // Otherwise, the response code is 200 OK, with a response body
418 // consisting of the JSON message
420 // {"copies_deleted":d,"copies_failed":f}
422 // where d and f are integers representing the number of blocks that
423 // were successfully and unsuccessfully deleted.
425 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
426 hash := mux.Vars(req)["hash"]
428 // Confirm that this user is an admin and has a token with unlimited scope.
429 var tok = GetAPIToken(req)
430 if tok == "" || !CanDelete(tok) {
431 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
435 if !theConfig.EnableDelete {
436 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
440 // Delete copies of this block from all available volumes.
441 // Report how many blocks were successfully deleted, and how
442 // many were found on writable volumes but not deleted.
444 Deleted int `json:"copies_deleted"`
445 Failed int `json:"copies_failed"`
447 for _, vol := range KeepVM.AllWritable() {
448 if err := vol.Trash(hash); err == nil {
450 } else if os.IsNotExist(err) {
454 log.Println("DeleteHandler:", err)
460 if result.Deleted == 0 && result.Failed == 0 {
461 st = http.StatusNotFound
468 if st == http.StatusOK {
469 if body, err := json.Marshal(result); err == nil {
472 log.Printf("json.Marshal: %s (result = %v)", err, result)
473 http.Error(resp, err.Error(), 500)
478 /* PullHandler processes "PUT /pull" requests for the data manager.
479 The request body is a JSON message containing a list of pull
480 requests in the following format:
484 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
486 "keep0.qr1hi.arvadosapi.com:25107",
487 "keep1.qr1hi.arvadosapi.com:25108"
491 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
501 Each pull request in the list consists of a block locator string
502 and an ordered list of servers. Keepstore should try to fetch the
503 block from each server in turn.
505 If the request has not been sent by the Data Manager, return 401
508 If the JSON unmarshalling fails, return 400 Bad Request.
511 // PullRequest consists of a block locator and an ordered list of servers
512 type PullRequest struct {
513 Locator string `json:"locator"`
514 Servers []string `json:"servers"`
516 // Destination mount, or "" for "anywhere"
517 MountUUID string `json:"mount_uuid"`
520 // PullHandler processes "PUT /pull" requests for the data manager.
521 func PullHandler(resp http.ResponseWriter, req *http.Request) {
522 // Reject unauthorized requests.
523 if !IsSystemAuth(GetAPIToken(req)) {
524 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
528 // Parse the request body.
530 r := json.NewDecoder(req.Body)
531 if err := r.Decode(&pr); err != nil {
532 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
536 // We have a properly formatted pull list sent from the data
537 // manager. Report success and send the list to the pull list
538 // manager for further handling.
539 resp.WriteHeader(http.StatusOK)
541 fmt.Sprintf("Received %d pull requests\n", len(pr))))
544 for _, p := range pr {
547 pullq.ReplaceQueue(plist)
550 // TrashRequest consists of a block locator and it's Mtime
551 type TrashRequest struct {
552 Locator string `json:"locator"`
553 BlockMtime int64 `json:"block_mtime"`
555 // Target mount, or "" for "everywhere"
556 MountUUID string `json:"mount_uuid"`
559 // TrashHandler processes /trash requests.
560 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
561 // Reject unauthorized requests.
562 if !IsSystemAuth(GetAPIToken(req)) {
563 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
567 // Parse the request body.
568 var trash []TrashRequest
569 r := json.NewDecoder(req.Body)
570 if err := r.Decode(&trash); err != nil {
571 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
575 // We have a properly formatted trash list sent from the data
576 // manager. Report success and send the list to the trash work
577 // queue for further handling.
578 resp.WriteHeader(http.StatusOK)
580 fmt.Sprintf("Received %d trash requests\n", len(trash))))
583 for _, t := range trash {
586 trashq.ReplaceQueue(tlist)
589 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
590 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
591 // Reject unauthorized requests.
592 if !IsSystemAuth(GetAPIToken(req)) {
593 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
597 hash := mux.Vars(req)["hash"]
599 if len(KeepVM.AllWritable()) == 0 {
600 http.Error(resp, "No writable volumes", http.StatusNotFound)
604 var untrashedOn, failedOn []string
606 for _, vol := range KeepVM.AllWritable() {
607 err := vol.Untrash(hash)
609 if os.IsNotExist(err) {
611 } else if err != nil {
612 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
613 failedOn = append(failedOn, vol.String())
615 log.Printf("Untrashed %v on volume %v", hash, vol.String())
616 untrashedOn = append(untrashedOn, vol.String())
620 if numNotFound == len(KeepVM.AllWritable()) {
621 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
625 if len(failedOn) == len(KeepVM.AllWritable()) {
626 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
628 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
629 if len(failedOn) > 0 {
630 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
632 resp.Write([]byte(respBody))
636 // GetBlock and PutBlock implement lower-level code for handling
637 // blocks by rooting through volumes connected to the local machine.
638 // Once the handler has determined that system policy permits the
639 // request, it calls these methods to perform the actual operation.
641 // TODO(twp): this code would probably be better located in the
642 // VolumeManager interface. As an abstraction, the VolumeManager
643 // should be the only part of the code that cares about which volume a
644 // block is stored on, so it should be responsible for figuring out
645 // which volume to check for fetching blocks, storing blocks, etc.
647 // GetBlock fetches the block identified by "hash" into the provided
648 // buf, and returns the data size.
650 // If the block cannot be found on any volume, returns NotFoundError.
652 // If the block found does not have the correct MD5 hash, returns
655 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
656 // Attempt to read the requested hash from a keep volume.
657 errorToCaller := NotFoundError
659 for _, vol := range KeepVM.AllReadable() {
660 size, err := vol.Get(ctx, hash, buf)
663 return 0, ErrClientDisconnect
667 // IsNotExist is an expected error and may be
668 // ignored. All other errors are logged. In
669 // any case we continue trying to read other
670 // volumes. If all volumes report IsNotExist,
671 // we return a NotFoundError.
672 if !os.IsNotExist(err) {
673 log.Printf("%s: Get(%s): %s", vol, hash, err)
677 // Check the file checksum.
679 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
680 if filehash != hash {
681 // TODO: Try harder to tell a sysadmin about
683 log.Printf("%s: checksum mismatch for request %s (actual %s)",
685 errorToCaller = DiskHashError
688 if errorToCaller == DiskHashError {
689 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
694 return 0, errorToCaller
697 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
699 // PutBlock(ctx, block, hash)
700 // Stores the BLOCK (identified by the content id HASH) in Keep.
702 // The MD5 checksum of the block must be identical to the content id HASH.
703 // If not, an error is returned.
705 // PutBlock stores the BLOCK on the first Keep volume with free space.
706 // A failure code is returned to the user only if all volumes fail.
708 // On success, PutBlock returns nil.
709 // On failure, it returns a KeepError with one of the following codes:
712 // A different block with the same hash already exists on this
715 // The MD5 hash of the BLOCK does not match the argument HASH.
717 // There was not enough space left in any Keep volume to store
720 // The object could not be stored for some other reason (e.g.
721 // all writes failed). The text of the error message should
722 // provide as much detail as possible.
724 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
725 // Check that BLOCK's checksum matches HASH.
726 blockhash := fmt.Sprintf("%x", md5.Sum(block))
727 if blockhash != hash {
728 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
729 return 0, RequestHashError
732 // If we already have this data, it's intact on disk, and we
733 // can update its timestamp, return success. If we have
734 // different data with the same hash, return failure.
735 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
737 } else if ctx.Err() != nil {
738 return 0, ErrClientDisconnect
741 // Choose a Keep volume to write to.
742 // If this volume fails, try all of the volumes in order.
743 if vol := KeepVM.NextWritable(); vol != nil {
744 if err := vol.Put(ctx, hash, block); err == nil {
745 return vol.Replication(), nil // success!
747 if ctx.Err() != nil {
748 return 0, ErrClientDisconnect
752 writables := KeepVM.AllWritable()
753 if len(writables) == 0 {
754 log.Print("No writable volumes.")
759 for _, vol := range writables {
760 err := vol.Put(ctx, hash, block)
761 if ctx.Err() != nil {
762 return 0, ErrClientDisconnect
765 return vol.Replication(), nil // success!
767 if err != FullError {
768 // The volume is not full but the
769 // write did not succeed. Report the
770 // error and continue trying.
772 log.Printf("%s: Write(%s): %s", vol, hash, err)
777 log.Print("All volumes are full.")
780 // Already logged the non-full errors.
781 return 0, GenericError
784 // CompareAndTouch returns the current replication level if one of the
785 // volumes already has the given content and it successfully updates
786 // the relevant block's modification time in order to protect it from
787 // premature garbage collection. Otherwise, it returns a non-nil
789 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
790 var bestErr error = NotFoundError
791 for _, vol := range KeepVM.AllWritable() {
792 err := vol.Compare(ctx, hash, buf)
793 if ctx.Err() != nil {
795 } else if err == CollisionError {
796 // Stop if we have a block with same hash but
797 // different content. (It will be impossible
798 // to tell which one is wanted if we have
799 // both, so there's no point writing it even
800 // on a different volume.)
801 log.Printf("%s: Compare(%s): %s", vol, hash, err)
803 } else if os.IsNotExist(err) {
804 // Block does not exist. This is the only
805 // "normal" error: we don't log anything.
807 } else if err != nil {
808 // Couldn't open file, data is corrupt on
809 // disk, etc.: log this abnormal condition,
810 // and try the next volume.
811 log.Printf("%s: Compare(%s): %s", vol, hash, err)
814 if err := vol.Touch(hash); err != nil {
815 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
819 // Compare and Touch both worked --> done.
820 return vol.Replication(), nil
825 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
827 // IsValidLocator returns true if the specified string is a valid Keep locator.
828 // When Keep is extended to support hash types other than MD5,
829 // this should be updated to cover those as well.
831 func IsValidLocator(loc string) bool {
832 return validLocatorRe.MatchString(loc)
835 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
837 // GetAPIToken returns the OAuth2 token from the Authorization
838 // header of a HTTP request, or an empty string if no matching
840 func GetAPIToken(req *http.Request) string {
841 if auth, ok := req.Header["Authorization"]; ok {
842 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
849 // IsExpired returns true if the given Unix timestamp (expressed as a
850 // hexadecimal string) is in the past, or if timestampHex cannot be
851 // parsed as a hexadecimal string.
852 func IsExpired(timestampHex string) bool {
853 ts, err := strconv.ParseInt(timestampHex, 16, 0)
855 log.Printf("IsExpired: %s", err)
858 return time.Unix(ts, 0).Before(time.Now())
861 // CanDelete returns true if the user identified by apiToken is
862 // allowed to delete blocks.
863 func CanDelete(apiToken string) bool {
867 // Blocks may be deleted only when Keep has been configured with a
869 if IsSystemAuth(apiToken) {
872 // TODO(twp): look up apiToken with the API server
873 // return true if is_admin is true and if the token
874 // has unlimited scope
878 // IsSystemAuth returns true if the given token is allowed to perform
879 // system level actions like deleting data.
880 func IsSystemAuth(token string) bool {
881 return token != "" && token == theConfig.systemAuthToken