1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "git.curoverse.com/arvados.git/sdk/go/health"
25 "git.curoverse.com/arvados.git/sdk/go/httpserver"
26 "github.com/gorilla/mux"
31 limiter httpserver.RequestCounter
32 cluster *arvados.Cluster
33 remoteProxy remoteProxy
36 // MakeRESTRouter returns a new router that forwards all Keep requests
37 // to the appropriate handlers.
38 func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
40 Router: mux.NewRouter(),
45 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
47 `/{hash:[0-9a-f]{32}}+{hints}`,
48 rtr.handleGET).Methods("GET", "HEAD")
50 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
51 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
52 // List all blocks stored here. Privileged client only.
53 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
54 // List blocks stored here whose hash has the given prefix.
55 // Privileged client only.
56 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
58 // Internals/debugging info (runtime.MemStats)
59 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
61 // List volumes: path, device number, bytes used/avail.
62 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
64 // List mounts: UUID, readonly, tier, device ID, ...
65 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
66 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
67 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
69 // Replace the current pull queue.
70 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72 // Replace the current trash queue.
73 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
75 // Untrash moves blocks from trash back into store
76 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
78 rtr.Handle("/_health/{check}", &health.Handler{
79 Token: theConfig.ManagementToken,
83 // Any request which does not match any of these routes gets
85 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
87 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
89 instrumented := httpserver.Instrument(nil, nil,
90 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
91 return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
94 // BadRequestHandler is a HandleFunc to address bad requests.
95 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
96 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
99 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
100 ctx, cancel := contextForResponse(context.TODO(), resp)
103 locator := req.URL.Path[1:]
104 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
105 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
109 if theConfig.RequireSignatures {
110 locator := req.URL.Path[1:] // strip leading slash
111 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
112 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
117 // TODO: Probe volumes to check whether the block _might_
118 // exist. Some volumes/types could support a quick existence
119 // check without causing other operations to suffer. If all
120 // volumes support that, and assure us the block definitely
121 // isn't here, we can return 404 now instead of waiting for a
124 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
126 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
131 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
133 code := http.StatusInternalServerError
134 if err, ok := err.(*KeepError); ok {
137 http.Error(resp, err.Error(), code)
141 resp.Header().Set("Content-Length", strconv.Itoa(size))
142 resp.Header().Set("Content-Type", "application/octet-stream")
143 resp.Write(buf[:size])
146 // Return a new context that gets cancelled by resp's CloseNotifier.
147 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
148 ctx, cancel := context.WithCancel(parent)
149 if cn, ok := resp.(http.CloseNotifier); ok {
150 go func(c <-chan bool) {
153 theConfig.debugLogf("cancel context")
162 // Get a buffer from the pool -- but give up and return a non-nil
163 // error if ctx ends before we get a buffer.
164 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
165 bufReady := make(chan []byte)
167 bufReady <- bufs.Get(bufSize)
170 case buf := <-bufReady:
174 // Even if closeNotifier happened first, we
175 // need to keep waiting for our buf so we can
176 // return it to the pool.
179 return nil, ErrClientDisconnect
183 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
184 ctx, cancel := contextForResponse(context.TODO(), resp)
187 hash := mux.Vars(req)["hash"]
189 // Detect as many error conditions as possible before reading
190 // the body: avoid transmitting data that will not end up
191 // being written anyway.
193 if req.ContentLength == -1 {
194 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
198 if req.ContentLength > BlockSize {
199 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
203 if len(KeepVM.AllWritable()) == 0 {
204 http.Error(resp, FullError.Error(), FullError.HTTPCode)
208 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
210 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
214 _, err = io.ReadFull(req.Body, buf)
216 http.Error(resp, err.Error(), 500)
221 replication, err := PutBlock(ctx, buf, hash)
225 code := http.StatusInternalServerError
226 if err, ok := err.(*KeepError); ok {
229 http.Error(resp, err.Error(), code)
233 // Success; add a size hint, sign the locator if possible, and
234 // return it to the client.
235 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
236 apiToken := GetAPIToken(req)
237 if theConfig.blobSigningKey != nil && apiToken != "" {
238 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
239 returnHash = SignLocator(returnHash, apiToken, expiry)
241 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
242 resp.Write([]byte(returnHash + "\n"))
245 // IndexHandler responds to "/index", "/index/{prefix}", and
246 // "/mounts/{uuid}/blocks" requests.
247 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
248 if !IsSystemAuth(GetAPIToken(req)) {
249 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
253 prefix := mux.Vars(req)["prefix"]
256 prefix = req.Form.Get("prefix")
259 uuid := mux.Vars(req)["uuid"]
263 vols = KeepVM.AllReadable()
264 } else if v := KeepVM.Lookup(uuid, false); v == nil {
265 http.Error(resp, "mount not found", http.StatusNotFound)
271 for _, v := range vols {
272 if err := v.IndexTo(prefix, resp); err != nil {
273 // The only errors returned by IndexTo are
274 // write errors returned by resp.Write(),
275 // which probably means the client has
276 // disconnected and this error will never be
277 // reported to the client -- but it will
278 // appear in our own error log.
279 http.Error(resp, err.Error(), http.StatusInternalServerError)
283 // An empty line at EOF is the only way the client can be
284 // assured the entire index was received.
285 resp.Write([]byte{'\n'})
288 // MountsHandler responds to "GET /mounts" requests.
289 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
290 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
292 http.Error(resp, err.Error(), http.StatusInternalServerError)
297 type PoolStatus struct {
298 Alloc uint64 `json:"BytesAllocatedCumulative"`
299 Cap int `json:"BuffersMax"`
300 Len int `json:"BuffersInUse"`
303 type volumeStatusEnt struct {
305 Status *VolumeStatus `json:",omitempty"`
306 VolumeStats *ioStats `json:",omitempty"`
307 InternalStats interface{} `json:",omitempty"`
311 type NodeStatus struct {
312 Volumes []*volumeStatusEnt
313 BufferPool PoolStatus
314 PullQueue WorkQueueStatus
315 TrashQueue WorkQueueStatus
322 var stLock sync.Mutex
324 // DebugHandler addresses /debug.json requests.
325 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
326 type debugStats struct {
327 MemStats runtime.MemStats
330 runtime.ReadMemStats(&ds.MemStats)
331 err := json.NewEncoder(resp).Encode(&ds)
333 http.Error(resp, err.Error(), 500)
337 // StatusHandler addresses /status.json requests.
338 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
340 rtr.readNodeStatus(&st)
341 jstat, err := json.Marshal(&st)
346 log.Printf("json.Marshal: %s", err)
347 log.Printf("NodeStatus = %v", &st)
348 http.Error(resp, err.Error(), 500)
352 // populate the given NodeStatus struct with current values.
353 func (rtr *router) readNodeStatus(st *NodeStatus) {
355 vols := KeepVM.AllReadable()
356 if cap(st.Volumes) < len(vols) {
357 st.Volumes = make([]*volumeStatusEnt, len(vols))
359 st.Volumes = st.Volumes[:0]
360 for _, vol := range vols {
361 var internalStats interface{}
362 if vol, ok := vol.(InternalStatser); ok {
363 internalStats = vol.InternalStats()
365 st.Volumes = append(st.Volumes, &volumeStatusEnt{
367 Status: vol.Status(),
368 InternalStats: internalStats,
369 //VolumeStats: KeepVM.VolumeStats(vol),
372 st.BufferPool.Alloc = bufs.Alloc()
373 st.BufferPool.Cap = bufs.Cap()
374 st.BufferPool.Len = bufs.Len()
375 st.PullQueue = getWorkQueueStatus(pullq)
376 st.TrashQueue = getWorkQueueStatus(trashq)
377 if rtr.limiter != nil {
378 st.RequestsCurrent = rtr.limiter.Current()
379 st.RequestsMax = rtr.limiter.Max()
383 // return a WorkQueueStatus for the given queue. If q is nil (which
384 // should never happen except in test suites), return a zero status
385 // value instead of crashing.
386 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
388 // This should only happen during tests.
389 return WorkQueueStatus{}
394 // DeleteHandler processes DELETE requests.
396 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
397 // from all connected volumes.
399 // Only the Data Manager, or an Arvados admin with scope "all", are
400 // allowed to issue DELETE requests. If a DELETE request is not
401 // authenticated or is issued by a non-admin user, the server returns
402 // a PermissionError.
404 // Upon receiving a valid request from an authorized user,
405 // DeleteHandler deletes all copies of the specified block on local
410 // If the requested blocks was not found on any volume, the response
411 // code is HTTP 404 Not Found.
413 // Otherwise, the response code is 200 OK, with a response body
414 // consisting of the JSON message
416 // {"copies_deleted":d,"copies_failed":f}
418 // where d and f are integers representing the number of blocks that
419 // were successfully and unsuccessfully deleted.
421 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
422 hash := mux.Vars(req)["hash"]
424 // Confirm that this user is an admin and has a token with unlimited scope.
425 var tok = GetAPIToken(req)
426 if tok == "" || !CanDelete(tok) {
427 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
431 if !theConfig.EnableDelete {
432 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
436 // Delete copies of this block from all available volumes.
437 // Report how many blocks were successfully deleted, and how
438 // many were found on writable volumes but not deleted.
440 Deleted int `json:"copies_deleted"`
441 Failed int `json:"copies_failed"`
443 for _, vol := range KeepVM.AllWritable() {
444 if err := vol.Trash(hash); err == nil {
446 } else if os.IsNotExist(err) {
450 log.Println("DeleteHandler:", err)
456 if result.Deleted == 0 && result.Failed == 0 {
457 st = http.StatusNotFound
464 if st == http.StatusOK {
465 if body, err := json.Marshal(result); err == nil {
468 log.Printf("json.Marshal: %s (result = %v)", err, result)
469 http.Error(resp, err.Error(), 500)
474 /* PullHandler processes "PUT /pull" requests for the data manager.
475 The request body is a JSON message containing a list of pull
476 requests in the following format:
480 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
482 "keep0.qr1hi.arvadosapi.com:25107",
483 "keep1.qr1hi.arvadosapi.com:25108"
487 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
497 Each pull request in the list consists of a block locator string
498 and an ordered list of servers. Keepstore should try to fetch the
499 block from each server in turn.
501 If the request has not been sent by the Data Manager, return 401
504 If the JSON unmarshalling fails, return 400 Bad Request.
507 // PullRequest consists of a block locator and an ordered list of servers
508 type PullRequest struct {
509 Locator string `json:"locator"`
510 Servers []string `json:"servers"`
512 // Destination mount, or "" for "anywhere"
513 MountUUID string `json:"mount_uuid"`
516 // PullHandler processes "PUT /pull" requests for the data manager.
517 func PullHandler(resp http.ResponseWriter, req *http.Request) {
518 // Reject unauthorized requests.
519 if !IsSystemAuth(GetAPIToken(req)) {
520 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
524 // Parse the request body.
526 r := json.NewDecoder(req.Body)
527 if err := r.Decode(&pr); err != nil {
528 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
532 // We have a properly formatted pull list sent from the data
533 // manager. Report success and send the list to the pull list
534 // manager for further handling.
535 resp.WriteHeader(http.StatusOK)
537 fmt.Sprintf("Received %d pull requests\n", len(pr))))
540 for _, p := range pr {
543 pullq.ReplaceQueue(plist)
546 // TrashRequest consists of a block locator and its Mtime
547 type TrashRequest struct {
548 Locator string `json:"locator"`
549 BlockMtime int64 `json:"block_mtime"`
551 // Target mount, or "" for "everywhere"
552 MountUUID string `json:"mount_uuid"`
555 // TrashHandler processes /trash requests.
556 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
557 // Reject unauthorized requests.
558 if !IsSystemAuth(GetAPIToken(req)) {
559 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
563 // Parse the request body.
564 var trash []TrashRequest
565 r := json.NewDecoder(req.Body)
566 if err := r.Decode(&trash); err != nil {
567 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
571 // We have a properly formatted trash list sent from the data
572 // manager. Report success and send the list to the trash work
573 // queue for further handling.
574 resp.WriteHeader(http.StatusOK)
576 fmt.Sprintf("Received %d trash requests\n", len(trash))))
579 for _, t := range trash {
582 trashq.ReplaceQueue(tlist)
585 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
586 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
587 // Reject unauthorized requests.
588 if !IsSystemAuth(GetAPIToken(req)) {
589 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
593 hash := mux.Vars(req)["hash"]
595 if len(KeepVM.AllWritable()) == 0 {
596 http.Error(resp, "No writable volumes", http.StatusNotFound)
600 var untrashedOn, failedOn []string
602 for _, vol := range KeepVM.AllWritable() {
603 err := vol.Untrash(hash)
605 if os.IsNotExist(err) {
607 } else if err != nil {
608 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
609 failedOn = append(failedOn, vol.String())
611 log.Printf("Untrashed %v on volume %v", hash, vol.String())
612 untrashedOn = append(untrashedOn, vol.String())
616 if numNotFound == len(KeepVM.AllWritable()) {
617 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
621 if len(failedOn) == len(KeepVM.AllWritable()) {
622 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
624 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
625 if len(failedOn) > 0 {
626 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
628 resp.Write([]byte(respBody))
632 // GetBlock and PutBlock implement lower-level code for handling
633 // blocks by rooting through volumes connected to the local machine.
634 // Once the handler has determined that system policy permits the
635 // request, it calls these methods to perform the actual operation.
637 // TODO(twp): this code would probably be better located in the
638 // VolumeManager interface. As an abstraction, the VolumeManager
639 // should be the only part of the code that cares about which volume a
640 // block is stored on, so it should be responsible for figuring out
641 // which volume to check for fetching blocks, storing blocks, etc.
643 // GetBlock fetches the block identified by "hash" into the provided
644 // buf, and returns the data size.
646 // If the block cannot be found on any volume, returns NotFoundError.
648 // If the block found does not have the correct MD5 hash, returns
651 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
652 // Attempt to read the requested hash from a keep volume.
653 errorToCaller := NotFoundError
655 for _, vol := range KeepVM.AllReadable() {
656 size, err := vol.Get(ctx, hash, buf)
659 return 0, ErrClientDisconnect
663 // IsNotExist is an expected error and may be
664 // ignored. All other errors are logged. In
665 // any case we continue trying to read other
666 // volumes. If all volumes report IsNotExist,
667 // we return a NotFoundError.
668 if !os.IsNotExist(err) {
669 log.Printf("%s: Get(%s): %s", vol, hash, err)
671 // If some volume returns a transient error, return it to the caller
672 // instead of "Not found" so it can retry.
673 if err == VolumeBusyError {
674 errorToCaller = err.(*KeepError)
678 // Check the file checksum.
680 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
681 if filehash != hash {
682 // TODO: Try harder to tell a sysadmin about
684 log.Printf("%s: checksum mismatch for request %s (actual %s)",
686 errorToCaller = DiskHashError
689 if errorToCaller == DiskHashError {
690 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
695 return 0, errorToCaller
698 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
700 // PutBlock(ctx, block, hash)
701 // Stores the BLOCK (identified by the content id HASH) in Keep.
703 // The MD5 checksum of the block must be identical to the content id HASH.
704 // If not, an error is returned.
706 // PutBlock stores the BLOCK on the first Keep volume with free space.
707 // A failure code is returned to the user only if all volumes fail.
709 // On success, PutBlock returns nil.
710 // On failure, it returns a KeepError with one of the following codes:
713 // A different block with the same hash already exists on this
716 // The MD5 hash of the BLOCK does not match the argument HASH.
718 // There was not enough space left in any Keep volume to store
721 // The object could not be stored for some other reason (e.g.
722 // all writes failed). The text of the error message should
723 // provide as much detail as possible.
725 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
726 // Check that BLOCK's checksum matches HASH.
727 blockhash := fmt.Sprintf("%x", md5.Sum(block))
728 if blockhash != hash {
729 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
730 return 0, RequestHashError
733 // If we already have this data, it's intact on disk, and we
734 // can update its timestamp, return success. If we have
735 // different data with the same hash, return failure.
736 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
738 } else if ctx.Err() != nil {
739 return 0, ErrClientDisconnect
742 // Choose a Keep volume to write to.
743 // If this volume fails, try all of the volumes in order.
744 if vol := KeepVM.NextWritable(); vol != nil {
745 if err := vol.Put(ctx, hash, block); err == nil {
746 return vol.Replication(), nil // success!
748 if ctx.Err() != nil {
749 return 0, ErrClientDisconnect
753 writables := KeepVM.AllWritable()
754 if len(writables) == 0 {
755 log.Print("No writable volumes.")
760 for _, vol := range writables {
761 err := vol.Put(ctx, hash, block)
762 if ctx.Err() != nil {
763 return 0, ErrClientDisconnect
766 return vol.Replication(), nil // success!
768 if err != FullError {
769 // The volume is not full but the
770 // write did not succeed. Report the
771 // error and continue trying.
773 log.Printf("%s: Write(%s): %s", vol, hash, err)
778 log.Print("All volumes are full.")
781 // Already logged the non-full errors.
782 return 0, GenericError
785 // CompareAndTouch returns the current replication level if one of the
786 // volumes already has the given content and it successfully updates
787 // the relevant block's modification time in order to protect it from
788 // premature garbage collection. Otherwise, it returns a non-nil
790 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
791 var bestErr error = NotFoundError
792 for _, vol := range KeepVM.AllWritable() {
793 err := vol.Compare(ctx, hash, buf)
794 if ctx.Err() != nil {
796 } else if err == CollisionError {
797 // Stop if we have a block with same hash but
798 // different content. (It will be impossible
799 // to tell which one is wanted if we have
800 // both, so there's no point writing it even
801 // on a different volume.)
802 log.Printf("%s: Compare(%s): %s", vol, hash, err)
804 } else if os.IsNotExist(err) {
805 // Block does not exist. This is the only
806 // "normal" error: we don't log anything.
808 } else if err != nil {
809 // Couldn't open file, data is corrupt on
810 // disk, etc.: log this abnormal condition,
811 // and try the next volume.
812 log.Printf("%s: Compare(%s): %s", vol, hash, err)
815 if err := vol.Touch(hash); err != nil {
816 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
820 // Compare and Touch both worked --> done.
821 return vol.Replication(), nil
826 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
828 // IsValidLocator returns true if the specified string is a valid Keep locator.
829 // When Keep is extended to support hash types other than MD5,
830 // this should be updated to cover those as well.
832 func IsValidLocator(loc string) bool {
833 return validLocatorRe.MatchString(loc)
836 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
838 // GetAPIToken returns the OAuth2 token from the Authorization
839 // header of a HTTP request, or an empty string if no matching
841 func GetAPIToken(req *http.Request) string {
842 if auth, ok := req.Header["Authorization"]; ok {
843 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
850 // IsExpired returns true if the given Unix timestamp (expressed as a
851 // hexadecimal string) is in the past, or if timestampHex cannot be
852 // parsed as a hexadecimal string.
853 func IsExpired(timestampHex string) bool {
854 ts, err := strconv.ParseInt(timestampHex, 16, 0)
856 log.Printf("IsExpired: %s", err)
859 return time.Unix(ts, 0).Before(time.Now())
862 // CanDelete returns true if the user identified by apiToken is
863 // allowed to delete blocks.
864 func CanDelete(apiToken string) bool {
868 // Blocks may be deleted only when Keep has been configured with a
870 if IsSystemAuth(apiToken) {
873 // TODO(twp): look up apiToken with the API server
874 // return true if is_admin is true and if the token
875 // has unlimited scope
879 // IsSystemAuth returns true if the given token is allowed to perform
880 // system level actions like deleting data.
881 func IsSystemAuth(token string) bool {
882 return token != "" && token == theConfig.systemAuthToken