1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "git.curoverse.com/arvados.git/sdk/go/health"
25 "git.curoverse.com/arvados.git/sdk/go/httpserver"
26 "github.com/gorilla/mux"
27 "github.com/prometheus/client_golang/prometheus"
32 limiter httpserver.RequestCounter
33 cluster *arvados.Cluster
34 remoteProxy remoteProxy
35 registry *prometheus.Registry
39 // MakeRESTRouter returns a new router that forwards all Keep requests
40 // to the appropriate handlers.
// NOTE(review): this extract is a numbered partial listing; original
// lines 42, 44, 46-48, 50, 53, 61, 64, 67, 72, 75, 78, 81, 84-86, 88,
// 90, 93-97, 101 are missing here (struct-literal fields, HandleFunc
// call openers, blank separators, and the health.Handler fields).
// Comments below describe only what the visible lines show.
41 func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
// Build the router with a gorilla/mux Router and a fresh prometheus
// registry (the opening of this struct literal is not visible in this
// extract — presumably `rtr := &router{...}`; TODO confirm).
43 Router: mux.NewRouter(),
45 registry: prometheus.NewRegistry(),
// GET/HEAD a block by bare 32-hex-digit hash, or by hash plus "+hints".
49 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
51 `/{hash:[0-9a-f]{32}}+{hints}`,
52 rtr.handleGET).Methods("GET", "HEAD")
// Store and delete blocks by hash.
54 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
55 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56 // List all blocks stored here. Privileged client only.
57 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58 // List blocks stored here whose hash has the given prefix.
59 // Privileged client only.
60 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
62 // Internals/debugging info (runtime.MemStats)
63 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
65 // List volumes: path, device number, bytes used/avail.
66 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
68 // List mounts: UUID, readonly, tier, device ID, ...
69 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
70 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
73 // Replace the current pull queue.
74 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
76 // Replace the current trash queue.
77 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
79 // Untrash moves blocks from trash back into store
80 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
// Health-check endpoint, gated by the management token.
82 rtr.Handle("/_health/{check}", &health.Handler{
83 Token: theConfig.ManagementToken,
87 // Any request which does not match any of these routes gets
89 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Wrap the mux in a request-count limiter, then instrument with
// prometheus metrics, request IDs, and request logging. The final
// handler restricts management endpoints to the management token.
91 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
92 rtr.metrics = nodeMetrics{
98 instrumented := httpserver.Instrument(rtr.registry, nil,
99 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
100 return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
103 // BadRequestHandler is a HandleFunc to address bad requests.
104 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
105 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD block requests: verify the locator
// signature if required, fetch the block into a pooled buffer, and
// write it to the client.
// NOTE(review): original lines 110-111, 115-117, 122-125, 131-132,
// 134, 136-139, 141, 144-145, 147-149 are missing from this extract
// (including `defer cancel()`, early returns after http.Error, and
// closing braces). Comments describe only the visible lines.
108 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
// Cancel downstream work if the client disconnects.
109 ctx, cancel := contextForResponse(context.TODO(), resp)
// A locator with a remote hint (+R) but no local signature (+A) is
// proxied to the remote cluster instead of served locally.
112 locator := req.URL.Path[1:]
113 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
114 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
// Reject unsigned/invalidly-signed locators when signatures are required.
118 if theConfig.RequireSignatures {
119 locator := req.URL.Path[1:] // strip leading slash
120 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
121 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126 // TODO: Probe volumes to check whether the block _might_
127 // exist. Some volumes/types could support a quick existence
128 // check without causing other operations to suffer. If all
129 // volumes support that, and assure us the block definitely
130 // isn't here, we can return 404 now instead of waiting for a
// Acquire a full-size buffer from the pool; 503 if the client gives
// up before one is available.
133 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
135 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// Read the block; on error, map a *KeepError to its HTTP code,
// otherwise report 500.
140 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
142 code := http.StatusInternalServerError
143 if err, ok := err.(*KeepError); ok {
146 http.Error(resp, err.Error(), code)
// Success: send the block as an octet-stream with explicit length.
150 resp.Header().Set("Content-Length", strconv.Itoa(size))
151 resp.Header().Set("Content-Type", "application/octet-stream")
152 resp.Write(buf[:size])
155 // Return a new context that gets cancelled by resp's CloseNotifier.
// NOTE(review): original lines 160-161, 163-170 are missing from this
// extract (the goroutine body selecting on the CloseNotify channel,
// the cancel call, and the final return). http.CloseNotifier is
// deprecated in modern Go in favor of Request.Context — worth
// modernizing when the full file is available.
156 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
157 ctx, cancel := context.WithCancel(parent)
// Only wire up cancellation if the ResponseWriter supports
// CloseNotifier; otherwise the caller just gets a plain cancellable
// context.
158 if cn, ok := resp.(http.CloseNotifier); ok {
159 go func(c <-chan bool) {
162 theConfig.debugLogf("cancel context")
171 // Get a buffer from the pool -- but give up and return a non-nil
172 // error if ctx ends before we get a buffer.
// NOTE(review): original lines 175, 177-178, 180-182, 186-187,
// 189-190 are missing from this extract (the goroutine wrapper around
// bufs.Get, the select statement, the success return, and the
// goroutine that returns the late buffer to the pool).
173 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
// Unbuffered channel: the fetching goroutine blocks until either we
// receive the buffer or (after ctx is done) a drainer receives it.
174 bufReady := make(chan []byte)
176 bufReady <- bufs.Get(bufSize)
179 case buf := <-bufReady:
183 // Even if closeNotifier happened first, we
184 // need to keep waiting for our buf so we can
185 // return it to the pool.
// Client went away before a buffer was available.
188 return nil, ErrClientDisconnect
// handlePUT stores the request body as a block identified by the
// {hash} path variable, then replies with a (possibly signed) locator
// and the achieved replication level.
// NOTE(review): original lines 194-195, 201, 204-206, 209-211,
// 214-216, 218, 220-222, 224, 226-229, 231-233, 236-237, 239-241,
// 249 are missing from this extract (defer cancel, early returns
// after each http.Error, buffer release, and closing braces).
192 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
193 ctx, cancel := contextForResponse(context.TODO(), resp)
196 hash := mux.Vars(req)["hash"]
198 // Detect as many error conditions as possible before reading
199 // the body: avoid transmitting data that will not end up
200 // being written anyway.
// No Content-Length header: reject, we must know the size up front.
202 if req.ContentLength == -1 {
203 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
// Larger than the maximum Keep block size: reject.
207 if req.ContentLength > BlockSize {
208 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
// No writable volumes at all: report full.
212 if len(KeepVM.AllWritable()) == 0 {
213 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Get an exactly-sized buffer; 503 if the client disconnects first.
217 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
219 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
223 _, err = io.ReadFull(req.Body, buf)
225 http.Error(resp, err.Error(), 500)
// Verify the hash and write the block; on error, map *KeepError to
// its HTTP code, otherwise 500.
230 replication, err := PutBlock(ctx, buf, hash)
234 code := http.StatusInternalServerError
235 if err, ok := err.(*KeepError); ok {
238 http.Error(resp, err.Error(), code)
242 // Success; add a size hint, sign the locator if possible, and
243 // return it to the client.
244 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
245 apiToken := GetAPIToken(req)
246 if theConfig.blobSigningKey != nil && apiToken != "" {
247 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
248 returnHash = SignLocator(returnHash, apiToken, expiry)
250 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
251 resp.Write([]byte(returnHash + "\n"))
254 // IndexHandler responds to "/index", "/index/{prefix}", and
255 // "/mounts/{uuid}/blocks" requests.
// NOTE(review): original lines 259-261, 263-264, 266-267, 269-271,
// 275-279, 289-291 are missing from this extract (early return after
// the auth error, form parsing, the uuid==""/else branches, and
// closing braces).
256 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Privileged (system-token) clients only.
257 if !IsSystemAuth(GetAPIToken(req)) {
258 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// Prefix may come from the URL path or from a form value.
262 prefix := mux.Vars(req)["prefix"]
265 prefix = req.Form.Get("prefix")
// Index either every readable volume, or just the one mount named by
// {uuid}; 404 if the named mount does not exist.
268 uuid := mux.Vars(req)["uuid"]
272 vols = KeepVM.AllReadable()
273 } else if v := KeepVM.Lookup(uuid, false); v == nil {
274 http.Error(resp, "mount not found", http.StatusNotFound)
280 for _, v := range vols {
281 if err := v.IndexTo(prefix, resp); err != nil {
282 // The only errors returned by IndexTo are
283 // write errors returned by resp.Write(),
284 // which probably means the client has
285 // disconnected and this error will never be
286 // reported to the client -- but it will
287 // appear in our own error log.
288 http.Error(resp, err.Error(), http.StatusInternalServerError)
292 // An empty line at EOF is the only way the client can be
293 // assured the entire index was received.
294 resp.Write([]byte{'\n'})
297 // MountsHandler responds to "GET /mounts" requests.
// Encodes the volume manager's mount list as JSON directly onto the
// response; a failed encode is reported as 500 (though by then part
// of the body may already have been written).
// NOTE(review): original lines 300, 302-303 are missing from this
// extract (the `if err != nil {` opener and closing braces).
298 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
299 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
301 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer-pool usage for /status.json.
// NOTE(review): the closing brace (original line 310) is missing from
// this extract.
306 type PoolStatus struct {
// Cumulative bytes allocated by the pool since startup.
307 Alloc uint64 `json:"BytesAllocatedCumulative"`
// Maximum number of buffers the pool will hand out.
308 Cap int `json:"BuffersMax"`
// Number of buffers currently checked out.
309 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one volume's entry in NodeStatus.Volumes.
// NOTE(review): original lines 313 (presumably a Label/volume-name
// field — TODO confirm) and the closing brace are missing from this
// extract.
312 type volumeStatusEnt struct {
313 Status *VolumeStatus `json:",omitempty"`
315 VolumeStats *ioStats `json:",omitempty"`
// Driver-specific counters from volumes implementing InternalStatser.
316 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the response body for /status.json.
// NOTE(review): original lines 325-330 are missing from this extract
// — readNodeStatus also sets st.RequestsCurrent and st.RequestsMax,
// so those fields (and the closing brace) are presumably declared in
// the missing lines; TODO confirm.
320 type NodeStatus struct {
321 Volumes []*volumeStatusEnt
322 BufferPool PoolStatus
323 PullQueue WorkQueueStatus
324 TrashQueue WorkQueueStatus
// stLock guards the shared status struct while StatusHandler fills
// and serializes it (presumably — the lock/unlock calls are in lines
// missing from this extract; TODO confirm).
331 var stLock sync.Mutex
333 // DebugHandler addresses /debug.json requests.
// Returns runtime memory statistics as JSON.
// NOTE(review): original lines 337-338, 341, 343-344 are missing from
// this extract (the struct closer, `var ds debugStats`, the
// `if err != nil {` opener, and closing braces).
334 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
335 type debugStats struct {
336 MemStats runtime.MemStats
339 runtime.ReadMemStats(&ds.MemStats)
340 err := json.NewEncoder(resp).Encode(&ds)
342 http.Error(resp, err.Error(), 500)
346 // StatusHandler addresses /status.json requests.
// Populates a NodeStatus (volumes, buffer pool, work queues, request
// counts) and returns it as JSON; on marshal failure it logs both the
// error and the struct, then replies 500.
// NOTE(review): original lines 348, 351-354, 358-359 are missing from
// this extract (the `var st NodeStatus` / locking around it, the
// success resp.Write, and closing braces).
347 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
349 rtr.readNodeStatus(&st)
350 jstat, err := json.Marshal(&st)
355 log.Printf("json.Marshal: %s", err)
356 log.Printf("NodeStatus = %v", &st)
357 http.Error(resp, err.Error(), 500)
361 // populate the given NodeStatus struct with current values.
// NOTE(review): original lines 363, 367, 373, 375, 379-380, 389-390
// are missing from this extract (the `if`/`}` around the capacity
// check, the struct-literal Label field, and closing braces).
362 func (rtr *router) readNodeStatus(st *NodeStatus) {
364 vols := KeepVM.AllReadable()
// Reuse st.Volumes' backing array when it is big enough; otherwise
// allocate one sized for the current volume count.
365 if cap(st.Volumes) < len(vols) {
366 st.Volumes = make([]*volumeStatusEnt, len(vols))
368 st.Volumes = st.Volumes[:0]
369 for _, vol := range vols {
// Collect driver-specific stats only from volumes that opt in via
// the InternalStatser interface.
370 var internalStats interface{}
371 if vol, ok := vol.(InternalStatser); ok {
372 internalStats = vol.InternalStats()
374 st.Volumes = append(st.Volumes, &volumeStatusEnt{
376 Status: vol.Status(),
377 InternalStats: internalStats,
378 //VolumeStats: KeepVM.VolumeStats(vol),
// Buffer pool and work-queue snapshots.
381 st.BufferPool.Alloc = bufs.Alloc()
382 st.BufferPool.Cap = bufs.Cap()
383 st.BufferPool.Len = bufs.Len()
384 st.PullQueue = getWorkQueueStatus(pullq)
385 st.TrashQueue = getWorkQueueStatus(trashq)
// limiter is nil in some test configurations; skip request counters then.
386 if rtr.limiter != nil {
387 st.RequestsCurrent = rtr.limiter.Current()
388 st.RequestsMax = rtr.limiter.Max()
392 // return a WorkQueueStatus for the given queue. If q is nil (which
393 // should never happen except in test suites), return a zero status
394 // value instead of crashing.
// NOTE(review): original lines 396, 399-401 are missing from this
// extract (the `if q == nil {` opener, the non-nil return of
// q.Status(), and closing braces).
395 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
397 // This should only happen during tests.
398 return WorkQueueStatus{}
403 // DeleteHandler processes DELETE requests.
405 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
406 // from all connected volumes.
408 // Only the Data Manager, or an Arvados admin with scope "all", are
409 // allowed to issue DELETE requests. If a DELETE request is not
410 // authenticated or is issued by a non-admin user, the server returns
411 // a PermissionError.
413 // Upon receiving a valid request from an authorized user,
414 // DeleteHandler deletes all copies of the specified block on local
419 // If the requested blocks was not found on any volume, the response
420 // code is HTTP 404 Not Found.
422 // Otherwise, the response code is 200 OK, with a response body
423 // consisting of the JSON message
425 // {"copies_deleted":d,"copies_failed":f}
427 // where d and f are integers representing the number of blocks that
428 // were successfully and unsuccessfully deleted.
// NOTE(review): original lines 432, 437-439, 442-444, 448, 451, 454,
// 456-458, 460-464, 467-472, 475-476, 479-481 are missing from this
// extract (early returns, the result-struct declaration opener,
// Deleted/Failed increments, the status-code selection, the success
// resp.Write, and closing braces).
430 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
431 hash := mux.Vars(req)["hash"]
433 // Confirm that this user is an admin and has a token with unlimited scope.
434 var tok = GetAPIToken(req)
435 if tok == "" || !CanDelete(tok) {
436 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// DELETE is an opt-in feature; refuse when disabled in config.
440 if !theConfig.EnableDelete {
441 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
445 // Delete copies of this block from all available volumes.
446 // Report how many blocks were successfully deleted, and how
447 // many were found on writable volumes but not deleted.
449 Deleted int `json:"copies_deleted"`
450 Failed int `json:"copies_failed"`
// Trash (rather than hard-delete) on every writable volume;
// os.IsNotExist is the expected "not here" case.
452 for _, vol := range KeepVM.AllWritable() {
453 if err := vol.Trash(hash); err == nil {
455 } else if os.IsNotExist(err) {
459 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means the block was not found
// anywhere: 404.
465 if result.Deleted == 0 && result.Failed == 0 {
466 st = http.StatusNotFound
473 if st == http.StatusOK {
474 if body, err := json.Marshal(result); err == nil {
477 log.Printf("json.Marshal: %s (result = %v)", err, result)
478 http.Error(resp, err.Error(), 500)
483 /* PullHandler processes "PUT /pull" requests for the data manager.
484 The request body is a JSON message containing a list of pull
485 requests in the following format:
489 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
491 "keep0.qr1hi.arvadosapi.com:25107",
492 "keep1.qr1hi.arvadosapi.com:25108"
496 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
506 Each pull request in the list consists of a block locator string
507 and an ordered list of servers. Keepstore should try to fetch the
508 block from each server in turn.
510 If the request has not been sent by the Data Manager, return 401
513 If the JSON unmarshalling fails, return 400 Bad Request.
// NOTE(review): much of the example JSON in the comment above
// (original lines 486-488, 490, 493-495, 497-505, 509, 511-512,
// 514-515) and the struct's closing brace are missing from this
// extract.
516 // PullRequest consists of a block locator and an ordered list of servers
517 type PullRequest struct {
518 Locator string `json:"locator"`
519 Servers []string `json:"servers"`
521 // Destination mount, or "" for "anywhere"
522 MountUUID string `json:"mount_uuid"`
525 // PullHandler processes "PUT /pull" requests for the data manager.
// Replaces the entire pull queue with the list in the request body.
// NOTE(review): original lines 530-532, 534, 538-540, 545, 547-548,
// 550-551, 553 are missing from this extract (early returns, the
// `var pr []PullRequest` declaration, the resp.Write wrapper around
// the Sprintf, building plist from pr, and closing braces).
526 func PullHandler(resp http.ResponseWriter, req *http.Request) {
527 // Reject unauthorized requests.
528 if !IsSystemAuth(GetAPIToken(req)) {
529 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
533 // Parse the request body.
535 r := json.NewDecoder(req.Body)
536 if err := r.Decode(&pr); err != nil {
537 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
541 // We have a properly formatted pull list sent from the data
542 // manager. Report success and send the list to the pull list
543 // manager for further handling.
544 resp.WriteHeader(http.StatusOK)
546 fmt.Sprintf("Received %d pull requests\n", len(pr))))
549 for _, p := range pr {
// Atomically swap in the new queue contents.
552 pullq.ReplaceQueue(plist)
555 // TrashRequest consists of a block locator and its Mtime
// NOTE(review): original line 559 and the closing brace (562) are
// missing from this extract.
556 type TrashRequest struct {
557 Locator string `json:"locator"`
// Unix timestamp of the copy to trash; only blocks with this mtime
// are affected (presumably — the consuming code is not visible in
// this extract; TODO confirm).
558 BlockMtime int64 `json:"block_mtime"`
560 // Target mount, or "" for "everywhere"
561 MountUUID string `json:"mount_uuid"`
564 // TrashHandler processes /trash requests.
// Replaces the entire trash queue with the list in the request body.
// Mirrors PullHandler: system-auth only, 400 on bad JSON, 200 plus a
// count message on success.
// NOTE(review): original lines 569-571, 577-579, 584, 586-587,
// 589-590, 592 are missing from this extract (early returns, the
// resp.Write wrapper, building tlist from trash, and closing braces).
565 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
566 // Reject unauthorized requests.
567 if !IsSystemAuth(GetAPIToken(req)) {
568 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
572 // Parse the request body.
573 var trash []TrashRequest
574 r := json.NewDecoder(req.Body)
575 if err := r.Decode(&trash); err != nil {
576 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
580 // We have a properly formatted trash list sent from the data
581 // manager. Report success and send the list to the trash work
582 // queue for further handling.
583 resp.WriteHeader(http.StatusOK)
585 fmt.Sprintf("Received %d trash requests\n", len(trash))))
588 for _, t := range trash {
591 trashq.ReplaceQueue(tlist)
594 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
// Attempts Untrash on every writable volume and reports, per volume,
// whether the block was restored, missing, or failed.
// NOTE(review): original lines 599-601, 603, 606-608, 610, 613, 615,
// 619, 622-624, 627-629, 632, 636, 638-639 are missing from this
// extract (early returns, numNotFound accounting, the else branch
// markers, and closing braces).
595 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
596 // Reject unauthorized requests.
597 if !IsSystemAuth(GetAPIToken(req)) {
598 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
602 hash := mux.Vars(req)["hash"]
604 if len(KeepVM.AllWritable()) == 0 {
605 http.Error(resp, "No writable volumes", http.StatusNotFound)
609 var untrashedOn, failedOn []string
611 for _, vol := range KeepVM.AllWritable() {
612 err := vol.Untrash(hash)
// Not-exist is the "block wasn't in this volume's trash" case; it is
// counted (numNotFound, incremented in a missing line) rather than
// treated as a failure.
614 if os.IsNotExist(err) {
616 } else if err != nil {
617 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
618 failedOn = append(failedOn, vol.String())
620 log.Printf("Untrashed %v on volume %v", hash, vol.String())
621 untrashedOn = append(untrashedOn, vol.String())
// All volumes reported not-found: 404.
625 if numNotFound == len(KeepVM.AllWritable()) {
626 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
// Every volume failed outright: 500. Otherwise report the mixed
// per-volume outcome in the body.
630 if len(failedOn) == len(KeepVM.AllWritable()) {
631 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
633 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
634 if len(failedOn) > 0 {
635 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
637 resp.Write([]byte(respBody))
641 // GetBlock and PutBlock implement lower-level code for handling
642 // blocks by rooting through volumes connected to the local machine.
643 // Once the handler has determined that system policy permits the
644 // request, it calls these methods to perform the actual operation.
646 // TODO(twp): this code would probably be better located in the
647 // VolumeManager interface. As an abstraction, the VolumeManager
648 // should be the only part of the code that cares about which volume a
649 // block is stored on, so it should be responsible for figuring out
650 // which volume to check for fetching blocks, storing blocks, etc.
652 // GetBlock fetches the block identified by "hash" into the provided
653 // buf, and returns the data size.
655 // If the block cannot be found on any volume, returns NotFoundError.
657 // If the block found does not have the correct MD5 hash, returns
// NOTE(review): original lines 658-659, 663, 666-667, 669-671,
// 679-681, 683, 687, 689, 691-692, 695-698, 700 are missing from this
// extract (the context-cancellation check opener, the error-handling
// continue, the success return of size, the error-message arguments,
// and closing braces).
660 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
661 // Attempt to read the requested hash from a keep volume.
662 errorToCaller := NotFoundError
664 for _, vol := range KeepVM.AllReadable() {
665 size, err := vol.Get(ctx, hash, buf)
// Client disconnect aborts the whole scan.
668 return 0, ErrClientDisconnect
672 // IsNotExist is an expected error and may be
673 // ignored. All other errors are logged. In
674 // any case we continue trying to read other
675 // volumes. If all volumes report IsNotExist,
676 // we return a NotFoundError.
677 if !os.IsNotExist(err) {
678 log.Printf("%s: Get(%s): %s", vol, hash, err)
682 // Check the file checksum.
684 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
685 if filehash != hash {
686 // TODO: Try harder to tell a sysadmin about
// A corrupt copy on one volume is remembered as DiskHashError but
// the scan continues, in case another volume has a good copy.
688 log.Printf("%s: checksum mismatch for request %s (actual %s)",
690 errorToCaller = DiskHashError
693 if errorToCaller == DiskHashError {
694 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
699 return 0, errorToCaller
702 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
704 // PutBlock(ctx, block, hash)
705 // Stores the BLOCK (identified by the content id HASH) in Keep.
707 // The MD5 checksum of the block must be identical to the content id HASH.
708 // If not, an error is returned.
710 // PutBlock stores the BLOCK on the first Keep volume with free space.
711 // A failure code is returned to the user only if all volumes fail.
713 // On success, PutBlock returns nil.
714 // On failure, it returns a KeepError with one of the following codes:
717 // A different block with the same hash already exists on this
720 // The MD5 hash of the BLOCK does not match the argument HASH.
722 // There was not enough space left in any Keep volume to store
725 // The object could not be stored for some other reason (e.g.
726 // all writes failed). The text of the error message should
727 // provide as much detail as possible.
// NOTE(review): the error-code names in the comment above (original
// lines 715-716, 718-719, 721, 723-724, 728 — presumably 500
// Collision, 400 MD5Fail, 503 Full, 500 Fail; TODO confirm) and much
// of the body (735-736, 741, 744-745, 751, 754-756, 760-763, 768-769,
// 771, 776, 778-781, 783-784, 787) are missing from this extract.
// Note the doc comment says "returns nil" on success but the
// signature returns (int, error) — the int is the replication level,
// per the visible returns; the comment predates that change.
729 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
730 // Check that BLOCK's checksum matches HASH.
731 blockhash := fmt.Sprintf("%x", md5.Sum(block))
732 if blockhash != hash {
733 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
734 return 0, RequestHashError
737 // If we already have this data, it's intact on disk, and we
738 // can update its timestamp, return success. If we have
739 // different data with the same hash, return failure.
740 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
742 } else if ctx.Err() != nil {
743 return 0, ErrClientDisconnect
746 // Choose a Keep volume to write to.
747 // If this volume fails, try all of the volumes in order.
748 if vol := KeepVM.NextWritable(); vol != nil {
749 if err := vol.Put(ctx, hash, block); err == nil {
750 return vol.Replication(), nil // success!
752 if ctx.Err() != nil {
753 return 0, ErrClientDisconnect
// First-choice volume failed: fall back to trying every writable
// volume in order.
757 writables := KeepVM.AllWritable()
758 if len(writables) == 0 {
759 log.Print("No writable volumes.")
764 for _, vol := range writables {
765 err := vol.Put(ctx, hash, block)
766 if ctx.Err() != nil {
767 return 0, ErrClientDisconnect
770 return vol.Replication(), nil // success!
772 if err != FullError {
773 // The volume is not full but the
774 // write did not succeed. Report the
775 // error and continue trying.
777 log.Printf("%s: Write(%s): %s", vol, hash, err)
// All volumes were full, or all failed for other (already logged)
// reasons.
782 log.Print("All volumes are full.")
785 // Already logged the non-full errors.
786 return 0, GenericError
789 // CompareAndTouch returns the current replication level if one of the
790 // volumes already has the given content and it successfully updates
791 // the relevant block's modification time in order to protect it from
792 // premature garbage collection. Otherwise, it returns a non-nil
// NOTE(review): original lines 793, 799, 807, 811, 817-818, 821-823,
// 826-828 are missing from this extract (the end of this doc comment,
// the ctx-cancelled return, bestErr assignments, continue statements,
// the final return of bestErr, and closing braces).
794 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
795 var bestErr error = NotFoundError
796 for _, vol := range KeepVM.AllWritable() {
797 err := vol.Compare(ctx, hash, buf)
798 if ctx.Err() != nil {
800 } else if err == CollisionError {
801 // Stop if we have a block with same hash but
802 // different content. (It will be impossible
803 // to tell which one is wanted if we have
804 // both, so there's no point writing it even
805 // on a different volume.)
806 log.Printf("%s: Compare(%s): %s", vol, hash, err)
808 } else if os.IsNotExist(err) {
809 // Block does not exist. This is the only
810 // "normal" error: we don't log anything.
812 } else if err != nil {
813 // Couldn't open file, data is corrupt on
814 // disk, etc.: log this abnormal condition,
815 // and try the next volume.
816 log.Printf("%s: Compare(%s): %s", vol, hash, err)
// Content matches; refresh the mtime so the GC won't trash it. A
// failed Touch means this copy can't be protected, so keep looking.
819 if err := vol.Touch(hash); err != nil {
820 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
824 // Compare and Touch both worked --> done.
825 return vol.Replication(), nil
// validLocatorRe matches a bare 32-hex-digit (MD5) hash with no size
// or permission hints appended.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator. When Keep is extended to support hash types other than
// MD5, this should be updated to cover those as well.
func IsValidLocator(locator string) bool {
	return validLocatorRe.MatchString(locator)
}
// authRe matches "OAuth2 <token>" or "Bearer <token>" Authorization
// header values; submatch 2 is the token itself.
840 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
842 // GetAPIToken returns the OAuth2 token from the Authorization
843 // header of a HTTP request, or an empty string if no matching
// NOTE(review): original lines 844 (end of this doc comment), 848
// (presumably `return match[2]`; TODO confirm), and 849-852 (closing
// braces and the empty-string fallback return) are missing from this
// extract. Note only the first Authorization header value is checked.
845 func GetAPIToken(req *http.Request) string {
846 if auth, ok := req.Header["Authorization"]; ok {
847 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
854 // IsExpired returns true if the given Unix timestamp (expressed as a
855 // hexadecimal string) is in the past, or if timestampHex cannot be
856 // parsed as a hexadecimal string.
// NOTE(review): original lines 859, 861-862 are missing from this
// extract — presumably `if err != nil {` before the log line and
// `return true` / `}` after it, matching the doc comment; TODO
// confirm.
857 func IsExpired(timestampHex string) bool {
858 ts, err := strconv.ParseInt(timestampHex, 16, 0)
860 log.Printf("IsExpired: %s", err)
// Expired means strictly before now; an exactly-now timestamp is not
// expired.
863 return time.Unix(ts, 0).Before(time.Now())
866 // CanDelete returns true if the user identified by apiToken is
867 // allowed to delete blocks.
// NOTE(review): original lines 869-871, 873, 875-876, 880-881 are
// missing from this extract — including the empty-token early return,
// the `return true` inside the IsSystemAuth branch, and the final
// `return false`; TODO confirm. As visible, only the system auth
// token grants delete permission; the per-user admin check is still a
// TODO.
868 func CanDelete(apiToken string) bool {
872 // Blocks may be deleted only when Keep has been configured with a
874 if IsSystemAuth(apiToken) {
877 // TODO(twp): look up apiToken with the API server
878 // return true if is_admin is true and if the token
879 // has unlimited scope
883 // IsSystemAuth returns true if the given token is allowed to perform
884 // system level actions like deleting data.
// A token qualifies only if it is non-empty and exactly equals the
// configured system auth token. The empty-string guard prevents an
// unconfigured (empty) systemAuthToken from matching unauthenticated
// requests.
885 func IsSystemAuth(token string) bool {
886 return token != "" && token == theConfig.systemAuthToken