1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
7 // REST handlers for Keep are implemented here.
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler (GET /index, GET /index/prefix)
12 // StatusHandler (GET /status.json)
30 "github.com/gorilla/mux"
32 "git.curoverse.com/arvados.git/sdk/go/httpserver"
33 log "github.com/Sirupsen/logrus"
38 limiter httpserver.RequestCounter
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() *router {
44 rest := mux.NewRouter()
45 rtr := &router{Router: rest}
48 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
50 `/{hash:[0-9a-f]{32}}+{hints}`,
51 GetBlockHandler).Methods("GET", "HEAD")
53 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
54 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55 // List all blocks stored here. Privileged client only.
56 rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57 // List blocks stored here whose hash has the given prefix.
58 // Privileged client only.
59 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61 // Internals/debugging info (runtime.MemStats)
62 rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64 // List volumes: path, device number, bytes used/avail.
65 rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67 // List mounts: UUID, readonly, tier, device ID, ...
68 rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69 rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70 rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72 // Replace the current pull queue.
73 rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75 // Replace the current trash queue.
76 rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78 // Untrash moves blocks from trash back into store
79 rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
82 rest.HandleFunc(`/_health/ping`, HealthCheckPingHandler).Methods("GET")
84 // Any request which does not match any of these routes gets
86 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
91 // BadRequestHandler is a HandleFunc to address bad requests.
92 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
93 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
96 // GetBlockHandler is a HandleFunc to address Get block requests.
97 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
98 ctx, cancel := contextForResponse(context.TODO(), resp)
101 if theConfig.RequireSignatures {
102 locator := req.URL.Path[1:] // strip leading slash
103 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
104 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
109 // TODO: Probe volumes to check whether the block _might_
110 // exist. Some volumes/types could support a quick existence
111 // check without causing other operations to suffer. If all
112 // volumes support that, and assure us the block definitely
113 // isn't here, we can return 404 now instead of waiting for a
116 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
118 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
123 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
125 code := http.StatusInternalServerError
126 if err, ok := err.(*KeepError); ok {
129 http.Error(resp, err.Error(), code)
133 resp.Header().Set("Content-Length", strconv.Itoa(size))
134 resp.Header().Set("Content-Type", "application/octet-stream")
135 resp.Write(buf[:size])
138 // Return a new context that gets cancelled by resp's CloseNotifier.
139 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
140 ctx, cancel := context.WithCancel(parent)
141 if cn, ok := resp.(http.CloseNotifier); ok {
142 go func(c <-chan bool) {
145 theConfig.debugLogf("cancel context")
154 // Get a buffer from the pool -- but give up and return a non-nil
155 // error if ctx ends before we get a buffer.
156 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
157 bufReady := make(chan []byte)
159 bufReady <- bufs.Get(bufSize)
162 case buf := <-bufReady:
166 // Even if closeNotifier happened first, we
167 // need to keep waiting for our buf so we can
168 // return it to the pool.
171 return nil, ErrClientDisconnect
175 // PutBlockHandler is a HandleFunc to address Put block requests.
176 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
177 ctx, cancel := contextForResponse(context.TODO(), resp)
180 hash := mux.Vars(req)["hash"]
182 // Detect as many error conditions as possible before reading
183 // the body: avoid transmitting data that will not end up
184 // being written anyway.
186 if req.ContentLength == -1 {
187 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
191 if req.ContentLength > BlockSize {
192 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
196 if len(KeepVM.AllWritable()) == 0 {
197 http.Error(resp, FullError.Error(), FullError.HTTPCode)
201 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
203 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
207 _, err = io.ReadFull(req.Body, buf)
209 http.Error(resp, err.Error(), 500)
214 replication, err := PutBlock(ctx, buf, hash)
218 code := http.StatusInternalServerError
219 if err, ok := err.(*KeepError); ok {
222 http.Error(resp, err.Error(), code)
226 // Success; add a size hint, sign the locator if possible, and
227 // return it to the client.
228 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
229 apiToken := GetAPIToken(req)
230 if theConfig.blobSigningKey != nil && apiToken != "" {
231 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
232 returnHash = SignLocator(returnHash, apiToken, expiry)
234 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
235 resp.Write([]byte(returnHash + "\n"))
238 // IndexHandler responds to "/index", "/index/{prefix}", and
239 // "/mounts/{uuid}/blocks" requests.
240 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
241 if !IsSystemAuth(GetAPIToken(req)) {
242 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
246 prefix := mux.Vars(req)["prefix"]
249 prefix = req.Form.Get("prefix")
252 uuid := mux.Vars(req)["uuid"]
256 vols = KeepVM.AllReadable()
257 } else if v := KeepVM.Lookup(uuid, false); v == nil {
258 http.Error(resp, "mount not found", http.StatusNotFound)
264 for _, v := range vols {
265 if err := v.IndexTo(prefix, resp); err != nil {
266 // The only errors returned by IndexTo are
267 // write errors returned by resp.Write(),
268 // which probably means the client has
269 // disconnected and this error will never be
270 // reported to the client -- but it will
271 // appear in our own error log.
272 http.Error(resp, err.Error(), http.StatusInternalServerError)
276 // An empty line at EOF is the only way the client can be
277 // assured the entire index was received.
278 resp.Write([]byte{'\n'})
281 // MountsHandler responds to "GET /mounts" requests.
282 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
283 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
285 http.Error(resp, err.Error(), http.StatusInternalServerError)
290 type PoolStatus struct {
291 Alloc uint64 `json:"BytesAllocated"`
292 Cap int `json:"BuffersMax"`
293 Len int `json:"BuffersInUse"`
296 type volumeStatusEnt struct {
298 Status *VolumeStatus `json:",omitempty"`
299 VolumeStats *ioStats `json:",omitempty"`
300 InternalStats interface{} `json:",omitempty"`
304 type NodeStatus struct {
305 Volumes []*volumeStatusEnt
306 BufferPool PoolStatus
307 PullQueue WorkQueueStatus
308 TrashQueue WorkQueueStatus
314 var stLock sync.Mutex
316 // DebugHandler addresses /debug.json requests.
317 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
318 type debugStats struct {
319 MemStats runtime.MemStats
322 runtime.ReadMemStats(&ds.MemStats)
323 err := json.NewEncoder(resp).Encode(&ds)
325 http.Error(resp, err.Error(), 500)
329 // StatusHandler addresses /status.json requests.
330 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
332 rtr.readNodeStatus(&st)
333 jstat, err := json.Marshal(&st)
338 log.Printf("json.Marshal: %s", err)
339 log.Printf("NodeStatus = %v", &st)
340 http.Error(resp, err.Error(), 500)
344 // populate the given NodeStatus struct with current values.
345 func (rtr *router) readNodeStatus(st *NodeStatus) {
346 vols := KeepVM.AllReadable()
347 if cap(st.Volumes) < len(vols) {
348 st.Volumes = make([]*volumeStatusEnt, len(vols))
350 st.Volumes = st.Volumes[:0]
351 for _, vol := range vols {
352 var internalStats interface{}
353 if vol, ok := vol.(InternalStatser); ok {
354 internalStats = vol.InternalStats()
356 st.Volumes = append(st.Volumes, &volumeStatusEnt{
358 Status: vol.Status(),
359 InternalStats: internalStats,
360 //VolumeStats: KeepVM.VolumeStats(vol),
363 st.BufferPool.Alloc = bufs.Alloc()
364 st.BufferPool.Cap = bufs.Cap()
365 st.BufferPool.Len = bufs.Len()
366 st.PullQueue = getWorkQueueStatus(pullq)
367 st.TrashQueue = getWorkQueueStatus(trashq)
368 if rtr.limiter != nil {
369 st.RequestsCurrent = rtr.limiter.Current()
370 st.RequestsMax = rtr.limiter.Max()
374 // return a WorkQueueStatus for the given queue. If q is nil (which
375 // should never happen except in test suites), return a zero status
376 // value instead of crashing.
377 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
379 // This should only happen during tests.
380 return WorkQueueStatus{}
385 // DeleteHandler processes DELETE requests.
387 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
388 // from all connected volumes.
390 // Only the Data Manager, or an Arvados admin with scope "all", are
391 // allowed to issue DELETE requests. If a DELETE request is not
392 // authenticated or is issued by a non-admin user, the server returns
393 // a PermissionError.
395 // Upon receiving a valid request from an authorized user,
396 // DeleteHandler deletes all copies of the specified block on local
401 // If the requested block was not found on any volume, the response
402 // code is HTTP 404 Not Found.
404 // Otherwise, the response code is 200 OK, with a response body
405 // consisting of the JSON message
407 // {"copies_deleted":d,"copies_failed":f}
409 // where d and f are integers representing the number of blocks that
410 // were successfully and unsuccessfully deleted.
412 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
413 hash := mux.Vars(req)["hash"]
415 // Confirm that this user is an admin and has a token with unlimited scope.
416 var tok = GetAPIToken(req)
417 if tok == "" || !CanDelete(tok) {
418 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
422 if !theConfig.EnableDelete {
423 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
427 // Delete copies of this block from all available volumes.
428 // Report how many blocks were successfully deleted, and how
429 // many were found on writable volumes but not deleted.
431 Deleted int `json:"copies_deleted"`
432 Failed int `json:"copies_failed"`
434 for _, vol := range KeepVM.AllWritable() {
435 if err := vol.Trash(hash); err == nil {
437 } else if os.IsNotExist(err) {
441 log.Println("DeleteHandler:", err)
447 if result.Deleted == 0 && result.Failed == 0 {
448 st = http.StatusNotFound
455 if st == http.StatusOK {
456 if body, err := json.Marshal(result); err == nil {
459 log.Printf("json.Marshal: %s (result = %v)", err, result)
460 http.Error(resp, err.Error(), 500)
465 /* PullHandler processes "PUT /pull" requests for the data manager.
466 The request body is a JSON message containing a list of pull
467 requests in the following format:
471 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
473 "keep0.qr1hi.arvadosapi.com:25107",
474 "keep1.qr1hi.arvadosapi.com:25108"
478 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
488 Each pull request in the list consists of a block locator string
489 and an ordered list of servers. Keepstore should try to fetch the
490 block from each server in turn.
492 If the request has not been sent by the Data Manager, return 401
495 If the JSON unmarshalling fails, return 400 Bad Request.
498 // PullRequest consists of a block locator and an ordered list of servers
499 type PullRequest struct {
500 Locator string `json:"locator"`
501 Servers []string `json:"servers"`
503 // Destination mount, or "" for "anywhere"
507 // PullHandler processes "PUT /pull" requests for the data manager.
508 func PullHandler(resp http.ResponseWriter, req *http.Request) {
509 // Reject unauthorized requests.
510 if !IsSystemAuth(GetAPIToken(req)) {
511 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
515 // Parse the request body.
517 r := json.NewDecoder(req.Body)
518 if err := r.Decode(&pr); err != nil {
519 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
523 // We have a properly formatted pull list sent from the data
524 // manager. Report success and send the list to the pull list
525 // manager for further handling.
526 resp.WriteHeader(http.StatusOK)
528 fmt.Sprintf("Received %d pull requests\n", len(pr))))
531 for _, p := range pr {
534 pullq.ReplaceQueue(plist)
537 // TrashRequest consists of a block locator and its Mtime
538 type TrashRequest struct {
539 Locator string `json:"locator"`
540 BlockMtime int64 `json:"block_mtime"`
542 // Target mount, or "" for "everywhere"
546 // TrashHandler processes /trash requests.
547 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
548 // Reject unauthorized requests.
549 if !IsSystemAuth(GetAPIToken(req)) {
550 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
554 // Parse the request body.
555 var trash []TrashRequest
556 r := json.NewDecoder(req.Body)
557 if err := r.Decode(&trash); err != nil {
558 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
562 // We have a properly formatted trash list sent from the data
563 // manager. Report success and send the list to the trash work
564 // queue for further handling.
565 resp.WriteHeader(http.StatusOK)
567 fmt.Sprintf("Received %d trash requests\n", len(trash))))
570 for _, t := range trash {
573 trashq.ReplaceQueue(tlist)
576 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
577 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
578 // Reject unauthorized requests.
579 if !IsSystemAuth(GetAPIToken(req)) {
580 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
584 hash := mux.Vars(req)["hash"]
586 if len(KeepVM.AllWritable()) == 0 {
587 http.Error(resp, "No writable volumes", http.StatusNotFound)
591 var untrashedOn, failedOn []string
593 for _, vol := range KeepVM.AllWritable() {
594 err := vol.Untrash(hash)
596 if os.IsNotExist(err) {
598 } else if err != nil {
599 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
600 failedOn = append(failedOn, vol.String())
602 log.Printf("Untrashed %v on volume %v", hash, vol.String())
603 untrashedOn = append(untrashedOn, vol.String())
607 if numNotFound == len(KeepVM.AllWritable()) {
608 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
612 if len(failedOn) == len(KeepVM.AllWritable()) {
613 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
615 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
616 if len(failedOn) > 0 {
617 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
619 resp.Write([]byte(respBody))
623 // HealthCheckPingHandler processes "GET /_health/ping" requests
624 func HealthCheckPingHandler(resp http.ResponseWriter, req *http.Request) {
625 fn := func() interface{} {
626 return map[string]string{"health": "OK"}
629 healthCheckDo(resp, req, fn)
632 // healthCheckFunc is the callback a health check handler passes to healthCheckDo; its return value is marshalled to JSON.
633 type healthCheckFunc func() interface{}
635 func healthCheckDo(resp http.ResponseWriter, req *http.Request, fn healthCheckFunc) {
636 msg, code := healthCheckAuth(resp, req)
638 http.Error(resp, msg, code)
642 ok, err := json.Marshal(fn())
644 http.Error(resp, err.Error(), 500)
651 func healthCheckAuth(resp http.ResponseWriter, req *http.Request) (string, int) {
652 if theConfig.ManagementToken == "" {
653 return "disabled", http.StatusNotFound
654 } else if h := req.Header.Get("Authorization"); h == "" {
655 return "authorization required", http.StatusUnauthorized
656 } else if h != "Bearer "+theConfig.ManagementToken {
657 return "authorization error", http.StatusForbidden
662 // GetBlock and PutBlock implement lower-level code for handling
663 // blocks by rooting through volumes connected to the local machine.
664 // Once the handler has determined that system policy permits the
665 // request, it calls these methods to perform the actual operation.
667 // TODO(twp): this code would probably be better located in the
668 // VolumeManager interface. As an abstraction, the VolumeManager
669 // should be the only part of the code that cares about which volume a
670 // block is stored on, so it should be responsible for figuring out
671 // which volume to check for fetching blocks, storing blocks, etc.
673 // GetBlock fetches the block identified by "hash" into the provided
674 // buf, and returns the data size.
676 // If the block cannot be found on any volume, returns NotFoundError.
678 // If the block found does not have the correct MD5 hash, returns
681 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
682 // Attempt to read the requested hash from a keep volume.
683 errorToCaller := NotFoundError
685 for _, vol := range KeepVM.AllReadable() {
686 size, err := vol.Get(ctx, hash, buf)
689 return 0, ErrClientDisconnect
693 // IsNotExist is an expected error and may be
694 // ignored. All other errors are logged. In
695 // any case we continue trying to read other
696 // volumes. If all volumes report IsNotExist,
697 // we return a NotFoundError.
698 if !os.IsNotExist(err) {
699 log.Printf("%s: Get(%s): %s", vol, hash, err)
703 // Check the file checksum.
705 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
706 if filehash != hash {
707 // TODO: Try harder to tell a sysadmin about
709 log.Printf("%s: checksum mismatch for request %s (actual %s)",
711 errorToCaller = DiskHashError
714 if errorToCaller == DiskHashError {
715 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
720 return 0, errorToCaller
723 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
725 // PutBlock(ctx, block, hash)
726 // Stores the BLOCK (identified by the content id HASH) in Keep.
728 // The MD5 checksum of the block must be identical to the content id HASH.
729 // If not, an error is returned.
731 // PutBlock stores the BLOCK on the first Keep volume with free space.
732 // A failure code is returned to the user only if all volumes fail.
734 // On success, PutBlock returns the replication level and a nil error.
735 // On failure, it returns a KeepError with one of the following codes:
738 // A different block with the same hash already exists on this
741 // The MD5 hash of the BLOCK does not match the argument HASH.
743 // There was not enough space left in any Keep volume to store
746 // The object could not be stored for some other reason (e.g.
747 // all writes failed). The text of the error message should
748 // provide as much detail as possible.
750 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
751 // Check that BLOCK's checksum matches HASH.
752 blockhash := fmt.Sprintf("%x", md5.Sum(block))
753 if blockhash != hash {
754 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
755 return 0, RequestHashError
758 // If we already have this data, it's intact on disk, and we
759 // can update its timestamp, return success. If we have
760 // different data with the same hash, return failure.
761 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
763 } else if ctx.Err() != nil {
764 return 0, ErrClientDisconnect
767 // Choose a Keep volume to write to.
768 // If this volume fails, try all of the volumes in order.
769 if vol := KeepVM.NextWritable(); vol != nil {
770 if err := vol.Put(ctx, hash, block); err == nil {
771 return vol.Replication(), nil // success!
773 if ctx.Err() != nil {
774 return 0, ErrClientDisconnect
778 writables := KeepVM.AllWritable()
779 if len(writables) == 0 {
780 log.Print("No writable volumes.")
785 for _, vol := range writables {
786 err := vol.Put(ctx, hash, block)
787 if ctx.Err() != nil {
788 return 0, ErrClientDisconnect
791 return vol.Replication(), nil // success!
793 if err != FullError {
794 // The volume is not full but the
795 // write did not succeed. Report the
796 // error and continue trying.
798 log.Printf("%s: Write(%s): %s", vol, hash, err)
803 log.Print("All volumes are full.")
806 // Already logged the non-full errors.
807 return 0, GenericError
810 // CompareAndTouch returns the current replication level if one of the
811 // volumes already has the given content and it successfully updates
812 // the relevant block's modification time in order to protect it from
813 // premature garbage collection. Otherwise, it returns a non-nil
815 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
816 var bestErr error = NotFoundError
817 for _, vol := range KeepVM.AllWritable() {
818 err := vol.Compare(ctx, hash, buf)
819 if ctx.Err() != nil {
821 } else if err == CollisionError {
822 // Stop if we have a block with same hash but
823 // different content. (It will be impossible
824 // to tell which one is wanted if we have
825 // both, so there's no point writing it even
826 // on a different volume.)
827 log.Printf("%s: Compare(%s): %s", vol, hash, err)
829 } else if os.IsNotExist(err) {
830 // Block does not exist. This is the only
831 // "normal" error: we don't log anything.
833 } else if err != nil {
834 // Couldn't open file, data is corrupt on
835 // disk, etc.: log this abnormal condition,
836 // and try the next volume.
837 log.Printf("%s: Compare(%s): %s", vol, hash, err)
840 if err := vol.Touch(hash); err != nil {
841 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
845 // Compare and Touch both worked --> done.
846 return vol.Replication(), nil
851 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
853 // IsValidLocator returns true if the specified string is a valid Keep locator.
854 // When Keep is extended to support hash types other than MD5,
855 // this should be updated to cover those as well.
857 func IsValidLocator(loc string) bool {
858 return validLocatorRe.MatchString(loc)
861 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
863 // GetAPIToken returns the OAuth2 token from the Authorization
864 // header of an HTTP request, or an empty string if no matching
866 func GetAPIToken(req *http.Request) string {
867 if auth, ok := req.Header["Authorization"]; ok {
868 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
875 // IsExpired returns true if the given Unix timestamp (expressed as a
876 // hexadecimal string) is in the past, or if timestampHex cannot be
877 // parsed as a hexadecimal string.
878 func IsExpired(timestampHex string) bool {
879 ts, err := strconv.ParseInt(timestampHex, 16, 0)
881 log.Printf("IsExpired: %s", err)
884 return time.Unix(ts, 0).Before(time.Now())
887 // CanDelete returns true if the user identified by apiToken is
888 // allowed to delete blocks.
889 func CanDelete(apiToken string) bool {
893 // Blocks may be deleted only when Keep has been configured with a
895 if IsSystemAuth(apiToken) {
898 // TODO(twp): look up apiToken with the API server
899 // return true if is_admin is true and if the token
900 // has unlimited scope
904 // IsSystemAuth returns true if the given token is allowed to perform
905 // system level actions like deleting data.
906 func IsSystemAuth(token string) bool {
907 return token != "" && token == theConfig.systemAuthToken