1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "git.curoverse.com/arvados.git/sdk/go/health"
25 "git.curoverse.com/arvados.git/sdk/go/httpserver"
26 "github.com/gorilla/mux"
27 "github.com/prometheus/client_golang/prometheus"
// limiter is assigned in MakeRESTRouter (httpserver.NewRequestLimiter) and
// read by readNodeStatus via Current() and Max().
32 limiter httpserver.RequestCounter
// cluster is handed to remoteProxy.Get by handleGET.
33 cluster *arvados.Cluster
// remoteProxy handles GET requests whose locator carries a "+R" hint but
// no "+A" hint (see handleGET).
34 remoteProxy remoteProxy
38 // MakeRESTRouter returns a new router that forwards all Keep requests
39 // to the appropriate handlers.
//
// The mux router is not returned bare: it is wrapped by a request limiter
// built from theConfig.MaxRequests, then by httpserver.LogRequests,
// httpserver.AddRequestIDs and httpserver.Instrument. reg is stored in the
// router's nodeMetrics and is the registry passed to httpserver.Instrument.
40 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
42 Router: mux.NewRouter(),
44 metrics: &nodeMetrics{reg: reg},
// Block reads: the path is either a bare 32-hex-digit hash or a hash
// followed by "+" and hints. Both forms are served by handleGET.
48 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
50 `/{hash:[0-9a-f]{32}}+{hints}`,
51 rtr.handleGET).Methods("GET", "HEAD")
// Block writes and deletes are routed on the bare hash form only.
53 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
54 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55 // List all blocks stored here. Privileged client only.
56 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57 // List blocks stored here whose hash has the given prefix.
58 // Privileged client only.
59 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61 // Internals/debugging info (runtime.MemStats)
62 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64 // List volumes: path, device number, bytes used/avail.
65 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67 // List mounts: UUID, readonly, tier, device ID, ...
68 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
// Per-mount block listing; registered with and without a trailing slash,
// and served by the same IndexHandler as /index.
69 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72 // Replace the current pull queue.
73 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75 // Replace the current trash queue.
76 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78 // Untrash moves blocks from trash back into store
79 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
// Health checks are delegated to health.Handler, configured with the
// management token.
81 rtr.Handle("/_health/{check}", &health.Handler{
82 Token: theConfig.ManagementToken,
86 // Any request which does not match any of these routes gets
// BadRequestHandler, installed here as the router's NotFoundHandler.
88 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// The limiter wraps the router itself; it is also what the request
// metrics below are set up from.
90 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
91 rtr.metrics.setupBufferPoolMetrics(bufs)
92 rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
93 rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
94 rtr.metrics.setupRequestMetrics(rtr.limiter)
// Requests reach rtr.limiter through LogRequests, AddRequestIDs and
// Instrument; the instrumented handler is exposed via ServeAPI with the
// management token.
96 instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
97 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
98 return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
101 // BadRequestHandler is a HandleFunc to address bad requests.
// It replies with BadRequestError's message and HTTP status code.
102 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
103 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves the GET and HEAD block routes registered in
// MakeRESTRouter.
//
// Flow, as far as the code here shows:
//   - a locator containing "+R" but not "+A" is handed to
//     rtr.remoteProxy.Get;
//   - when theConfig.RequireSignatures is set, the locator is checked with
//     VerifySignature against the request's API token;
//   - a BlockSize buffer is requested from bufs (503 on failure);
//   - GetBlock fills the buffer and the first size bytes are written back
//     as application/octet-stream with an explicit Content-Length.
106 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
107 ctx, cancel := contextForResponse(context.TODO(), resp)
// The locator is the URL path without its leading slash.
110 locator := req.URL.Path[1:]
111 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
112 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
116 if theConfig.RequireSignatures {
117 locator := req.URL.Path[1:] // strip leading slash
118 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
// NOTE(review): this type assertion is unchecked, so it panics if
// VerifySignature ever returns an error that is not a *KeepError --
// confirm that cannot happen.
119 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
124 // TODO: Probe volumes to check whether the block _might_
125 // exist. Some volumes/types could support a quick existence
126 // check without causing other operations to suffer. If all
127 // volumes support that, and assure us the block definitely
128 // isn't here, we can return 404 now instead of waiting for a
// The wait for a buffer is bounded by ctx (see getBufferWithContext).
131 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
133 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
// Only the hash route variable is passed on; hints are not.
138 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
// Default to 500; a *KeepError is recognised separately
// (presumably to use its own HTTP status -- confirm).
140 code := http.StatusInternalServerError
141 if err, ok := err.(*KeepError); ok {
144 http.Error(resp, err.Error(), code)
148 resp.Header().Set("Content-Length", strconv.Itoa(size))
149 resp.Header().Set("Content-Type", "application/octet-stream")
150 resp.Write(buf[:size])
153 // Return a new context that gets cancelled by resp's CloseNotifier.
//
// The returned context is derived from parent with context.WithCancel.
// The watcher goroutine is only started when resp implements
// http.CloseNotifier.
//
// NOTE(review): http.CloseNotifier is deprecated in net/http in favour of
// Request.Context -- consider deriving the context from the request.
154 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
155 ctx, cancel := context.WithCancel(parent)
156 if cn, ok := resp.(http.CloseNotifier); ok {
157 go func(c <-chan bool) {
160 theConfig.debugLogf("cancel context")
169 // Get a buffer from the pool -- but give up and return a non-nil
170 // error if ctx ends before we get a buffer.
//
// The error returned in that case is ErrClientDisconnect. bufSize is
// passed straight to bufs.Get.
171 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
// bufReady is unbuffered: the send below completes only when something
// receives the buffer.
172 bufReady := make(chan []byte)
174 bufReady <- bufs.Get(bufSize)
177 case buf := <-bufReady:
181 // Even if closeNotifier happened first, we
182 // need to keep waiting for our buf so we can
183 // return it to the pool.
186 return nil, ErrClientDisconnect
// handlePUT serves "PUT /{hash}" (see MakeRESTRouter).
//
// It rejects the request before reading the body when the content length
// is unknown (SizeRequiredError), exceeds BlockSize (TooLongError), or no
// volume is writable (FullError). Otherwise it reads exactly
// req.ContentLength bytes into a pooled buffer and stores them with
// PutBlock. On success the response carries the replication level in
// X-Keep-Replicas-Stored and a body of "<hash>+<size>" (signed when
// possible) followed by a newline.
190 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
191 ctx, cancel := contextForResponse(context.TODO(), resp)
194 hash := mux.Vars(req)["hash"]
196 // Detect as many error conditions as possible before reading
197 // the body: avoid transmitting data that will not end up
198 // being written anyway.
// net/http reports an unknown length as ContentLength == -1.
200 if req.ContentLength == -1 {
201 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
205 if req.ContentLength > BlockSize {
206 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
210 if len(KeepVM.AllWritable()) == 0 {
211 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// The buffer is sized to the declared content length, so the ReadFull
// below fails if the body is shorter than declared.
215 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
217 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
221 _, err = io.ReadFull(req.Body, buf)
223 http.Error(resp, err.Error(), 500)
228 replication, err := PutBlock(ctx, buf, hash)
// Default to 500; a *KeepError is recognised separately
// (presumably to use its own HTTP status -- confirm).
232 code := http.StatusInternalServerError
233 if err, ok := err.(*KeepError); ok {
236 http.Error(resp, err.Error(), code)
240 // Success; add a size hint, sign the locator if possible, and
241 // return it to the client.
242 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
243 apiToken := GetAPIToken(req)
// Signing needs both a configured blob signing key and a client token;
// otherwise the unsigned locator is returned.
244 if theConfig.blobSigningKey != nil && apiToken != "" {
245 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
246 returnHash = SignLocator(returnHash, apiToken, expiry)
248 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
249 resp.Write([]byte(returnHash + "\n"))
252 // IndexHandler responds to "/index", "/index/{prefix}", and
253 // "/mounts/{uuid}/blocks" requests.
//
// Callers must present the system auth token (IsSystemAuth); anyone else
// gets UnauthorizedError. The index of each selected volume is written to
// the response with IndexTo, followed by a single terminating newline.
254 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
255 if !IsSystemAuth(GetAPIToken(req)) {
256 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The prefix comes from the route variable, with the "prefix" form value
// as an alternative source.
260 prefix := mux.Vars(req)["prefix"]
263 prefix = req.Form.Get("prefix")
// uuid is only set for the /mounts/{uuid}/blocks routes. The volume set is
// either every readable volume or the single mount found by Lookup; an
// unknown uuid yields 404.
266 uuid := mux.Vars(req)["uuid"]
270 vols = KeepVM.AllReadable()
271 } else if v := KeepVM.Lookup(uuid, false); v == nil {
272 http.Error(resp, "mount not found", http.StatusNotFound)
278 for _, v := range vols {
279 if err := v.IndexTo(prefix, resp); err != nil {
280 // The only errors returned by IndexTo are
281 // write errors returned by resp.Write(),
282 // which probably means the client has
283 // disconnected and this error will never be
284 // reported to the client -- but it will
285 // appear in our own error log.
286 http.Error(resp, err.Error(), http.StatusInternalServerError)
290 // An empty line at EOF is the only way the client can be
291 // assured the entire index was received.
292 resp.Write([]byte{'\n'})
295 // MountsHandler responds to "GET /mounts" requests.
// It JSON-encodes KeepVM.Mounts() directly onto the response and reports
// an encoding error as HTTP 500.
296 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
297 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
299 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus is the buffer pool section of NodeStatus. readNodeStatus
// fills it from bufs.Alloc(), bufs.Cap() and bufs.Len(); the JSON names
// below are what clients see.
304 type PoolStatus struct {
305 Alloc uint64 `json:"BytesAllocatedCumulative"`
306 Cap int `json:"BuffersMax"`
307 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one entry of NodeStatus.Volumes, built per volume by
// readNodeStatus. Empty fields are omitted from the JSON output.
310 type volumeStatusEnt struct {
// Status is set from the volume's Status() method.
312 Status *VolumeStatus `json:",omitempty"`
// VolumeStats is not populated by readNodeStatus (the assignment there is
// commented out).
313 VolumeStats *ioStats `json:",omitempty"`
// InternalStats is set only for volumes implementing InternalStatser.
314 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the structure filled in by readNodeStatus and marshalled
// to JSON by StatusHandler.
318 type NodeStatus struct {
319 Volumes []*volumeStatusEnt
320 BufferPool PoolStatus
321 PullQueue WorkQueueStatus
322 TrashQueue WorkQueueStatus
// NOTE(review): presumably serializes access to the shared NodeStatus used
// by StatusHandler -- confirm against the code that locks it.
329 var stLock sync.Mutex
331 // DebugHandler addresses /debug.json requests.
// It takes a runtime.ReadMemStats snapshot and JSON-encodes it onto the
// response; an encoding error is reported as HTTP 500.
332 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
// debugStats is the response envelope; MemStats is its only visible field.
333 type debugStats struct {
334 MemStats runtime.MemStats
337 runtime.ReadMemStats(&ds.MemStats)
338 err := json.NewEncoder(resp).Encode(&ds)
340 http.Error(resp, err.Error(), 500)
344 // StatusHandler addresses /status.json requests.
// It refreshes a NodeStatus with readNodeStatus and marshals it to JSON.
// A marshalling failure is logged together with the status value and
// reported to the client as HTTP 500.
345 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
347 rtr.readNodeStatus(&st)
348 jstat, err := json.Marshal(&st)
353 log.Printf("json.Marshal: %s", err)
354 log.Printf("NodeStatus = %v", &st)
355 http.Error(resp, err.Error(), 500)
359 // populate the given NodeStatus struct with current values.
//
// st.Volumes is rebuilt from KeepVM.AllReadable(), reusing the existing
// backing array when it is large enough. Buffer pool, pull queue and
// trash queue figures are refreshed, and request counts are copied from
// rtr.limiter when one is set.
360 func (rtr *router) readNodeStatus(st *NodeStatus) {
362 vols := KeepVM.AllReadable()
// Grow the slice only when needed, then truncate to length 0 so the
// appends below refill it from the start.
363 if cap(st.Volumes) < len(vols) {
364 st.Volumes = make([]*volumeStatusEnt, len(vols))
366 st.Volumes = st.Volumes[:0]
367 for _, vol := range vols {
// internalStats stays nil unless the volume implements InternalStatser.
// The inner vol shadows the loop variable only inside the if statement.
368 var internalStats interface{}
369 if vol, ok := vol.(InternalStatser); ok {
370 internalStats = vol.InternalStats()
372 st.Volumes = append(st.Volumes, &volumeStatusEnt{
374 Status: vol.Status(),
375 InternalStats: internalStats,
376 //VolumeStats: KeepVM.VolumeStats(vol),
379 st.BufferPool.Alloc = bufs.Alloc()
380 st.BufferPool.Cap = bufs.Cap()
381 st.BufferPool.Len = bufs.Len()
382 st.PullQueue = getWorkQueueStatus(pullq)
383 st.TrashQueue = getWorkQueueStatus(trashq)
// Request counters are left untouched when no limiter is configured.
384 if rtr.limiter != nil {
385 st.RequestsCurrent = rtr.limiter.Current()
386 st.RequestsMax = rtr.limiter.Max()
390 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
391 // should never happen except in test suites), return a zero status
392 // value instead of crashing.
393 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
395 // This should only happen during tests.
396 return WorkQueueStatus{}
401 // DeleteHandler processes DELETE requests.
403 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
404 // from all connected volumes.
406 // Only the Data Manager, or an Arvados admin with scope "all", are
407 // allowed to issue DELETE requests. If a DELETE request is not
408 // authenticated or is issued by a non-admin user, the server returns
409 // a PermissionError.
411 // Upon receiving a valid request from an authorized user,
412 // DeleteHandler deletes all copies of the specified block on local
417 // If the requested block was not found on any volume, the response
418 // code is HTTP 404 Not Found.
420 // Otherwise, the response code is 200 OK, with a response body
421 // consisting of the JSON message
423 // {"copies_deleted":d,"copies_failed":f}
425 // where d and f are integers representing the number of blocks that
426 // were successfully and unsuccessfully deleted.
//
// Deletion is performed by calling Trash on each writable volume. The
// permission check runs before the theConfig.EnableDelete check, so an
// unauthorized caller gets PermissionError even when deletion is disabled.
428 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
429 hash := mux.Vars(req)["hash"]
431 // Confirm that this user is an admin and has a token with unlimited scope.
432 var tok = GetAPIToken(req)
433 if tok == "" || !CanDelete(tok) {
434 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
438 if !theConfig.EnableDelete {
439 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
443 // Delete copies of this block from all available volumes.
444 // Report how many blocks were successfully deleted, and how
445 // many were found on writable volumes but not deleted.
447 Deleted int `json:"copies_deleted"`
448 Failed int `json:"copies_failed"`
// Only writable volumes are visited. A "does not exist" error is handled
// separately from other failures, which are logged.
450 for _, vol := range KeepVM.AllWritable() {
451 if err := vol.Trash(hash); err == nil {
453 } else if os.IsNotExist(err) {
457 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed means no volume had the block.
463 if result.Deleted == 0 && result.Failed == 0 {
464 st = http.StatusNotFound
// The JSON body is produced only for a 200 response.
471 if st == http.StatusOK {
472 if body, err := json.Marshal(result); err == nil {
475 log.Printf("json.Marshal: %s (result = %v)", err, result)
476 http.Error(resp, err.Error(), 500)
481 /* PullHandler processes "PUT /pull" requests for the data manager.
482 The request body is a JSON message containing a list of pull
483 requests in the following format:
487 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
489 "keep0.qr1hi.arvadosapi.com:25107",
490 "keep1.qr1hi.arvadosapi.com:25108"
494 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
504 Each pull request in the list consists of a block locator string
505 and an ordered list of servers. Keepstore should try to fetch the
506 block from each server in turn.
508 If the request has not been sent by the Data Manager, return 401 Unauthorized.
511 If the JSON unmarshalling fails, return 400 Bad Request.
*/
514 // PullRequest consists of a block locator and an ordered list of servers
// to try, plus an optional destination mount. The JSON field names match
// the request format described for "PUT /pull".
515 type PullRequest struct {
516 Locator string `json:"locator"`
517 Servers []string `json:"servers"`
519 // Destination mount, or "" for "anywhere"
520 MountUUID string `json:"mount_uuid"`
523 // PullHandler processes "PUT /pull" requests for the data manager.
//
// Callers must present the system auth token, otherwise UnauthorizedError
// is returned. A body that fails JSON decoding yields the BadRequestError
// status with the decoder's message. On success the handler answers 200
// with "Received N pull requests" and then replaces the contents of pullq.
524 func PullHandler(resp http.ResponseWriter, req *http.Request) {
525 // Reject unauthorized requests.
526 if !IsSystemAuth(GetAPIToken(req)) {
527 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
531 // Parse the request body.
533 r := json.NewDecoder(req.Body)
534 if err := r.Decode(&pr); err != nil {
535 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
539 // We have a properly formatted pull list sent from the data
540 // manager. Report success and send the list to the pull list
541 // manager for further handling.
542 resp.WriteHeader(http.StatusOK)
544 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// The success response is sent before the queue is swapped, so the client
// learns only that the list was accepted, not that it was processed.
547 for _, p := range pr {
550 pullq.ReplaceQueue(plist)
553 // TrashRequest consists of a block locator and its Mtime
// plus an optional target mount. TrashHandler decodes the "PUT /trash"
// request body into a slice of these.
554 type TrashRequest struct {
555 Locator string `json:"locator"`
556 BlockMtime int64 `json:"block_mtime"`
558 // Target mount, or "" for "everywhere"
559 MountUUID string `json:"mount_uuid"`
562 // TrashHandler processes /trash requests.
//
// Callers must present the system auth token, otherwise UnauthorizedError
// is returned. The body is decoded as a JSON list of TrashRequest; a
// decoding failure yields the BadRequestError status. On success the
// handler answers 200 with "Received N trash requests" and then replaces
// the contents of trashq.
563 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
564 // Reject unauthorized requests.
565 if !IsSystemAuth(GetAPIToken(req)) {
566 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
570 // Parse the request body.
571 var trash []TrashRequest
572 r := json.NewDecoder(req.Body)
573 if err := r.Decode(&trash); err != nil {
574 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
578 // We have a properly formatted trash list sent from the data
579 // manager. Report success and send the list to the trash work
580 // queue for further handling.
581 resp.WriteHeader(http.StatusOK)
583 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// The success response is sent before the queue is swapped.
586 for _, t := range trash {
589 trashq.ReplaceQueue(tlist)
592 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// Callers must present the system auth token. Untrash is attempted on
// every writable volume. Outcomes:
//   - no writable volumes, or block not found on any of them: 404;
//   - untrash failed on every writable volume: 500;
//   - otherwise a text body listing the volumes where it succeeded and,
//     if any, the volumes where it failed.
593 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
594 // Reject unauthorized requests.
595 if !IsSystemAuth(GetAPIToken(req)) {
596 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
600 hash := mux.Vars(req)["hash"]
602 if len(KeepVM.AllWritable()) == 0 {
603 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Volume names are collected per outcome for the response body.
607 var untrashedOn, failedOn []string
609 for _, vol := range KeepVM.AllWritable() {
610 err := vol.Untrash(hash)
// "Not found" is handled separately from real failures; only the latter
// are added to failedOn.
612 if os.IsNotExist(err) {
614 } else if err != nil {
615 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
616 failedOn = append(failedOn, vol.String())
618 log.Printf("Untrashed %v on volume %v", hash, vol.String())
619 untrashedOn = append(untrashedOn, vol.String())
// NOTE(review): KeepVM.AllWritable() is re-evaluated for these
// comparisons rather than reusing the set that was iterated -- confirm
// the writable set cannot change in between.
623 if numNotFound == len(KeepVM.AllWritable()) {
624 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
628 if len(failedOn) == len(KeepVM.AllWritable()) {
629 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
631 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
632 if len(failedOn) > 0 {
633 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
635 resp.Write([]byte(respBody))
639 // GetBlock and PutBlock implement lower-level code for handling
640 // blocks by rooting through volumes connected to the local machine.
641 // Once the handler has determined that system policy permits the
642 // request, it calls these methods to perform the actual operation.
644 // TODO(twp): this code would probably be better located in the
645 // VolumeManager interface. As an abstraction, the VolumeManager
646 // should be the only part of the code that cares about which volume a
647 // block is stored on, so it should be responsible for figuring out
648 // which volume to check for fetching blocks, storing blocks, etc.
650 // GetBlock fetches the block identified by "hash" into the provided
651 // buf, and returns the data size.
653 // If the block cannot be found on any volume, returns NotFoundError.
655 // If the block found does not have the correct MD5 hash, returns
// DiskHashError, unless another volume supplies a good copy.
//
// Readable volumes are tried in the order given by KeepVM.AllReadable().
// ErrClientDisconnect is returned when the request is abandoned, and a
// VolumeBusyError from any volume replaces "not found" as the final error.
658 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
659 // Attempt to read the requested hash from a keep volume.
// errorToCaller is what gets returned if no volume yields a good copy; it
// is upgraded below when a more informative failure is seen.
660 errorToCaller := NotFoundError
662 for _, vol := range KeepVM.AllReadable() {
663 size, err := vol.Get(ctx, hash, buf)
666 return 0, ErrClientDisconnect
670 // IsNotExist is an expected error and may be
671 // ignored. All other errors are logged. In
672 // any case we continue trying to read other
673 // volumes. If all volumes report IsNotExist,
674 // we return a NotFoundError.
675 if !os.IsNotExist(err) {
676 log.Printf("%s: Get(%s): %s", vol, hash, err)
678 // If some volume returns a transient error, return it to the caller
679 // instead of "Not found" so it can retry.
680 if err == VolumeBusyError {
681 errorToCaller = err.(*KeepError)
685 // Check the file checksum.
// The MD5 is computed over the size bytes just read and compared, as
// lowercase hex, with the requested hash.
687 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
688 if filehash != hash {
689 // TODO: Try harder to tell a sysadmin about
691 log.Printf("%s: checksum mismatch for request %s (actual %s)",
693 errorToCaller = DiskHashError
// Reaching this point with DiskHashError recorded means an earlier volume
// held a corrupt copy but this one is good.
696 if errorToCaller == DiskHashError {
697 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
702 return 0, errorToCaller
705 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
707 // PutBlock(ctx, block, hash)
708 // Stores the BLOCK (identified by the content id HASH) in Keep.
710 // The MD5 checksum of the block must be identical to the content id HASH.
711 // If not, an error is returned.
713 // PutBlock stores the BLOCK on the first Keep volume with free space.
714 // A failure code is returned to the user only if all volumes fail.
716 // On success, PutBlock returns the volume's replication level and a nil error.
717 // On failure, it returns a KeepError with one of the following codes:
720 // A different block with the same hash already exists on this
723 // The MD5 hash of the BLOCK does not match the argument HASH.
725 // There was not enough space left in any Keep volume to store
728 // The object could not be stored for some other reason (e.g.
729 // all writes failed). The text of the error message should
730 // provide as much detail as possible.
//
// ErrClientDisconnect is returned whenever ctx has ended by the time a
// volume operation comes back.
732 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
733 // Check that BLOCK's checksum matches HASH.
734 blockhash := fmt.Sprintf("%x", md5.Sum(block))
735 if blockhash != hash {
736 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
737 return 0, RequestHashError
740 // If we already have this data, it's intact on disk, and we
741 // can update its timestamp, return success. If we have
742 // different data with the same hash, return failure.
743 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
745 } else if ctx.Err() != nil {
746 return 0, ErrClientDisconnect
749 // Choose a Keep volume to write to.
750 // If this volume fails, try all of the volumes in order.
751 if vol := KeepVM.NextWritable(); vol != nil {
752 if err := vol.Put(ctx, hash, block); err == nil {
753 return vol.Replication(), nil // success!
755 if ctx.Err() != nil {
756 return 0, ErrClientDisconnect
// Fallback: walk every writable volume in turn.
760 writables := KeepVM.AllWritable()
761 if len(writables) == 0 {
762 log.Print("No writable volumes.")
767 for _, vol := range writables {
768 err := vol.Put(ctx, hash, block)
769 if ctx.Err() != nil {
770 return 0, ErrClientDisconnect
773 return vol.Replication(), nil // success!
775 if err != FullError {
776 // The volume is not full but the
777 // write did not succeed. Report the
778 // error and continue trying.
780 log.Printf("%s: Write(%s): %s", vol, hash, err)
785 log.Print("All volumes are full.")
788 // Already logged the non-full errors.
789 return 0, GenericError
792 // CompareAndTouch returns the current replication level if one of the
793 // volumes already has the given content and it successfully updates
794 // the relevant block's modification time in order to protect it from
795 // premature garbage collection. Otherwise, it returns a non-nil
// error.
//
// Only writable volumes are examined. A volume counts as a match when
// both Compare and Touch succeed on it.
797 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
// bestErr starts as NotFoundError, the result when no volume has the block.
798 var bestErr error = NotFoundError
799 for _, vol := range KeepVM.AllWritable() {
800 err := vol.Compare(ctx, hash, buf)
// A finished context is checked first, before the Compare result is
// interpreted.
801 if ctx.Err() != nil {
803 } else if err == CollisionError {
804 // Stop if we have a block with same hash but
805 // different content. (It will be impossible
806 // to tell which one is wanted if we have
807 // both, so there's no point writing it even
808 // on a different volume.)
809 log.Printf("%s: Compare(%s): %s", vol, hash, err)
811 } else if os.IsNotExist(err) {
812 // Block does not exist. This is the only
813 // "normal" error: we don't log anything.
815 } else if err != nil {
816 // Couldn't open file, data is corrupt on
817 // disk, etc.: log this abnormal condition,
818 // and try the next volume.
819 log.Printf("%s: Compare(%s): %s", vol, hash, err)
// The content matched; refresh the block's timestamp on this volume.
822 if err := vol.Touch(hash); err != nil {
823 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
827 // Compare and Touch both worked --> done.
828 return vol.Replication(), nil
// validLocatorRe matches exactly 32 lowercase hexadecimal digits, i.e. a
// bare MD5 hash with no size or hint suffix.
833 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
835 // IsValidLocator returns true if the specified string is a valid Keep locator.
836 // When Keep is extended to support hash types other than MD5,
837 // this should be updated to cover those as well.
//
// A locator with a "+size" or other hint suffix does not match, because
// the pattern is anchored at both ends.
839 func IsValidLocator(loc string) bool {
840 return validLocatorRe.MatchString(loc)
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" or "Bearer <token>"; the token is the second capture
// group.
843 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
845 // GetAPIToken returns the OAuth2 token from the Authorization
846 // header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization header value is examined.
848 func GetAPIToken(req *http.Request) string {
849 if auth, ok := req.Header["Authorization"]; ok {
850 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
857 // IsExpired returns true if the given Unix timestamp (expressed as a
858 // hexadecimal string) is in the past, or if timestampHex cannot be
859 // parsed as a hexadecimal string.
//
// NOTE(review): ParseInt is called with bitSize 0, i.e. the platform's int
// size. On a 32-bit build a timestamp above 0x7fffffff fails to parse and
// is, per the comment above, treated as expired -- confirm whether 64 was
// intended.
860 func IsExpired(timestampHex string) bool {
861 ts, err := strconv.ParseInt(timestampHex, 16, 0)
863 log.Printf("IsExpired: %s", err)
// The timestamp is compared at whole-second resolution.
866 return time.Unix(ts, 0).Before(time.Now())
869 // CanDelete returns true if the user identified by apiToken is
870 // allowed to delete blocks.
//
// The only check visible here is IsSystemAuth(apiToken); the admin/scope
// lookup described in the TODO below is not implemented in these lines.
871 func CanDelete(apiToken string) bool {
875 // Blocks may be deleted only when Keep has been configured with a
877 if IsSystemAuth(apiToken) {
880 // TODO(twp): look up apiToken with the API server
881 // return true if is_admin is true and if the token
882 // has unlimited scope
886 // IsSystemAuth returns true if the given token is allowed to perform
887 // system level actions like deleting data.
//
// An empty token never matches, so an empty theConfig.systemAuthToken
// cannot authorize anyone.
//
// NOTE(review): the token is compared with ==, which is not constant-time;
// consider crypto/subtle.ConstantTimeCompare if timing attacks are a
// concern here.
888 func IsSystemAuth(token string) bool {
889 return token != "" && token == theConfig.systemAuthToken