1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.curoverse.com/arvados.git/sdk/go/arvados"
25 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
26 "git.curoverse.com/arvados.git/sdk/go/health"
27 "git.curoverse.com/arvados.git/sdk/go/httpserver"
28 "github.com/gorilla/mux"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
// Fields of the router type (the `type router struct {` header is not
// visible in this chunk). The router embeds a mux.Router elsewhere and
// carries per-server dependencies used by every handler below.
35 cluster *arvados.Cluster
36 logger logrus.FieldLogger
// remoteProxy forwards requests for blocks whose locators carry a
// remote-cluster "+R" hint (see handleGET).
37 remoteProxy remoteProxy
39 volmgr *RRVolumeManager
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48 Router: mux.NewRouter(),
50 logger: ctxlog.FromContext(ctx),
51 metrics: &nodeMetrics{reg: reg},
// GET/HEAD a block, with or without trailing "+hints" in the locator.
58 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60 `/{hash:[0-9a-f]{32}}+{hints}`,
61 rtr.handleGET).Methods("GET", "HEAD")
// Store a block, or (privileged) delete it from local volumes.
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65 // List all blocks stored here. Privileged client only.
66 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67 // List blocks stored here whose hash has the given prefix.
68 // Privileged client only.
69 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71 // Internals/debugging info (runtime.MemStats)
72 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
74 // List volumes: path, device number, bytes used/avail.
75 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
77 // List mounts: UUID, readonly, tier, device ID, ...
78 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
79 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
80 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
82 // Replace the current pull queue.
83 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
85 // Replace the current trash queue.
86 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
88 // Untrash moves blocks from trash back into store
89 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health check endpoint, authorized by the cluster management token.
91 rtr.Handle("/_health/{check}", &health.Handler{
92 Token: cluster.ManagementToken,
96 // Any request which does not match any of these routes gets
98 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Register Prometheus metrics for the buffer pool and both work queues.
100 rtr.metrics.setupBufferPoolMetrics(bufs)
101 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
102 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
107 // BadRequestHandler is a HandleFunc to address bad requests.
// It is installed as the router's NotFoundHandler, so any unmatched
// route produces BadRequestError's message and HTTP status code.
108 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
109 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD requests for a single block.
// Locators carrying a remote-cluster "+R" hint without a local "+A"
// signature are delegated to remoteProxy; otherwise the signature is
// verified (when blob signing is enabled), the block is read into a
// pooled buffer, and the bytes are written to the response.
112 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
// ctx is cancelled if the client disconnects (see contextForResponse).
113 ctx, cancel := contextForResponse(context.TODO(), resp)
116 locator := req.URL.Path[1:]
117 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
122 if rtr.cluster.Collections.BlobSigning {
123 locator := req.URL.Path[1:] // strip leading slash
124 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
// VerifySignature's errors are *KeepError values carrying HTTP codes.
125 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
130 // TODO: Probe volumes to check whether the block _might_
131 // exist. Some volumes/types could support a quick existence
132 // check without causing other operations to suffer. If all
133 // volumes support that, and assure us the block definitely
134 // isn't here, we can return 404 now instead of waiting for a
// Block until a full-size buffer is available or the client goes away.
137 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
139 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
144 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146 code := http.StatusInternalServerError
147 if err, ok := err.(*KeepError); ok {
150 http.Error(resp, err.Error(), code)
154 resp.Header().Set("Content-Length", strconv.Itoa(size))
155 resp.Header().Set("Content-Type", "application/octet-stream")
156 resp.Write(buf[:size])
159 // Return a new context that gets cancelled by resp's CloseNotifier.
// The caller must call the returned CancelFunc when the request is done.
// NOTE(review): http.CloseNotifier is deprecated in modern Go;
// req.Context() is the usual replacement — confirm before changing.
160 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
161 ctx, cancel := context.WithCancel(parent)
162 if cn, ok := resp.(http.CloseNotifier); ok {
163 go func(c <-chan bool) {
174 // Get a buffer from the pool -- but give up and return a non-nil
175 // error if ctx ends before we get a buffer.
176 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
177 bufReady := make(chan []byte)
// bufs.Get may block until a buffer is free; run it in a goroutine so
// we can also watch for ctx cancellation.
179 bufReady <- bufs.Get(bufSize)
182 case buf := <-bufReady:
186 // Even if closeNotifier happened first, we
187 // need to keep waiting for our buf so we can
188 // return it to the pool.
191 return nil, ErrClientDisconnect
// handlePUT stores the request body as a block under the hash given in
// the URL. Rejects requests with unknown or oversized Content-Length,
// or when no writable volumes exist. On success it responds with a
// size-hinted (and, when possible, signed) locator and the replica
// count in the X-Keep-Replicas-Stored header.
195 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
196 ctx, cancel := contextForResponse(context.TODO(), resp)
199 hash := mux.Vars(req)["hash"]
201 // Detect as many error conditions as possible before reading
202 // the body: avoid transmitting data that will not end up
203 // being written anyway.
205 if req.ContentLength == -1 {
206 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
210 if req.ContentLength > BlockSize {
211 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
215 if len(rtr.volmgr.AllWritable()) == 0 {
216 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Buffer is sized exactly to the declared Content-Length.
220 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
222 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
226 _, err = io.ReadFull(req.Body, buf)
228 http.Error(resp, err.Error(), 500)
233 replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
237 code := http.StatusInternalServerError
238 if err, ok := err.(*KeepError); ok {
241 http.Error(resp, err.Error(), code)
245 // Success; add a size hint, sign the locator if possible, and
246 // return it to the client.
247 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
248 apiToken := GetAPIToken(req)
249 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
250 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
251 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
253 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
254 resp.Write([]byte(returnHash + "\n"))
257 // IndexHandler responds to "/index", "/index/{prefix}", and
258 // "/mounts/{uuid}/blocks" requests.
// Requires the system root token. Streams the index of each selected
// volume, then a trailing blank line as an end-of-index marker.
259 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
260 if !rtr.isSystemAuth(GetAPIToken(req)) {
261 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// Prefix may come from the URL path or a form parameter.
265 prefix := mux.Vars(req)["prefix"]
268 prefix = req.Form.Get("prefix")
// With a mount UUID, index only that mount; otherwise all readable.
271 uuid := mux.Vars(req)["uuid"]
273 var vols []*VolumeMount
275 vols = rtr.volmgr.AllReadable()
276 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
277 http.Error(resp, "mount not found", http.StatusNotFound)
280 vols = []*VolumeMount{mnt}
283 for _, v := range vols {
284 if err := v.IndexTo(prefix, resp); err != nil {
285 // We can't send an error message to the
286 // client because we might have already sent
287 // headers and index content. All we can do is
288 // log the error in our own logs, and (in
289 // cases where headers haven't been sent yet)
292 // If headers have already been sent, the
293 // client must notice the lack of trailing
294 // newline as an indication that the response
296 log.Printf("index error from volume %s: %s", v, err)
297 http.Error(resp, "", http.StatusInternalServerError)
301 // An empty line at EOF is the only way the client can be
302 // assured the entire index was received.
303 resp.Write([]byte{'\n'})
306 // MountsHandler responds to "GET /mounts" requests.
// Encodes the volume manager's mount list directly as JSON.
307 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
308 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
310 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage, serialized into the
// /status.json response (see NodeStatus.BufferPool).
315 type PoolStatus struct {
316 Alloc uint64 `json:"BytesAllocatedCumulative"`
317 Cap int `json:"BuffersMax"`
318 Len int `json:"BuffersInUse"`
// volumeStatusEnt describes one volume in a NodeStatus response.
321 type volumeStatusEnt struct {
323 Status *VolumeStatus `json:",omitempty"`
324 VolumeStats *ioStats `json:",omitempty"`
// InternalStats is volume-type-specific data from InternalStatser.
325 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the payload served by /status.json; populated by
// readNodeStatus.
329 type NodeStatus struct {
330 Volumes []*volumeStatusEnt
331 BufferPool PoolStatus
332 PullQueue WorkQueueStatus
333 TrashQueue WorkQueueStatus
// stLock guards shared state used while building status responses —
// presumably a package-level NodeStatus scratch value reused by
// StatusHandler; confirm against the lines not visible in this chunk.
340 var stLock sync.Mutex
342 // DebugHandler addresses /debug.json requests.
// Reports runtime.MemStats as JSON for debugging.
343 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
344 type debugStats struct {
345 MemStats runtime.MemStats
348 runtime.ReadMemStats(&ds.MemStats)
349 err := json.NewEncoder(resp).Encode(&ds)
351 http.Error(resp, err.Error(), 500)
355 // StatusHandler addresses /status.json requests.
// Fills a NodeStatus via readNodeStatus and serves it as JSON;
// marshalling failures are logged along with the offending value.
356 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
358 rtr.readNodeStatus(&st)
359 jstat, err := json.Marshal(&st)
364 log.Printf("json.Marshal: %s", err)
365 log.Printf("NodeStatus = %v", &st)
366 http.Error(resp, err.Error(), 500)
370 // populate the given NodeStatus struct with current values.
// Reuses st.Volumes' backing array when it is large enough.
371 func (rtr *router) readNodeStatus(st *NodeStatus) {
373 vols := rtr.volmgr.AllReadable()
374 if cap(st.Volumes) < len(vols) {
375 st.Volumes = make([]*volumeStatusEnt, len(vols))
377 st.Volumes = st.Volumes[:0]
378 for _, vol := range vols {
// Volumes that implement InternalStatser expose extra driver stats.
379 var internalStats interface{}
380 if vol, ok := vol.Volume.(InternalStatser); ok {
381 internalStats = vol.InternalStats()
383 st.Volumes = append(st.Volumes, &volumeStatusEnt{
385 Status: vol.Status(),
386 InternalStats: internalStats,
387 //VolumeStats: rtr.volmgr.VolumeStats(vol),
// Snapshot buffer pool and work queue counters.
390 st.BufferPool.Alloc = bufs.Alloc()
391 st.BufferPool.Cap = bufs.Cap()
392 st.BufferPool.Len = bufs.Len()
393 st.PullQueue = getWorkQueueStatus(rtr.pullq)
394 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
397 // return a WorkQueueStatus for the given queue. If q is nil (which
398 // should never happen except in test suites), return a zero status
399 // value instead of crashing.
400 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
402 // This should only happen during tests.
403 return WorkQueueStatus{}
408 // handleDELETE processes DELETE requests.
410 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
411 // from all connected volumes.
413 // Only the Data Manager, or an Arvados admin with scope "all", are
414 // allowed to issue DELETE requests. If a DELETE request is not
415 // authenticated or is issued by a non-admin user, the server returns
416 // a PermissionError.
418 // Upon receiving a valid request from an authorized user,
419 // handleDELETE deletes all copies of the specified block on local
424 // If the requested blocks was not found on any volume, the response
425 // code is HTTP 404 Not Found.
427 // Otherwise, the response code is 200 OK, with a response body
428 // consisting of the JSON message
430 // {"copies_deleted":d,"copies_failed":f}
432 // where d and f are integers representing the number of blocks that
433 // were successfully and unsuccessfully deleted.
435 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
436 hash := mux.Vars(req)["hash"]
438 // Confirm that this user is an admin and has a token with unlimited scope.
439 var tok = GetAPIToken(req)
440 if tok == "" || !rtr.canDelete(tok) {
441 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Deleting is refused entirely when BlobTrash is disabled in config.
445 if !rtr.cluster.Collections.BlobTrash {
446 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
450 // Delete copies of this block from all available volumes.
451 // Report how many blocks were successfully deleted, and how
452 // many were found on writable volumes but not deleted.
454 Deleted int `json:"copies_deleted"`
455 Failed int `json:"copies_failed"`
457 for _, vol := range rtr.volmgr.AllWritable() {
// Trash moves the block to trash rather than unlinking immediately.
458 if err := vol.Trash(hash); err == nil {
460 } else if os.IsNotExist(err) {
464 log.Println("DeleteHandler:", err)
470 if result.Deleted == 0 && result.Failed == 0 {
471 st = http.StatusNotFound
478 if st == http.StatusOK {
479 if body, err := json.Marshal(result); err == nil {
482 log.Printf("json.Marshal: %s (result = %v)", err, result)
483 http.Error(resp, err.Error(), 500)
488 /* PullHandler processes "PUT /pull" requests for the data manager.
489 The request body is a JSON message containing a list of pull
490 requests in the following format:
494 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
496 "keep0.qr1hi.arvadosapi.com:25107",
497 "keep1.qr1hi.arvadosapi.com:25108"
501 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
511 Each pull request in the list consists of a block locator string
512 and an ordered list of servers. Keepstore should try to fetch the
513 block from each server in turn.
515 If the request has not been sent by the Data Manager, return 401
518 If the JSON unmarshalling fails, return 400 Bad Request.
521 // PullRequest consists of a block locator and an ordered list of servers
522 type PullRequest struct {
523 Locator string `json:"locator"`
524 Servers []string `json:"servers"`
526 // Destination mount, or "" for "anywhere"
527 MountUUID string `json:"mount_uuid"`
530 // PullHandler processes "PUT /pull" requests for the data manager.
// Requires the system root token; replaces the pull queue wholesale
// with the decoded request list.
531 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
532 // Reject unauthorized requests.
533 if !rtr.isSystemAuth(GetAPIToken(req)) {
534 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
538 // Parse the request body.
540 r := json.NewDecoder(req.Body)
541 if err := r.Decode(&pr); err != nil {
542 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
546 // We have a properly formatted pull list sent from the data
547 // manager. Report success and send the list to the pull list
548 // manager for further handling.
549 resp.WriteHeader(http.StatusOK)
551 fmt.Sprintf("Received %d pull requests\n", len(pr))))
554 for _, p := range pr {
557 rtr.pullq.ReplaceQueue(plist)
560 // TrashRequest consists of a block locator and its Mtime
// Decoded from the JSON body of "PUT /trash" requests.
561 type TrashRequest struct {
562 Locator string `json:"locator"`
563 BlockMtime int64 `json:"block_mtime"`
565 // Target mount, or "" for "everywhere"
566 MountUUID string `json:"mount_uuid"`
569 // TrashHandler processes /trash requests.
// Requires the system root token; replaces the trash queue wholesale
// with the decoded request list.
570 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
571 // Reject unauthorized requests.
572 if !rtr.isSystemAuth(GetAPIToken(req)) {
573 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
577 // Parse the request body.
578 var trash []TrashRequest
579 r := json.NewDecoder(req.Body)
580 if err := r.Decode(&trash); err != nil {
581 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
585 // We have a properly formatted trash list sent from the data
586 // manager. Report success and send the list to the trash work
587 // queue for further handling.
588 resp.WriteHeader(http.StatusOK)
590 fmt.Sprintf("Received %d trash requests\n", len(trash))))
593 for _, t := range trash {
596 rtr.trashq.ReplaceQueue(tlist)
599 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
// Attempts Untrash on every writable volume; responds 404 if the block
// was not found anywhere, 500 if every attempt failed, otherwise a
// summary listing the volumes that succeeded and failed.
600 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
601 // Reject unauthorized requests.
602 if !rtr.isSystemAuth(GetAPIToken(req)) {
603 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
607 hash := mux.Vars(req)["hash"]
609 if len(rtr.volmgr.AllWritable()) == 0 {
610 http.Error(resp, "No writable volumes", http.StatusNotFound)
614 var untrashedOn, failedOn []string
616 for _, vol := range rtr.volmgr.AllWritable() {
617 err := vol.Untrash(hash)
619 if os.IsNotExist(err) {
621 } else if err != nil {
622 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
623 failedOn = append(failedOn, vol.String())
625 log.Printf("Untrashed %v on volume %v", hash, vol.String())
626 untrashedOn = append(untrashedOn, vol.String())
630 if numNotFound == len(rtr.volmgr.AllWritable()) {
631 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
635 if len(failedOn) == len(rtr.volmgr.AllWritable()) {
636 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
638 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
639 if len(failedOn) > 0 {
640 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
642 resp.Write([]byte(respBody))
646 // GetBlock and PutBlock implement lower-level code for handling
647 // blocks by rooting through volumes connected to the local machine.
648 // Once the handler has determined that system policy permits the
649 // request, it calls these methods to perform the actual operation.
651 // TODO(twp): this code would probably be better located in the
652 // VolumeManager interface. As an abstraction, the VolumeManager
653 // should be the only part of the code that cares about which volume a
654 // block is stored on, so it should be responsible for figuring out
655 // which volume to check for fetching blocks, storing blocks, etc.
657 // GetBlock fetches the block identified by "hash" into the provided
658 // buf, and returns the data size.
660 // If the block cannot be found on any volume, returns NotFoundError.
662 // If the block found does not have the correct MD5 hash, returns
665 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
666 // Attempt to read the requested hash from a keep volume.
667 errorToCaller := NotFoundError
669 for _, vol := range volmgr.AllReadable() {
670 size, err := vol.Get(ctx, hash, buf)
// Client cancellation aborts the whole search immediately.
673 return 0, ErrClientDisconnect
677 // IsNotExist is an expected error and may be
678 // ignored. All other errors are logged. In
679 // any case we continue trying to read other
680 // volumes. If all volumes report IsNotExist,
681 // we return a NotFoundError.
682 if !os.IsNotExist(err) {
683 log.Printf("%s: Get(%s): %s", vol, hash, err)
685 // If some volume returns a transient error, return it to the caller
686 // instead of "Not found" so it can retry.
687 if err == VolumeBusyError {
688 errorToCaller = err.(*KeepError)
692 // Check the file checksum.
// Verify the data actually read matches the requested hash,
// guarding against on-disk corruption.
694 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
695 if filehash != hash {
696 // TODO: Try harder to tell a sysadmin about
698 log.Printf("%s: checksum mismatch for request %s (actual %s)",
700 errorToCaller = DiskHashError
703 if errorToCaller == DiskHashError {
704 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
709 return 0, errorToCaller
712 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
714 // PutBlock(ctx, block, hash)
715 // Stores the BLOCK (identified by the content id HASH) in Keep.
717 // The MD5 checksum of the block must be identical to the content id HASH.
718 // If not, an error is returned.
720 // PutBlock stores the BLOCK on the first Keep volume with free space.
721 // A failure code is returned to the user only if all volumes fail.
723 // On success, PutBlock returns nil.
724 // On failure, it returns a KeepError with one of the following codes:
727 // A different block with the same hash already exists on this
730 // The MD5 hash of the BLOCK does not match the argument HASH.
732 // There was not enough space left in any Keep volume to store
735 // The object could not be stored for some other reason (e.g.
736 // all writes failed). The text of the error message should
737 // provide as much detail as possible.
739 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
740 // Check that BLOCK's checksum matches HASH.
741 blockhash := fmt.Sprintf("%x", md5.Sum(block))
742 if blockhash != hash {
743 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
744 return 0, RequestHashError
747 // If we already have this data, it's intact on disk, and we
748 // can update its timestamp, return success. If we have
749 // different data with the same hash, return failure.
750 if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
752 } else if ctx.Err() != nil {
753 return 0, ErrClientDisconnect
756 // Choose a Keep volume to write to.
757 // If this volume fails, try all of the volumes in order.
758 if mnt := volmgr.NextWritable(); mnt != nil {
759 if err := mnt.Put(ctx, hash, block); err == nil {
760 return mnt.Replication, nil // success!
762 if ctx.Err() != nil {
763 return 0, ErrClientDisconnect
// Fallback: the preferred volume failed, so try every writable
// volume in turn before giving up.
767 writables := volmgr.AllWritable()
768 if len(writables) == 0 {
769 log.Print("No writable volumes.")
774 for _, vol := range writables {
775 err := vol.Put(ctx, hash, block)
776 if ctx.Err() != nil {
777 return 0, ErrClientDisconnect
780 return vol.Replication, nil // success!
782 if err != FullError {
783 // The volume is not full but the
784 // write did not succeed. Report the
785 // error and continue trying.
787 log.Printf("%s: Write(%s): %s", vol, hash, err)
792 log.Print("All volumes are full.")
795 // Already logged the non-full errors.
796 return 0, GenericError
799 // CompareAndTouch returns the current replication level if one of the
800 // volumes already has the given content and it successfully updates
801 // the relevant block's modification time in order to protect it from
802 // premature garbage collection. Otherwise, it returns a non-nil
804 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
805 var bestErr error = NotFoundError
806 for _, mnt := range volmgr.AllWritable() {
807 err := mnt.Compare(ctx, hash, buf)
808 if ctx.Err() != nil {
810 } else if err == CollisionError {
811 // Stop if we have a block with same hash but
812 // different content. (It will be impossible
813 // to tell which one is wanted if we have
814 // both, so there's no point writing it even
815 // on a different volume.)
816 log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
818 } else if os.IsNotExist(err) {
819 // Block does not exist. This is the only
820 // "normal" error: we don't log anything.
822 } else if err != nil {
823 // Couldn't open file, data is corrupt on
824 // disk, etc.: log this abnormal condition,
825 // and try the next volume.
826 log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
// Content matches; refresh its mtime so GC won't reclaim it.
829 if err := mnt.Touch(hash); err != nil {
830 log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
834 // Compare and Touch both worked --> done.
835 return mnt.Replication, nil
// validLocatorRe matches a bare 32-hex-digit MD5 block hash.
840 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
842 // IsValidLocator returns true if the specified string is a valid Keep locator.
843 // When Keep is extended to support hash types other than MD5,
844 // this should be updated to cover those as well.
846 func IsValidLocator(loc string) bool {
847 return validLocatorRe.MatchString(loc)
// authRe captures the token portion of "OAuth2 <token>" or
// "Bearer <token>" Authorization header values.
850 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
852 // GetAPIToken returns the OAuth2 token from the Authorization
853 // header of a HTTP request, or an empty string if no matching
855 func GetAPIToken(req *http.Request) string {
// Only the first Authorization header value is considered.
856 if auth, ok := req.Header["Authorization"]; ok {
857 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
864 // IsExpired returns true if the given Unix timestamp (expressed as a
865 // hexadecimal string) is in the past, or if timestampHex cannot be
866 // parsed as a hexadecimal string.
867 func IsExpired(timestampHex string) bool {
868 ts, err := strconv.ParseInt(timestampHex, 16, 0)
// Unparseable timestamps are logged and treated as expired.
870 log.Printf("IsExpired: %s", err)
873 return time.Unix(ts, 0).Before(time.Now())
876 // canDelete returns true if the user identified by apiToken is
877 // allowed to delete blocks.
878 func (rtr *router) canDelete(apiToken string) bool {
882 // Blocks may be deleted only when Keep has been configured with a
884 if rtr.isSystemAuth(apiToken) {
887 // TODO(twp): look up apiToken with the API server
888 // return true if is_admin is true and if the token
889 // has unlimited scope
893 // isSystemAuth returns true if the given token is allowed to perform
894 // system level actions like deleting data.
// An empty token never matches, even if SystemRootToken is unset.
895 func (rtr *router) isSystemAuth(token string) bool {
896 return token != "" && token == rtr.cluster.SystemRootToken