1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
25 "git.curoverse.com/arvados.git/sdk/go/health"
26 "git.curoverse.com/arvados.git/sdk/go/httpserver"
27 "github.com/gorilla/mux"
28 "github.com/prometheus/client_golang/prometheus"
33 limiter httpserver.RequestCounter
34 cluster *arvados.Cluster
35 remoteProxy remoteProxy
39 // MakeRESTRouter returns a new router that forwards all Keep requests
40 // to the appropriate handlers.
//
// reg is stored in the router's nodeMetrics and is later passed to
// httpserver.Instrument. Requests that match none of the routes
// registered here are answered by BadRequestHandler.
//
// The returned handler is not the bare router: the router is wrapped
// in a request limiter built from theConfig.MaxRequests, then in
// request logging, request IDs, a logging context, and metrics
// instrumentation, and finally in instrumented.ServeAPI using
// theConfig.ManagementToken.
//
// NOTE(review): the line that stores cluster on the router is not
// visible here; handleGET reads rtr.cluster, so presumably it is set
// in the router literal — confirm.
41 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
43 Router: mux.NewRouter(),
45 metrics: &nodeMetrics{reg: reg},
49 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
51 `/{hash:[0-9a-f]{32}}+{hints}`,
52 rtr.handleGET).Methods("GET", "HEAD")
// PUT and DELETE are only routed for a bare 32-digit hex hash
// (no "+hints" variant is registered for them).
54 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
55 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56 // List all blocks stored here. Privileged client only.
57 rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58 // List blocks stored here whose hash has the given prefix.
59 // Privileged client only.
60 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
62 // Internals/debugging info (runtime.MemStats)
63 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
65 // List volumes: path, device number, bytes used/avail.
66 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
68 // List mounts: UUID, readonly, tier, device ID, ...
69 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
// Per-mount block listings are served by IndexHandler, with and
// without a trailing slash.
70 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
73 // Replace the current pull queue.
74 rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
76 // Replace the current trash queue.
77 rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
79 // Untrash moves blocks from trash back into store
80 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
// Health checks are handled by health.Handler, configured with the
// management token.
82 rtr.Handle("/_health/{check}", &health.Handler{
83 Token: theConfig.ManagementToken,
87 // Any request which does not match any of these routes gets
89 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// The limiter wraps the router itself and is also what the request
// metrics observe.
91 rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
92 rtr.metrics.setupBufferPoolMetrics(bufs)
93 rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
94 rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
95 rtr.metrics.setupRequestMetrics(rtr.limiter)
// Compose the outer layers around the limiter; the innermost call is
// applied to the request last.
97 instrumented := httpserver.Instrument(rtr.metrics.reg, log,
98 httpserver.HandlerWithContext(
99 ctxlog.Context(context.Background(), log),
100 httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
101 return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
104 // BadRequestHandler is a HandleFunc to address bad requests.
105 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
106 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves a block read request.
//
// The locator is the request path without its leading slash. A locator
// that contains "+R" but not "+A" is handed to rtr.remoteProxy.Get.
// When theConfig.RequireSignatures is set, the locator is checked with
// VerifySignature against the request's API token, and a failure is
// reported using the error's HTTPCode.
//
// Block data is read by GetBlock into a BlockSize buffer obtained from
// bufs; failing to obtain the buffer is reported as 503 Service
// Unavailable. On success the response carries Content-Length,
// Content-Type "application/octet-stream", and the block data.
//
// NOTE(review): the context is derived from context.TODO(), not from
// req.Context(); cancellation relies solely on contextForResponse.
109 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
110 ctx, cancel := contextForResponse(context.TODO(), resp)
113 locator := req.URL.Path[1:]
114 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
115 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
// NOTE(review): the locator declared inside this branch shadows the
// outer one with the same value. The unchecked err.(*KeepError)
// assertion below assumes VerifySignature only ever returns
// *KeepError — confirm, since any other error type would panic.
119 if theConfig.RequireSignatures {
120 locator := req.URL.Path[1:] // strip leading slash
121 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
122 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
127 // TODO: Probe volumes to check whether the block _might_
128 // exist. Some volumes/types could support a quick existence
129 // check without causing other operations to suffer. If all
130 // volumes support that, and assure us the block definitely
131 // isn't here, we can return 404 now instead of waiting for a
134 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
136 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
141 size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
143 code := http.StatusInternalServerError
144 if err, ok := err.(*KeepError); ok {
147 http.Error(resp, err.Error(), code)
151 resp.Header().Set("Content-Length", strconv.Itoa(size))
152 resp.Header().Set("Content-Type", "application/octet-stream")
153 resp.Write(buf[:size])
156 // Return a new context that gets cancelled by resp's CloseNotifier.
//
// The context is derived from parent with context.WithCancel. The
// close-notification goroutine is only started when resp implements
// http.CloseNotifier.
//
// NOTE(review): http.CloseNotifier is deprecated in net/http in favor
// of Request.Context; consider passing the request's context instead.
157 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
158 ctx, cancel := context.WithCancel(parent)
159 if cn, ok := resp.(http.CloseNotifier); ok {
160 go func(c <-chan bool) {
163 theConfig.debugLogf("cancel context")
172 // Get a buffer from the pool -- but give up and return a non-nil
173 // error if ctx ends before we get a buffer.
//
// The buffer is requested with bufs.Get(bufSize) and delivered over
// the unbuffered bufReady channel. When the function gives up it
// returns a nil buffer and ErrClientDisconnect.
174 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
175 bufReady := make(chan []byte)
177 bufReady <- bufs.Get(bufSize)
180 case buf := <-bufReady:
184 // Even if closeNotifier happened first, we
185 // need to keep waiting for our buf so we can
186 // return it to the pool.
189 return nil, ErrClientDisconnect
// handlePUT stores the request body as the block named by the {hash}
// route variable.
//
// Before the body is read, the request is rejected with
// SizeRequiredError when Content-Length is unknown (-1), with
// TooLongError when it exceeds BlockSize, and with FullError when
// KeepVM reports no writable volumes. The body is then read in full
// into a buffer of exactly Content-Length bytes and passed to PutBlock.
//
// On success the response body is the locator "hash+size" followed by
// a newline — signed with SignLocator when a blob signing key is
// configured and the request carries an API token — and the
// X-Keep-Replicas-Stored header holds the replication count returned
// by PutBlock.
//
// NOTE(review): the context is derived from context.TODO(), not from
// req.Context().
193 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
194 ctx, cancel := contextForResponse(context.TODO(), resp)
197 hash := mux.Vars(req)["hash"]
199 // Detect as many error conditions as possible before reading
200 // the body: avoid transmitting data that will not end up
201 // being written anyway.
203 if req.ContentLength == -1 {
204 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
208 if req.ContentLength > BlockSize {
209 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
213 if len(KeepVM.AllWritable()) == 0 {
214 http.Error(resp, FullError.Error(), FullError.HTTPCode)
218 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
220 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
224 _, err = io.ReadFull(req.Body, buf)
226 http.Error(resp, err.Error(), 500)
231 replication, err := PutBlock(ctx, buf, hash)
235 code := http.StatusInternalServerError
236 if err, ok := err.(*KeepError); ok {
239 http.Error(resp, err.Error(), code)
243 // Success; add a size hint, sign the locator if possible, and
244 // return it to the client.
245 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
246 apiToken := GetAPIToken(req)
247 if theConfig.blobSigningKey != nil && apiToken != "" {
248 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
249 returnHash = SignLocator(returnHash, apiToken, expiry)
251 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
252 resp.Write([]byte(returnHash + "\n"))
255 // IndexHandler responds to "/index", "/index/{prefix}", and
256 // "/mounts/{uuid}/blocks" requests.
//
// The request must carry the system auth token (IsSystemAuth);
// otherwise it is rejected with UnauthorizedError. The prefix comes
// from the {prefix} route variable, and may also be read from the
// "prefix" form value (the condition guarding that fallback is not
// visible here). A {uuid} that KeepVM.Lookup cannot resolve yields
// 404 "mount not found".
//
// Each selected volume writes its index directly to the response via
// IndexTo. A final newline is written after all volumes have been
// listed.
257 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
258 if !IsSystemAuth(GetAPIToken(req)) {
259 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
263 prefix := mux.Vars(req)["prefix"]
266 prefix = req.Form.Get("prefix")
269 uuid := mux.Vars(req)["uuid"]
273 vols = KeepVM.AllReadable()
274 } else if v := KeepVM.Lookup(uuid, false); v == nil {
275 http.Error(resp, "mount not found", http.StatusNotFound)
281 for _, v := range vols {
282 if err := v.IndexTo(prefix, resp); err != nil {
283 // We can't send an error message to the
284 // client because we might have already sent
285 // headers and index content. All we can do is
286 // log the error in our own logs, and (in
287 // cases where headers haven't been sent yet)
290 // If headers have already been sent, the
291 // client must notice the lack of trailing
292 // newline as an indication that the response
294 log.Printf("index error from volume %s: %s", v, err)
295 http.Error(resp, "", http.StatusInternalServerError)
299 // An empty line at EOF is the only way the client can be
300 // assured the entire index was received.
301 resp.Write([]byte{'\n'})
304 // MountsHandler responds to "GET /mounts" requests.
305 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
306 err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
308 http.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus is the buffer pool section of a NodeStatus report.
// readNodeStatus fills it from bufs.Alloc(), bufs.Cap() and bufs.Len().
type PoolStatus struct {
	// Alloc is encoded in JSON as "BytesAllocatedCumulative".
	Alloc uint64 `json:"BytesAllocatedCumulative"`

	// Cap is encoded in JSON as "BuffersMax".
	Cap int `json:"BuffersMax"`

	// Len is encoded in JSON as "BuffersInUse".
	Len int `json:"BuffersInUse"`
}
// volumeStatusEnt is one element of NodeStatus.Volumes;
// readNodeStatus appends one per readable volume. All fields shown
// here are omitted from JSON output when empty.
319 type volumeStatusEnt struct {
// Status is set from the volume's Status() method.
321 Status *VolumeStatus `json:",omitempty"`
// NOTE(review): the only assignment to VolumeStats in readNodeStatus
// is commented out, so this field appears to stay nil — confirm.
322 VolumeStats *ioStats `json:",omitempty"`
// InternalStats is set only for volumes that implement
// InternalStatser.
323 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the report that StatusHandler marshals to JSON and
// that readNodeStatus populates. readNodeStatus also assigns
// RequestsCurrent and RequestsMax, whose declarations are not
// visible in this listing.
327 type NodeStatus struct {
// Volumes holds one entry per readable volume.
328 Volumes []*volumeStatusEnt
// BufferPool reports the Alloc/Cap/Len values of the buffer pool.
329 BufferPool PoolStatus
// PullQueue and TrashQueue hold getWorkQueueStatus results for the
// pull and trash work queues.
330 PullQueue WorkQueueStatus
331 TrashQueue WorkQueueStatus
// stLock is a package-level mutex.
// NOTE(review): presumably it serializes StatusHandler's use of the
// shared status value; the Lock/Unlock calls are not visible in this
// listing — confirm.
338 var stLock sync.Mutex
340 // DebugHandler addresses /debug.json requests.
341 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
342 type debugStats struct {
343 MemStats runtime.MemStats
346 runtime.ReadMemStats(&ds.MemStats)
347 err := json.NewEncoder(resp).Encode(&ds)
349 http.Error(resp, err.Error(), 500)
353 // StatusHandler addresses /status.json requests.
//
// It refreshes the status value st with readNodeStatus and marshals
// it to JSON. If marshalling fails, the error and the status value
// are logged and the error text is sent with a 500 status.
// NOTE(review): the declaration of st and the code that writes the
// marshalled bytes to the response are not visible in this listing.
354 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
356 rtr.readNodeStatus(&st)
357 jstat, err := json.Marshal(&st)
362 log.Printf("json.Marshal: %s", err)
363 log.Printf("NodeStatus = %v", &st)
364 http.Error(resp, err.Error(), 500)
368 // readNodeStatus populates the given NodeStatus struct with current values.
//
// st.Volumes is rebuilt with one entry per volume returned by
// KeepVM.AllReadable(), reusing the existing backing array when it has
// enough capacity. Buffer pool figures come from bufs, and queue
// figures from getWorkQueueStatus on pullq and trashq. The request
// counters are only filled in when rtr.limiter is non-nil.
369 func (rtr *router) readNodeStatus(st *NodeStatus) {
371 vols := KeepVM.AllReadable()
372 if cap(st.Volumes) < len(vols) {
373 st.Volumes = make([]*volumeStatusEnt, len(vols))
// Truncate to length zero so the appends below start from an empty
// slice while keeping the capacity.
375 st.Volumes = st.Volumes[:0]
376 for _, vol := range vols {
// internalStats stays nil unless the volume implements
// InternalStatser.
377 var internalStats interface{}
378 if vol, ok := vol.(InternalStatser); ok {
379 internalStats = vol.InternalStats()
381 st.Volumes = append(st.Volumes, &volumeStatusEnt{
383 Status: vol.Status(),
384 InternalStats: internalStats,
385 //VolumeStats: KeepVM.VolumeStats(vol),
388 st.BufferPool.Alloc = bufs.Alloc()
389 st.BufferPool.Cap = bufs.Cap()
390 st.BufferPool.Len = bufs.Len()
391 st.PullQueue = getWorkQueueStatus(pullq)
392 st.TrashQueue = getWorkQueueStatus(trashq)
393 if rtr.limiter != nil {
394 st.RequestsCurrent = rtr.limiter.Current()
395 st.RequestsMax = rtr.limiter.Max()
399 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
400 // should never happen except in test suites), it returns a zero status
401 // value instead of crashing.
// NOTE(review): the branch that handles a non-nil q is not visible in
// this listing.
402 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
404 // This should only happen during tests.
405 return WorkQueueStatus{}
410 // DeleteHandler processes DELETE requests.
412 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
413 // from all connected volumes.
415 // Only the Data Manager, or an Arvados admin with scope "all", are
416 // allowed to issue DELETE requests. If a DELETE request is not
417 // authenticated or is issued by a non-admin user, the server returns
418 // a PermissionError.
//
// If theConfig.EnableDelete is false, an otherwise authorized request
// is rejected with MethodDisabledError.
//
420 // Upon receiving a valid request from an authorized user,
421 // DeleteHandler deletes all copies of the specified block on local
426 // If the requested block was not found on any volume, the response
427 // code is HTTP 404 Not Found.
429 // Otherwise, the response code is 200 OK, with a response body
430 // consisting of the JSON message
432 // {"copies_deleted":d,"copies_failed":f}
434 // where d and f are integers representing the number of blocks that
435 // were successfully and unsuccessfully deleted.
437 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
438 hash := mux.Vars(req)["hash"]
440 // Confirm that this user is an admin and has a token with unlimited scope.
441 var tok = GetAPIToken(req)
442 if tok == "" || !CanDelete(tok) {
443 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
447 if !theConfig.EnableDelete {
448 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
452 // Delete copies of this block from all available volumes.
453 // Report how many blocks were successfully deleted, and how
454 // many were found on writable volumes but not deleted.
456 Deleted int `json:"copies_deleted"`
457 Failed int `json:"copies_failed"`
// Each writable volume is asked to Trash the block. An error for
// which os.IsNotExist is true gets its own branch; other errors are
// logged.
459 for _, vol := range KeepVM.AllWritable() {
460 if err := vol.Trash(hash); err == nil {
462 } else if os.IsNotExist(err) {
466 log.Println("DeleteHandler:", err)
// Nothing deleted and nothing failed: report 404.
472 if result.Deleted == 0 && result.Failed == 0 {
473 st = http.StatusNotFound
480 if st == http.StatusOK {
481 if body, err := json.Marshal(result); err == nil {
484 log.Printf("json.Marshal: %s (result = %v)", err, result)
485 http.Error(resp, err.Error(), 500)
490 /* PullHandler processes "PUT /pull" requests for the data manager.
491 The request body is a JSON message containing a list of pull
492 requests in the following format:
496 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
498 "keep0.qr1hi.arvadosapi.com:25107",
499 "keep1.qr1hi.arvadosapi.com:25108"
503 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
513 Each pull request in the list consists of a block locator string
514 and an ordered list of servers. Keepstore should try to fetch the
515 block from each server in turn.
517 If the request has not been sent by the Data Manager, return 401
520 If the JSON unmarshalling fails, return 400 Bad Request.
// PullRequest is one entry of the JSON list accepted by "PUT /pull":
// a block locator together with an ordered list of servers.
type PullRequest struct {
	// Locator is encoded in JSON as "locator".
	Locator string `json:"locator"`

	// Servers is encoded in JSON as "servers".
	Servers []string `json:"servers"`

	// Destination mount, or "" for "anywhere"
	MountUUID string `json:"mount_uuid"`
}
532 // PullHandler processes "PUT /pull" requests for the data manager.
//
// The request must carry the system auth token (IsSystemAuth);
// otherwise it is rejected with UnauthorizedError. The body is decoded
// as JSON; a decoding error is reported with BadRequestError's HTTP
// code and the decoder's error text. On success the handler replies
// 200 with "Received N pull requests" and then replaces the contents
// of pullq.
533 func PullHandler(resp http.ResponseWriter, req *http.Request) {
534 // Reject unauthorized requests.
535 if !IsSystemAuth(GetAPIToken(req)) {
536 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
540 // Parse the request body.
542 r := json.NewDecoder(req.Body)
543 if err := r.Decode(&pr); err != nil {
544 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
548 // We have a properly formatted pull list sent from the data
549 // manager. Report success and send the list to the pull list
550 // manager for further handling.
551 resp.WriteHeader(http.StatusOK)
553 fmt.Sprintf("Received %d pull requests\n", len(pr))))
556 for _, p := range pr {
559 pullq.ReplaceQueue(plist)
// TrashRequest is one entry of the JSON list accepted by "PUT /trash":
// a block locator and its Mtime.
type TrashRequest struct {
	// Locator is encoded in JSON as "locator".
	Locator string `json:"locator"`

	// BlockMtime is encoded in JSON as "block_mtime".
	BlockMtime int64 `json:"block_mtime"`

	// Target mount, or "" for "everywhere"
	MountUUID string `json:"mount_uuid"`
}
571 // TrashHandler processes /trash requests.
//
// The request must carry the system auth token (IsSystemAuth);
// otherwise it is rejected with UnauthorizedError. The body is decoded
// as a JSON list of TrashRequest; a decoding error is reported with
// BadRequestError's HTTP code and the decoder's error text. On success
// the handler replies 200 with "Received N trash requests" and then
// replaces the contents of trashq.
572 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
573 // Reject unauthorized requests.
574 if !IsSystemAuth(GetAPIToken(req)) {
575 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
579 // Parse the request body.
580 var trash []TrashRequest
581 r := json.NewDecoder(req.Body)
582 if err := r.Decode(&trash); err != nil {
583 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
587 // We have a properly formatted trash list sent from the data
588 // manager. Report success and send the list to the trash work
589 // queue for further handling.
590 resp.WriteHeader(http.StatusOK)
592 fmt.Sprintf("Received %d trash requests\n", len(trash))))
595 for _, t := range trash {
598 trashq.ReplaceQueue(tlist)
601 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
//
// The request must carry the system auth token (IsSystemAuth);
// otherwise it is rejected with UnauthorizedError. Untrash is
// attempted on every writable volume. Responses:
//   - 404 "No writable volumes" when there are none;
//   - 404 when numNotFound equals the number of writable volumes;
//   - 500 when untrashing failed on every writable volume;
//   - otherwise a text body listing the volumes where untrash
//     succeeded and, if any, the volumes where it failed.
602 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
603 // Reject unauthorized requests.
604 if !IsSystemAuth(GetAPIToken(req)) {
605 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
609 hash := mux.Vars(req)["hash"]
611 if len(KeepVM.AllWritable()) == 0 {
612 http.Error(resp, "No writable volumes", http.StatusNotFound)
616 var untrashedOn, failedOn []string
// NOTE(review): KeepVM.AllWritable() is evaluated again for each of
// the comparisons after this loop; if the writable set can change
// between calls, the counts may not line up — confirm.
618 for _, vol := range KeepVM.AllWritable() {
619 err := vol.Untrash(hash)
621 if os.IsNotExist(err) {
623 } else if err != nil {
// NOTE(review): the failure log below does not include err itself.
624 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
625 failedOn = append(failedOn, vol.String())
627 log.Printf("Untrashed %v on volume %v", hash, vol.String())
628 untrashedOn = append(untrashedOn, vol.String())
632 if numNotFound == len(KeepVM.AllWritable()) {
633 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
637 if len(failedOn) == len(KeepVM.AllWritable()) {
638 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
640 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
641 if len(failedOn) > 0 {
642 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
644 resp.Write([]byte(respBody))
648 // GetBlock and PutBlock implement lower-level code for handling
649 // blocks by rooting through volumes connected to the local machine.
650 // Once the handler has determined that system policy permits the
651 // request, it calls these methods to perform the actual operation.
653 // TODO(twp): this code would probably be better located in the
654 // VolumeManager interface. As an abstraction, the VolumeManager
655 // should be the only part of the code that cares about which volume a
656 // block is stored on, so it should be responsible for figuring out
657 // which volume to check for fetching blocks, storing blocks, etc.
659 // GetBlock fetches the block identified by "hash" into the provided
660 // buf, and returns the data size.
//
// Volumes returned by KeepVM.AllReadable() are tried in turn. The
// error reported when no volume yields a good copy starts out as
// NotFoundError, and is replaced by VolumeBusyError or DiskHashError
// when a volume reports a busy condition or returns data whose MD5
// does not match hash. ErrClientDisconnect is returned from inside
// the volume loop (the guarding condition is not visible here).
//
// NOTE(review): resp is not referenced in any line visible in this
// listing.
//
662 // If the block cannot be found on any volume, returns NotFoundError.
664 // If the block found does not have the correct MD5 hash, returns
667 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
668 // Attempt to read the requested hash from a keep volume.
669 errorToCaller := NotFoundError
671 for _, vol := range KeepVM.AllReadable() {
672 size, err := vol.Get(ctx, hash, buf)
675 return 0, ErrClientDisconnect
679 // IsNotExist is an expected error and may be
680 // ignored. All other errors are logged. In
681 // any case we continue trying to read other
682 // volumes. If all volumes report IsNotExist,
683 // we return a NotFoundError.
684 if !os.IsNotExist(err) {
685 log.Printf("%s: Get(%s): %s", vol, hash, err)
687 // If some volume returns a transient error, return it to the caller
688 // instead of "Not found" so it can retry.
689 if err == VolumeBusyError {
690 errorToCaller = err.(*KeepError)
694 // Check the file checksum.
696 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
697 if filehash != hash {
698 // TODO: Try harder to tell a sysadmin about
700 log.Printf("%s: checksum mismatch for request %s (actual %s)",
702 errorToCaller = DiskHashError
705 if errorToCaller == DiskHashError {
706 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
711 return 0, errorToCaller
714 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
716 // PutBlock(ctx, block, hash)
717 // Stores the BLOCK (identified by the content id HASH) in Keep.
719 // The MD5 checksum of the block must be identical to the content id HASH.
720 // If not, an error is returned.
722 // PutBlock stores the BLOCK on the first Keep volume with free space.
723 // A failure code is returned to the user only if all volumes fail.
//
// If ctx ends while the block is being compared or written, PutBlock
// returns ErrClientDisconnect.
//
725 // On success, PutBlock returns the replication level reported by the
// volume that holds the block, and a nil error.
726 // On failure, it returns a KeepError with one of the following codes:
729 // A different block with the same hash already exists on this
732 // The MD5 hash of the BLOCK does not match the argument HASH.
734 // There was not enough space left in any Keep volume to store
737 // The object could not be stored for some other reason (e.g.
738 // all writes failed). The text of the error message should
739 // provide as much detail as possible.
741 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
742 // Check that BLOCK's checksum matches HASH.
743 blockhash := fmt.Sprintf("%x", md5.Sum(block))
744 if blockhash != hash {
745 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
746 return 0, RequestHashError
749 // If we already have this data, it's intact on disk, and we
750 // can update its timestamp, return success. If we have
751 // different data with the same hash, return failure.
752 if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
754 } else if ctx.Err() != nil {
755 return 0, ErrClientDisconnect
758 // Choose a Keep volume to write to.
759 // If this volume fails, try all of the volumes in order.
760 if vol := KeepVM.NextWritable(); vol != nil {
761 if err := vol.Put(ctx, hash, block); err == nil {
762 return vol.Replication(), nil // success!
764 if ctx.Err() != nil {
765 return 0, ErrClientDisconnect
769 writables := KeepVM.AllWritable()
770 if len(writables) == 0 {
771 log.Print("No writable volumes.")
776 for _, vol := range writables {
777 err := vol.Put(ctx, hash, block)
778 if ctx.Err() != nil {
779 return 0, ErrClientDisconnect
782 return vol.Replication(), nil // success!
784 if err != FullError {
785 // The volume is not full but the
786 // write did not succeed. Report the
787 // error and continue trying.
789 log.Printf("%s: Write(%s): %s", vol, hash, err)
794 log.Print("All volumes are full.")
797 // Already logged the non-full errors.
798 return 0, GenericError
801 // CompareAndTouch returns the current replication level if one of the
802 // volumes already has the given content and it successfully updates
803 // the relevant block's modification time in order to protect it from
804 // premature garbage collection. Otherwise, it returns a non-nil
//
// Only writable volumes (KeepVM.AllWritable) are consulted. For each
// one, Compare is called first, and Touch is attempted only on a
// volume whose Compare returned no error. The replication level
// returned on success is that of the volume where both calls
// succeeded.
806 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
// bestErr starts out as NotFoundError.
807 var bestErr error = NotFoundError
808 for _, vol := range KeepVM.AllWritable() {
809 err := vol.Compare(ctx, hash, buf)
810 if ctx.Err() != nil {
812 } else if err == CollisionError {
813 // Stop if we have a block with same hash but
814 // different content. (It will be impossible
815 // to tell which one is wanted if we have
816 // both, so there's no point writing it even
817 // on a different volume.)
818 log.Printf("%s: Compare(%s): %s", vol, hash, err)
820 } else if os.IsNotExist(err) {
821 // Block does not exist. This is the only
822 // "normal" error: we don't log anything.
824 } else if err != nil {
825 // Couldn't open file, data is corrupt on
826 // disk, etc.: log this abnormal condition,
827 // and try the next volume.
828 log.Printf("%s: Compare(%s): %s", vol, hash, err)
831 if err := vol.Touch(hash); err != nil {
832 log.Printf("%s: Touch %s failed: %s", vol, hash, err)
836 // Compare and Touch both worked --> done.
837 return vol.Replication(), nil
// validLocatorRe matches a bare locator: exactly 32 lowercase
// hexadecimal digits and nothing else.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, that is,
// whether it matches validLocatorRe.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	matched := validLocatorRe.MatchString(loc)
	return matched
}
// authRe matches an Authorization header value that uses the "OAuth2"
// or "Bearer" scheme. The second submatch is the token: everything
// after the whitespace that follows the scheme name.
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization header value is examined, and it must
// use the "OAuth2" or "Bearer" scheme.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" both when the header is absent and when
	// the key is present with no values, so this cannot panic the
	// way indexing the raw value slice could.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[2]
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestampHex cannot be
// parsed as a hexadecimal string. Parse failures are logged.
func IsExpired(timestampHex string) bool {
	// Parse with an explicit 64-bit size. A bit size of 0 means the
	// platform "int", which on 32-bit builds rejects timestamps
	// beyond 2^31-1 and would report valid future expiry times as
	// expired.
	ts, err := strconv.ParseInt(timestampHex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}
878 // CanDelete returns true if the user identified by apiToken is
879 // allowed to delete blocks.
//
// The only authorization check visible in this listing is
// IsSystemAuth(apiToken); the admin/scope lookup described in the
// TODO below is not implemented in the lines shown.
880 func CanDelete(apiToken string) bool {
884 // Blocks may be deleted only when Keep has been configured with a
886 if IsSystemAuth(apiToken) {
889 // TODO(twp): look up apiToken with the API server
890 // return true if is_admin is true and if the token
891 // has unlimited scope
895 // IsSystemAuth returns true if the given token is allowed to perform
896 // system level actions like deleting data.
897 func IsSystemAuth(token string) bool {
898 return token != "" && token == theConfig.systemAuthToken