1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "git.curoverse.com/arvados.git/sdk/go/health"
25 "git.curoverse.com/arvados.git/sdk/go/httpserver"
26 "github.com/gorilla/mux"
27 "github.com/prometheus/client_golang/prometheus"
32 limiter httpserver.RequestCounter
33 cluster *arvados.Cluster
34 remoteProxy remoteProxy
38 // MakeRESTRouter returns a new router that forwards all Keep requests
39 // to the appropriate handlers.
40 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
// Build the router around a gorilla/mux Router and a prometheus metrics
// registry shared with the node metrics collector.
42 Router: mux.NewRouter(),
44 metrics: &nodeMetrics{reg: reg},
// GET/HEAD of a 32-hex-digit hash (optionally followed by "+hints")
// reads a block.
48 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
50 `/{hash:[0-9a-f]{32}}+{hints}`,
51 rtr.handleGET).Methods("GET", "HEAD")
// PUT stores a block; DELETE trashes it (privileged — see DeleteHandler).
53 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
54 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55 // List all blocks stored here. Privileged client only.
56 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57 // List blocks stored here whose hash has the given prefix.
58 // Privileged client only.
59 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61 // Internals/debugging info (runtime.MemStats)
62 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64 // List volumes: path, device number, bytes used/avail.
65 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67 // List mounts: UUID, readonly, tier, device ID, ...
68 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72 // Replace the current pull queue.
73 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75 // Replace the current trash queue.
76 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78 // Untrash moves blocks from trash back into store
79 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
// Health-check endpoint, guarded by the management token.
81 rtr.Handle("/_health/{check}", &health.Handler{
82 Token: theConfig.ManagementToken,
86 // Any request which does not match any of these routes gets
88 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Wrap the router in a concurrent-request limiter, register metrics for
// the buffer pool and the pull/trash work queues, then layer on
// instrumentation, request IDs, and request logging. The fully wrapped
// handler is returned via ServeAPI, which also enforces the management
// token on management endpoints.
90 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
91 rtr.metrics.setupBufferPoolMetrics(bufs)
92 rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
93 rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
94 rtr.metrics.setupRequestMetrics(rtr.limiter)
96 instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
97 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
98 return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
101 // BadRequestHandler is a HandleFunc to address bad requests.
102 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
// Respond with the canonical BadRequestError message and its HTTP code.
103 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD block requests: verify the locator (when
// signatures are required), fetch the block into a pooled buffer, and
// stream it back as application/octet-stream.
106 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
// Derive a context that is cancelled when the client disconnects.
107 ctx, cancel := contextForResponse(context.TODO(), resp)
// The locator is the URL path minus the leading slash.
110 locator := req.URL.Path[1:]
// A "+R" (remote) hint without a local "+A" signature marks a federated
// request: delegate to the remote-cluster proxy instead of local volumes.
111 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
112 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
// Enforce signed locators when configured to do so.
116 if theConfig.RequireSignatures {
117 locator := req.URL.Path[1:] // strip leading slash
118 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
119 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
124 // TODO: Probe volumes to check whether the block _might_
125 // exist. Some volumes/types could support a quick existence
126 // check without causing other operations to suffer. If all
127 // volumes support that, and assure us the block definitely
128 // isn't here, we can return 404 now instead of waiting for a
// Acquire a full-size scratch buffer; 503 if the client gives up first.
131 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
133 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// Read the block; on failure map a *KeepError to its HTTP code,
// otherwise respond 500.
138 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
140 code := http.StatusInternalServerError
141 if err, ok := err.(*KeepError); ok {
144 http.Error(resp, err.Error(), code)
// Success: send exactly `size` bytes of block data.
148 resp.Header().Set("Content-Length", strconv.Itoa(size))
149 resp.Header().Set("Content-Type", "application/octet-stream")
150 resp.Write(buf[:size])
153 // Return a new context that gets cancelled by resp's CloseNotifier.
154 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
155 ctx, cancel := context.WithCancel(parent)
// If the ResponseWriter supports CloseNotifier, watch for client
// disconnect in a goroutine and cancel the context when it fires.
// NOTE(review): http.CloseNotifier is deprecated in newer Go; the
// request's own context would be the modern replacement.
156 if cn, ok := resp.(http.CloseNotifier); ok {
157 go func(c <-chan bool) {
160 theConfig.debugLogf("cancel context")
169 // Get a buffer from the pool -- but give up and return a non-nil
170 // error if ctx ends before we get a buffer.
171 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
172 bufReady := make(chan []byte)
// Request the buffer in a goroutine so the select below can race the
// pool against ctx cancellation.
174 bufReady <- bufs.Get(bufSize)
// Buffer arrived before the context ended.
177 case buf := <-bufReady:
181 // Even if closeNotifier happened first, we
182 // need to keep waiting for our buf so we can
183 // return it to the pool.
// Context ended first: report the disconnect to the caller.
186 return nil, ErrClientDisconnect
// handlePUT stores the request body as the block named by the {hash}
// path variable, then responds with a size-hinted (and, when possible,
// signed) locator plus an X-Keep-Replicas-Stored header.
190 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
191 ctx, cancel := contextForResponse(context.TODO(), resp)
194 hash := mux.Vars(req)["hash"]
196 // Detect as many error conditions as possible before reading
197 // the body: avoid transmitting data that will not end up
198 // being written anyway.
// A Content-Length header is required...
200 if req.ContentLength == -1 {
201 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
// ...and must not exceed the maximum block size.
205 if req.ContentLength > BlockSize {
206 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
// Fail fast when there is no writable volume at all.
210 if len(KeepVM.AllWritable()) == 0 {
211 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Get a buffer sized to the body; 503 if the client disconnects first.
215 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
217 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// Read the entire body into the buffer.
221 _, err = io.ReadFull(req.Body, buf)
223 http.Error(resp, err.Error(), 500)
// Store the block; on failure map a *KeepError to its HTTP code,
// otherwise respond 500.
228 replication, err := PutBlock(ctx, buf, hash)
232 code := http.StatusInternalServerError
233 if err, ok := err.(*KeepError); ok {
236 http.Error(resp, err.Error(), code)
240 // Success; add a size hint, sign the locator if possible, and
241 // return it to the client.
242 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
243 apiToken := GetAPIToken(req)
244 if theConfig.blobSigningKey != nil && apiToken != "" {
245 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
246 returnHash = SignLocator(returnHash, apiToken, expiry)
248 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
249 resp.Write([]byte(returnHash + "\n"))
252 // IndexHandler responds to "/index", "/index/{prefix}", and
253 // "/mounts/{uuid}/blocks" requests.
254 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
// Only the privileged system token may list blocks.
255 if !IsSystemAuth(GetAPIToken(req)) {
256 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The prefix can arrive as a path variable or as a form parameter.
260 prefix := mux.Vars(req)["prefix"]
263 prefix = req.Form.Get("prefix")
// With no mount UUID, index all readable volumes; with one, look up that
// single mount and 404 if it is unknown.
266 uuid := mux.Vars(req)["uuid"]
270 vols = KeepVM.AllReadable()
271 } else if v := KeepVM.Lookup(uuid, false); v == nil {
272 http.Error(resp, "mount not found", http.StatusNotFound)
// Stream each volume's index directly into the response.
278 for _, v := range vols {
279 if err := v.IndexTo(prefix, resp); err != nil {
280 // We can't send an error message to the
281 // client because we might have already sent
282 // headers and index content. All we can do is
283 // log the error in our own logs, and (in
284 // cases where headers haven't been sent yet)
287 // If headers have already been sent, the
288 // client must notice the lack of trailing
289 // newline as an indication that the response
291 log.Printf("index error from volume %s: %s", v, err)
292 http.Error(resp, "", http.StatusInternalServerError)
296 // An empty line at EOF is the only way the client can be
297 // assured the entire index was received.
298 resp.Write([]byte{'\n'})
301 // MountsHandler responds to "GET /mounts" requests.
302 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
// Encode the mount list as JSON straight onto the response stream;
// report any encoding failure as a 500.
303 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
305 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage for /status.json.
310 type PoolStatus struct {
311 Alloc uint64 `json:"BytesAllocatedCumulative"`
312 Cap int `json:"BuffersMax"`
313 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one volume's entry in NodeStatus.Volumes.
316 type volumeStatusEnt struct {
318 Status *VolumeStatus `json:",omitempty"`
319 VolumeStats *ioStats `json:",omitempty"`
320 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the response body for /status.json, populated by
// (*router).readNodeStatus.
324 type NodeStatus struct {
325 Volumes []*volumeStatusEnt
326 BufferPool PoolStatus
327 PullQueue WorkQueueStatus
328 TrashQueue WorkQueueStatus
// stLock is a mutex for the node status snapshot.
// NOTE(review): the lock/unlock sites are not visible in this view —
// confirm it guards the shared NodeStatus used by StatusHandler.
335 var stLock sync.Mutex
337 // DebugHandler addresses /debug.json requests.
338 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
// Local wrapper type so the JSON output has a stable "MemStats" key.
339 type debugStats struct {
340 MemStats runtime.MemStats
// Snapshot the Go runtime's memory statistics and emit them as JSON;
// report any encoding failure as a 500.
343 runtime.ReadMemStats(&ds.MemStats)
344 err := json.NewEncoder(resp).Encode(&ds)
346 http.Error(resp, err.Error(), 500)
350 // StatusHandler addresses /status.json requests.
351 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
// Fill a NodeStatus snapshot and marshal it. On marshal failure, log
// both the error and the offending struct before responding 500 —
// the struct dump helps diagnose which field broke serialization.
353 rtr.readNodeStatus(&st)
354 jstat, err := json.Marshal(&st)
359 log.Printf("json.Marshal: %s", err)
360 log.Printf("NodeStatus = %v", &st)
361 http.Error(resp, err.Error(), 500)
365 // populate the given NodeStatus struct with current values.
366 func (rtr *router) readNodeStatus(st *NodeStatus) {
368 vols := KeepVM.AllReadable()
// Reuse st.Volumes' backing array when it is already big enough;
// otherwise allocate one sized to the current volume count.
369 if cap(st.Volumes) < len(vols) {
370 st.Volumes = make([]*volumeStatusEnt, len(vols))
372 st.Volumes = st.Volumes[:0]
373 for _, vol := range vols {
374 var internalStats interface{}
// Volumes implementing InternalStatser expose extra driver-level stats.
375 if vol, ok := vol.(InternalStatser); ok {
376 internalStats = vol.InternalStats()
378 st.Volumes = append(st.Volumes, &volumeStatusEnt{
380 Status: vol.Status(),
381 InternalStats: internalStats,
382 //VolumeStats: KeepVM.VolumeStats(vol),
// Buffer pool and work queue statistics.
385 st.BufferPool.Alloc = bufs.Alloc()
386 st.BufferPool.Cap = bufs.Cap()
387 st.BufferPool.Len = bufs.Len()
388 st.PullQueue = getWorkQueueStatus(pullq)
389 st.TrashQueue = getWorkQueueStatus(trashq)
// The limiter may be nil (e.g. in tests); skip its stats in that case.
390 if rtr.limiter != nil {
391 st.RequestsCurrent = rtr.limiter.Current()
392 st.RequestsMax = rtr.limiter.Max()
396 // return a WorkQueueStatus for the given queue. If q is nil (which
397 // should never happen except in test suites), return a zero status
398 // value instead of crashing.
399 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
401 // This should only happen during tests.
// Zero value: all counts report as 0.
402 return WorkQueueStatus{}
407 // DeleteHandler processes DELETE requests.
409 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
410 // from all connected volumes.
412 // Only the Data Manager, or an Arvados admin with scope "all", are
413 // allowed to issue DELETE requests. If a DELETE request is not
414 // authenticated or is issued by a non-admin user, the server returns
415 // a PermissionError.
417 // Upon receiving a valid request from an authorized user,
418 // DeleteHandler deletes all copies of the specified block on local
423 // If the requested blocks was not found on any volume, the response
424 // code is HTTP 404 Not Found.
426 // Otherwise, the response code is 200 OK, with a response body
427 // consisting of the JSON message
429 // {"copies_deleted":d,"copies_failed":f}
431 // where d and f are integers representing the number of blocks that
432 // were successfully and unsuccessfully deleted.
434 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
435 hash := mux.Vars(req)["hash"]
437 // Confirm that this user is an admin and has a token with unlimited scope.
438 var tok = GetAPIToken(req)
439 if tok == "" || !CanDelete(tok) {
440 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// DELETE support can be switched off entirely by configuration.
444 if !theConfig.EnableDelete {
445 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
449 // Delete copies of this block from all available volumes.
450 // Report how many blocks were successfully deleted, and how
451 // many were found on writable volumes but not deleted.
453 Deleted int `json:"copies_deleted"`
454 Failed int `json:"copies_failed"`
456 for _, vol := range KeepVM.AllWritable() {
457 if err := vol.Trash(hash); err == nil {
// "Not exist" on a volume is not an error; any other failure is
// counted and logged.
459 } else if os.IsNotExist(err) {
463 log.Println("DeleteHandler:", err)
// No copies found on any volume: respond 404.
469 if result.Deleted == 0 && result.Failed == 0 {
470 st = http.StatusNotFound
// On success, return the deleted/failed counts as JSON.
477 if st == http.StatusOK {
478 if body, err := json.Marshal(result); err == nil {
481 log.Printf("json.Marshal: %s (result = %v)", err, result)
482 http.Error(resp, err.Error(), 500)
487 /* PullHandler processes "PUT /pull" requests for the data manager.
488 The request body is a JSON message containing a list of pull
489 requests in the following format:
493 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
495 "keep0.qr1hi.arvadosapi.com:25107",
496 "keep1.qr1hi.arvadosapi.com:25108"
500 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
510 Each pull request in the list consists of a block locator string
511 and an ordered list of servers. Keepstore should try to fetch the
512 block from each server in turn.
514 If the request has not been sent by the Data Manager, return 401
517 If the JSON unmarshalling fails, return 400 Bad Request.
520 // PullRequest consists of a block locator and an ordered list of servers
// (the JSON shape of one entry in the "PUT /pull" request body).
521 type PullRequest struct {
522 Locator string `json:"locator"`
523 Servers []string `json:"servers"`
525 // Destination mount, or "" for "anywhere"
526 MountUUID string `json:"mount_uuid"`
529 // PullHandler processes "PUT /pull" requests for the data manager.
530 func PullHandler(resp http.ResponseWriter, req *http.Request) {
531 // Reject unauthorized requests.
532 if !IsSystemAuth(GetAPIToken(req)) {
533 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
537 // Parse the request body.
539 r := json.NewDecoder(req.Body)
540 if err := r.Decode(&pr); err != nil {
541 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
545 // We have a properly formatted pull list sent from the data
546 // manager. Report success and send the list to the pull list
547 // manager for further handling.
548 resp.WriteHeader(http.StatusOK)
550 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// Convert the decoded requests into a list and atomically replace the
// current pull queue with it.
553 for _, p := range pr {
556 pullq.ReplaceQueue(plist)
559 // TrashRequest consists of a block locator and its Mtime
// (the JSON shape of one entry in the "PUT /trash" request body).
560 type TrashRequest struct {
561 Locator string `json:"locator"`
562 BlockMtime int64 `json:"block_mtime"`
564 // Target mount, or "" for "everywhere"
565 MountUUID string `json:"mount_uuid"`
568 // TrashHandler processes /trash requests.
569 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
570 // Reject unauthorized requests.
571 if !IsSystemAuth(GetAPIToken(req)) {
572 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
576 // Parse the request body.
577 var trash []TrashRequest
578 r := json.NewDecoder(req.Body)
579 if err := r.Decode(&trash); err != nil {
580 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
584 // We have a properly formatted trash list sent from the data
585 // manager. Report success and send the list to the trash work
586 // queue for further handling.
587 resp.WriteHeader(http.StatusOK)
589 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// Convert the decoded requests into a list and atomically replace the
// current trash queue with it.
592 for _, t := range trash {
595 trashq.ReplaceQueue(tlist)
598 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
599 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
600 // Reject unauthorized requests.
601 if !IsSystemAuth(GetAPIToken(req)) {
602 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
606 hash := mux.Vars(req)["hash"]
// Nothing to do when there is no writable volume at all.
608 if len(KeepVM.AllWritable()) == 0 {
609 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Attempt the untrash on every writable volume, recording which volumes
// succeeded and which failed; "not exist" results are tallied separately.
613 var untrashedOn, failedOn []string
615 for _, vol := range KeepVM.AllWritable() {
616 err := vol.Untrash(hash)
618 if os.IsNotExist(err) {
620 } else if err != nil {
621 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
622 failedOn = append(failedOn, vol.String())
624 log.Printf("Untrashed %v on volume %v", hash, vol.String())
625 untrashedOn = append(untrashedOn, vol.String())
// Every volume reported "not exist": the block isn't in trash anywhere.
629 if numNotFound == len(KeepVM.AllWritable()) {
630 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
// Every attempt failed outright: report a server error.
634 if len(failedOn) == len(KeepVM.AllWritable()) {
635 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
// Mixed/successful outcome: report per-volume results in the body.
637 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
638 if len(failedOn) > 0 {
639 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
641 resp.Write([]byte(respBody))
645 // GetBlock and PutBlock implement lower-level code for handling
646 // blocks by rooting through volumes connected to the local machine.
647 // Once the handler has determined that system policy permits the
648 // request, it calls these methods to perform the actual operation.
650 // TODO(twp): this code would probably be better located in the
651 // VolumeManager interface. As an abstraction, the VolumeManager
652 // should be the only part of the code that cares about which volume a
653 // block is stored on, so it should be responsible for figuring out
654 // which volume to check for fetching blocks, storing blocks, etc.
656 // GetBlock fetches the block identified by "hash" into the provided
657 // buf, and returns the data size.
659 // If the block cannot be found on any volume, returns NotFoundError.
661 // If the block found does not have the correct MD5 hash, returns
664 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
665 // Attempt to read the requested hash from a keep volume.
666 errorToCaller := NotFoundError
668 for _, vol := range KeepVM.AllReadable() {
669 size, err := vol.Get(ctx, hash, buf)
// Client disconnect aborts the whole search immediately.
672 return 0, ErrClientDisconnect
676 // IsNotExist is an expected error and may be
677 // ignored. All other errors are logged. In
678 // any case we continue trying to read other
679 // volumes. If all volumes report IsNotExist,
680 // we return a NotFoundError.
681 if !os.IsNotExist(err) {
682 log.Printf("%s: Get(%s): %s", vol, hash, err)
684 // If some volume returns a transient error, return it to the caller
685 // instead of "Not found" so it can retry.
686 if err == VolumeBusyError {
687 errorToCaller = err.(*KeepError)
691 // Check the file checksum.
// Verify the on-disk data against the requested hash before trusting it.
693 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
694 if filehash != hash {
695 // TODO: Try harder to tell a sysadmin about
697 log.Printf("%s: checksum mismatch for request %s (actual %s)",
// Remember the corruption, but keep trying other volumes — a good
// copy elsewhere can still satisfy the request.
699 errorToCaller = DiskHashError
702 if errorToCaller == DiskHashError {
703 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
// All volumes exhausted: report the most informative error collected.
708 return 0, errorToCaller
711 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
713 // PutBlock(ctx, block, hash)
714 // Stores the BLOCK (identified by the content id HASH) in Keep.
716 // The MD5 checksum of the block must be identical to the content id HASH.
717 // If not, an error is returned.
719 // PutBlock stores the BLOCK on the first Keep volume with free space.
720 // A failure code is returned to the user only if all volumes fail.
722 // On success, PutBlock returns nil.
723 // On failure, it returns a KeepError with one of the following codes:
726 // A different block with the same hash already exists on this
729 // The MD5 hash of the BLOCK does not match the argument HASH.
731 // There was not enough space left in any Keep volume to store
734 // The object could not be stored for some other reason (e.g.
735 // all writes failed). The text of the error message should
736 // provide as much detail as possible.
738 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
739 // Check that BLOCK's checksum matches HASH.
740 blockhash := fmt.Sprintf("%x", md5.Sum(block))
741 if blockhash != hash {
742 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
743 return 0, RequestHashError
746 // If we already have this data, it's intact on disk, and we
747 // can update its timestamp, return success. If we have
748 // different data with the same hash, return failure.
749 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
751 } else if ctx.Err() != nil {
752 return 0, ErrClientDisconnect
755 // Choose a Keep volume to write to.
756 // If this volume fails, try all of the volumes in order.
757 if vol := KeepVM.NextWritable(); vol != nil {
758 if err := vol.Put(ctx, hash, block); err == nil {
759 return vol.Replication(), nil // success!
760? (not present)
761 if ctx.Err() != nil {
762 return 0, ErrClientDisconnect
// Preferred volume failed: fall back to trying every writable volume.
766 writables := KeepVM.AllWritable()
767 if len(writables) == 0 {
768 log.Print("No writable volumes.")
773 for _, vol := range writables {
774 err := vol.Put(ctx, hash, block)
775 if ctx.Err() != nil {
776 return 0, ErrClientDisconnect
// First successful write wins; report that volume's replication level.
779 return vol.Replication(), nil // success!
781 if err != FullError {
782 // The volume is not full but the
783 // write did not succeed. Report the
784 // error and continue trying.
786 log.Printf("%s: Write(%s): %s", vol, hash, err)
// Every volume refused the write.
791 log.Print("All volumes are full.")
794 // Already logged the non-full errors.
795 return 0, GenericError
798 // CompareAndTouch returns the current replication level if one of the
799 // volumes already has the given content and it successfully updates
800 // the relevant block's modification time in order to protect it from
801 // premature garbage collection. Otherwise, it returns a non-nil
803 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
// Default outcome when no volume has the block.
804 var bestErr error = NotFoundError
805 for _, vol := range KeepVM.AllWritable() {
806 err := vol.Compare(ctx, hash, buf)
// Client disconnect aborts the scan.
807 if ctx.Err() != nil {
809 } else if err == CollisionError {
810 // Stop if we have a block with same hash but
811 // different content. (It will be impossible
812 // to tell which one is wanted if we have
813 // both, so there's no point writing it even
814 // on a different volume.)
815 log.Printf("%s: Compare(%s): %s", vol, hash, err)
817 } else if os.IsNotExist(err) {
818 // Block does not exist. This is the only
819 // "normal" error: we don't log anything.
821 } else if err != nil {
822 // Couldn't open file, data is corrupt on
823 // disk, etc.: log this abnormal condition,
824 // and try the next volume.
825 log.Printf("%s: Compare(%s): %s", vol, hash, err)
// Content matches; refresh the block's mtime so the GC leaves it alone.
828 if err := vol.Touch(hash); err != nil {
829 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
833 // Compare and Touch both worked --> done.
834 return vol.Replication(), nil
// validLocatorRe matches a bare 32-hex-digit MD5 locator (no hints).
839 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
841 // IsValidLocator returns true if the specified string is a valid Keep locator.
842 // When Keep is extended to support hash types other than MD5,
843 // this should be updated to cover those as well.
845 func IsValidLocator(loc string) bool {
846 return validLocatorRe.MatchString(loc)
// authRe matches "OAuth2 <token>" or "Bearer <token>" Authorization
// header values; the token is capture group 2.
849 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
851 // GetAPIToken returns the OAuth2 token from the Authorization
852 // header of a HTTP request, or an empty string if no matching
854 func GetAPIToken(req *http.Request) string {
// Only the first Authorization header value is inspected.
855 if auth, ok := req.Header["Authorization"]; ok {
856 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
863 // IsExpired returns true if the given Unix timestamp (expressed as a
864 // hexadecimal string) is in the past, or if timestampHex cannot be
865 // parsed as a hexadecimal string.
866 func IsExpired(timestampHex string) bool {
// Parse the hex-encoded Unix timestamp; log and treat parse failures
// as "expired" (fail closed).
867 ts, err := strconv.ParseInt(timestampHex, 16, 0)
869 log.Printf("IsExpired: %s", err)
872 return time.Unix(ts, 0).Before(time.Now())
875 // CanDelete returns true if the user identified by apiToken is
876 // allowed to delete blocks.
877 func CanDelete(apiToken string) bool {
881 // Blocks may be deleted only when Keep has been configured with a
// Currently only the configured system auth token qualifies.
883 if IsSystemAuth(apiToken) {
886 // TODO(twp): look up apiToken with the API server
887 // return true if is_admin is true and if the token
888 // has unlimited scope
892 // IsSystemAuth returns true if the given token is allowed to perform
893 // system level actions like deleting data.
894 func IsSystemAuth(token string) bool {
// Empty tokens never match, even if the configured token is also empty.
895 return token != "" && token == theConfig.systemAuthToken