1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.curoverse.com/arvados.git/sdk/go/arvados"
25 "git.curoverse.com/arvados.git/sdk/go/ctxlog"
26 "git.curoverse.com/arvados.git/sdk/go/health"
27 "git.curoverse.com/arvados.git/sdk/go/httpserver"
28 "github.com/gorilla/mux"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
35 cluster *arvados.Cluster
36 logger logrus.FieldLogger
37 remoteProxy remoteProxy
39 volmgr *RRVolumeManager
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
//
// NOTE(review): this extract is missing interleaved source lines (47, 49,
// 52-57, ...), so the router construction below is shown incomplete; the
// visible code registers routes on a gorilla/mux router and wires node
// metrics to the buffer pool and the pull/trash work queues.
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48 Router: mux.NewRouter(),
50 logger: ctxlog.FromContext(ctx),
51 metrics: &nodeMetrics{reg: reg},
// GET/HEAD a block by bare 32-hex-digit hash, or by hash plus "+hints".
58 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60 `/{hash:[0-9a-f]{32}}+{hints}`,
61 rtr.handleGET).Methods("GET", "HEAD")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65 // List all blocks stored here. Privileged client only.
66 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67 // List blocks stored here whose hash has the given prefix.
68 // Privileged client only.
69 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71 // Internals/debugging info (runtime.MemStats)
72 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
74 // List volumes: path, device number, bytes used/avail.
75 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
77 // List mounts: UUID, readonly, tier, device ID, ...
78 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
79 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
80 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
82 // Replace the current pull queue.
83 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
85 // Replace the current trash queue.
86 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
88 // Untrash moves blocks from trash back into store
89 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health-check endpoint, authorized by the cluster management token.
91 rtr.Handle("/_health/{check}", &health.Handler{
92 Token: cluster.ManagementToken,
96 // Any request which does not match any of these routes gets
98 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Export buffer-pool and work-queue gauges via the prometheus registry.
100 rtr.metrics.setupBufferPoolMetrics(bufs)
101 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
102 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
107 // BadRequestHandler is a HandleFunc to address bad requests.
108 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
// Reply with BadRequestError's message and its associated HTTP status.
109 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD block requests: verify the locator signature
// (when blob signing is enabled), fetch the block into a pooled buffer via
// GetBlock, and write it to the client as application/octet-stream.
112 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
113 ctx, cancel := contextForResponse(context.TODO(), resp)
116 locator := req.URL.Path[1:]
// A "+R" hint without a local "+A" signature indicates a remote-cluster
// block: delegate to the remote proxy instead of reading local volumes.
117 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
122 if rtr.cluster.Collections.BlobSigning {
123 locator := req.URL.Path[1:] // strip leading slash
124 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
130 // TODO: Probe volumes to check whether the block _might_
131 // exist. Some volumes/types could support a quick existence
132 // check without causing other operations to suffer. If all
133 // volumes support that, and assure us the block definitely
134 // isn't here, we can return 404 now instead of waiting for a
// Reserve a full-size buffer; respond 503 if none becomes available
// before the client disconnects.
137 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
139 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
144 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
// Map KeepError values to their own HTTP codes; default to 500.
146 code := http.StatusInternalServerError
147 if err, ok := err.(*KeepError); ok {
150 http.Error(resp, err.Error(), code)
154 resp.Header().Set("Content-Length", strconv.Itoa(size))
155 resp.Header().Set("Content-Type", "application/octet-stream")
156 resp.Write(buf[:size])
159 // Return a new context that gets cancelled by resp's CloseNotifier.
160 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
161 ctx, cancel := context.WithCancel(parent)
// If the ResponseWriter supports CloseNotifier, cancel ctx when the client
// connection closes. NOTE(review): http.CloseNotifier is deprecated in
// favor of Request.Context — consider migrating; confirm caller behavior.
162 if cn, ok := resp.(http.CloseNotifier); ok {
163 go func(c <-chan bool) {
174 // Get a buffer from the pool -- but give up and return a non-nil
175 // error if ctx ends before we get a buffer.
176 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
// Fetch the buffer on a separate goroutine so we can select on ctx too.
177 bufReady := make(chan []byte)
179 bufReady <- bufs.Get(bufSize)
182 case buf := <-bufReady:
186 // Even if closeNotifier happened first, we
187 // need to keep waiting for our buf so we can
188 // return it to the pool.
// Tell the caller the client went away before a buffer was available.
191 return nil, ErrClientDisconnect
// handlePUT stores a block: validate Content-Length, read the body into a
// pooled buffer, store it via PutBlock, and reply with a (signed, when a
// blob signing key is configured) size-hinted locator.
195 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
196 ctx, cancel := contextForResponse(context.TODO(), resp)
199 hash := mux.Vars(req)["hash"]
201 // Detect as many error conditions as possible before reading
202 // the body: avoid transmitting data that will not end up
203 // being written anyway.
// A missing Content-Length (-1) is rejected: the exact size is needed to
// reserve a buffer from the pool.
205 if req.ContentLength == -1 {
206 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
210 if req.ContentLength > BlockSize {
211 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
215 if len(rtr.volmgr.AllWritable()) == 0 {
216 http.Error(resp, FullError.Error(), FullError.HTTPCode)
220 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
222 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
226 _, err = io.ReadFull(req.Body, buf)
228 http.Error(resp, err.Error(), 500)
233 replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
// Map KeepError values to their own HTTP codes; default to 500.
237 code := http.StatusInternalServerError
238 if err, ok := err.(*KeepError); ok {
241 http.Error(resp, err.Error(), code)
245 // Success; add a size hint, sign the locator if possible, and
246 // return it to the client.
247 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
248 apiToken := GetAPIToken(req)
249 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
250 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
251 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
253 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
254 resp.Write([]byte(returnHash + "\n"))
257 // handleIndex responds to "/index", "/index/{prefix}", and
258 // "/mounts/{uuid}/blocks" requests. Privileged (system auth) only.
259 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
260 if !rtr.isSystemAuth(GetAPIToken(req)) {
261 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
// The prefix can arrive as a path variable or as a ?prefix= form value.
265 prefix := mux.Vars(req)["prefix"]
268 prefix = req.Form.Get("prefix")
// With a mount UUID, index only that mount; otherwise all readable volumes.
271 uuid := mux.Vars(req)["uuid"]
273 var vols []*VolumeMount
275 vols = rtr.volmgr.AllReadable()
276 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
277 http.Error(resp, "mount not found", http.StatusNotFound)
280 vols = []*VolumeMount{mnt}
283 for _, v := range vols {
284 if err := v.IndexTo(prefix, resp); err != nil {
285 // We can't send an error status/message to
286 // the client because IndexTo() might have
287 // already written body content. All we can do
288 // is log the error in our own logs.
290 // The client must notice the lack of trailing
291 // newline as an indication that the response
293 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
297 // An empty line at EOF is the only way the client can be
298 // assured the entire index was received.
299 resp.Write([]byte{'\n'})
302 // MountsHandler responds to "GET /mounts" requests.
303 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
// Stream the mount list as JSON directly onto the response.
304 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
306 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage; it appears in status responses.
311 type PoolStatus struct {
312 Alloc uint64 `json:"BytesAllocatedCumulative"`
313 Cap int `json:"BuffersMax"`
314 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one per-volume entry in a NodeStatus report. The
// pointer/interface fields carry `omitempty` so absent stats are elided
// from the JSON output.
317 type volumeStatusEnt struct {
319 Status *VolumeStatus `json:",omitempty"`
320 VolumeStats *ioStats `json:",omitempty"`
321 InternalStats interface{} `json:",omitempty"`
// NodeStatus aggregates per-volume status with buffer pool and pull/trash
// work queue statistics; populated by readNodeStatus for /status.json.
325 type NodeStatus struct {
326 Volumes []*volumeStatusEnt
327 BufferPool PoolStatus
328 PullQueue WorkQueueStatus
329 TrashQueue WorkQueueStatus
// stLock presumably serializes access to a shared status value used by the
// status handlers — confirm against the surrounding (elided) code.
336 var stLock sync.Mutex
338 // DebugHandler addresses /debug.json requests.
339 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
340 type debugStats struct {
341 MemStats runtime.MemStats
// Snapshot runtime memory statistics and serve them as JSON.
344 runtime.ReadMemStats(&ds.MemStats)
345 err := json.NewEncoder(resp).Encode(&ds)
347 http.Error(resp, err.Error(), 500)
351 // StatusHandler addresses /status.json requests.
352 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
353 // Build a current NodeStatus snapshot and serve it as JSON.
354 rtr.readNodeStatus(&st)
355 jstat, err := json.Marshal(&st)
// On marshal failure, log both the error and the offending status value
// before returning a 500 to the client.
360 log.Printf("json.Marshal: %s", err)
361 log.Printf("NodeStatus = %v", &st)
362 http.Error(resp, err.Error(), 500)
366 // populate the given NodeStatus struct with current values.
367 func (rtr *router) readNodeStatus(st *NodeStatus) {
369 vols := rtr.volmgr.AllReadable()
// Reuse st.Volumes' backing array when it is already large enough.
370 if cap(st.Volumes) < len(vols) {
371 st.Volumes = make([]*volumeStatusEnt, len(vols))
373 st.Volumes = st.Volumes[:0]
374 for _, vol := range vols {
375 var internalStats interface{}
// Volumes implementing InternalStatser contribute driver-level stats.
376 if vol, ok := vol.Volume.(InternalStatser); ok {
377 internalStats = vol.InternalStats()
379 st.Volumes = append(st.Volumes, &volumeStatusEnt{
381 Status: vol.Status(),
382 InternalStats: internalStats,
383 //VolumeStats: rtr.volmgr.VolumeStats(vol),
386 st.BufferPool.Alloc = bufs.Alloc()
387 st.BufferPool.Cap = bufs.Cap()
388 st.BufferPool.Len = bufs.Len()
389 st.PullQueue = getWorkQueueStatus(rtr.pullq)
390 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
393 // return a WorkQueueStatus for the given queue. If q is nil (which
394 // should never happen except in test suites), return a zero status
395 // value instead of crashing.
396 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
398 // This should only happen during tests.
399 return WorkQueueStatus{}
404 // handleDELETE processes DELETE requests.
406 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
407 // from all connected volumes.
409 // Only the Data Manager, or an Arvados admin with scope "all", are
410 // allowed to issue DELETE requests. If a DELETE request is not
411 // authenticated or is issued by a non-admin user, the server returns
412 // a PermissionError.
414 // Upon receiving a valid request from an authorized user,
415 // handleDELETE deletes all copies of the specified block on local
420 // If the requested block was not found on any volume, the response
421 // code is HTTP 404 Not Found.
423 // Otherwise, the response code is 200 OK, with a response body
424 // consisting of the JSON message
426 // {"copies_deleted":d,"copies_failed":f}
428 // where d and f are integers representing the number of blocks that
429 // were successfully and unsuccessfully deleted.
431 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
432 hash := mux.Vars(req)["hash"]
434 // Confirm that this user is an admin and has a token with unlimited scope.
435 var tok = GetAPIToken(req)
436 if tok == "" || !rtr.canDelete(tok) {
437 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Deleting is disabled cluster-wide when BlobTrash is off.
441 if !rtr.cluster.Collections.BlobTrash {
442 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
446 // Delete copies of this block from all available volumes.
447 // Report how many blocks were successfully deleted, and how
448 // many were found on writable volumes but not deleted.
450 Deleted int `json:"copies_deleted"`
451 Failed int `json:"copies_failed"`
453 for _, vol := range rtr.volmgr.AllWritable() {
// Trash moves the block to the trash; a not-exist error is expected and
// simply means this volume never held the block.
454 if err := vol.Trash(hash); err == nil {
456 } else if os.IsNotExist(err) {
460 log.Println("DeleteHandler:", err)
// No copies deleted and none failed means the block was not found anywhere.
466 if result.Deleted == 0 && result.Failed == 0 {
467 st = http.StatusNotFound
474 if st == http.StatusOK {
475 if body, err := json.Marshal(result); err == nil {
478 log.Printf("json.Marshal: %s (result = %v)", err, result)
479 http.Error(resp, err.Error(), 500)
484 /* handlePull processes "PUT /pull" requests for the data manager.
485 The request body is a JSON message containing a list of pull
486 requests in the following format:
490 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
492 "keep0.qr1hi.arvadosapi.com:25107",
493 "keep1.qr1hi.arvadosapi.com:25108"
497 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
507 Each pull request in the list consists of a block locator string
508 and an ordered list of servers. Keepstore should try to fetch the
509 block from each server in turn.
511 If the request has not been sent by the Data Manager, return 401
514 If the JSON unmarshalling fails, return 400 Bad Request.
517 // PullRequest consists of a block locator and an ordered list of servers
518 type PullRequest struct {
519 Locator string `json:"locator"`
520 Servers []string `json:"servers"`
522 // Destination mount, or "" for "anywhere"
523 MountUUID string `json:"mount_uuid"`
526 // handlePull processes "PUT /pull" requests for the data manager.
527 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
528 // Reject unauthorized requests.
529 if !rtr.isSystemAuth(GetAPIToken(req)) {
530 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
534 // Parse the request body.
536 r := json.NewDecoder(req.Body)
537 if err := r.Decode(&pr); err != nil {
538 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
542 // We have a properly formatted pull list sent from the data
543 // manager. Report success and send the list to the pull list
544 // manager for further handling.
545 resp.WriteHeader(http.StatusOK)
547 fmt.Sprintf("Received %d pull requests\n", len(pr))))
// Hand the new list to the pull queue, replacing any pending items.
550 for _, p := range pr {
553 rtr.pullq.ReplaceQueue(plist)
556 // TrashRequest consists of a block locator and its Mtime
557 type TrashRequest struct {
558 Locator string `json:"locator"`
559 BlockMtime int64 `json:"block_mtime"`
561 // Target mount, or "" for "everywhere"
562 MountUUID string `json:"mount_uuid"`
565 // handleTrash processes "PUT /trash" requests.
566 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
567 // Reject unauthorized requests.
568 if !rtr.isSystemAuth(GetAPIToken(req)) {
569 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
573 // Parse the request body.
574 var trash []TrashRequest
575 r := json.NewDecoder(req.Body)
576 if err := r.Decode(&trash); err != nil {
577 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
581 // We have a properly formatted trash list sent from the data
582 // manager. Report success and send the list to the trash work
583 // queue for further handling.
584 resp.WriteHeader(http.StatusOK)
586 fmt.Sprintf("Received %d trash requests\n", len(trash))))
// Hand the new list to the trash queue, replacing any pending items.
589 for _, t := range trash {
592 rtr.trashq.ReplaceQueue(tlist)
595 // handleUntrash processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
596 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
597 // Reject unauthorized requests.
598 if !rtr.isSystemAuth(GetAPIToken(req)) {
599 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
603 hash := mux.Vars(req)["hash"]
605 if len(rtr.volmgr.AllWritable()) == 0 {
606 http.Error(resp, "No writable volumes", http.StatusNotFound)
// Try to untrash on every writable volume, recording successes/failures.
610 var untrashedOn, failedOn []string
612 for _, vol := range rtr.volmgr.AllWritable() {
613 err := vol.Untrash(hash)
615 if os.IsNotExist(err) {
617 } else if err != nil {
618 log.Printf("Error untrashing %v on volume %v", hash, vol.String())
619 failedOn = append(failedOn, vol.String())
621 log.Printf("Untrashed %v on volume %v", hash, vol.String())
622 untrashedOn = append(untrashedOn, vol.String())
// Not found on any writable volume -> 404; failed on all -> 500.
626 if numNotFound == len(rtr.volmgr.AllWritable()) {
627 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
631 if len(failedOn) == len(rtr.volmgr.AllWritable()) {
632 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
634 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
635 if len(failedOn) > 0 {
636 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
638 resp.Write([]byte(respBody))
642 // GetBlock and PutBlock implement lower-level code for handling
643 // blocks by rooting through volumes connected to the local machine.
644 // Once the handler has determined that system policy permits the
645 // request, it calls these methods to perform the actual operation.
647 // TODO(twp): this code would probably be better located in the
648 // VolumeManager interface. As an abstraction, the VolumeManager
649 // should be the only part of the code that cares about which volume a
650 // block is stored on, so it should be responsible for figuring out
651 // which volume to check for fetching blocks, storing blocks, etc.
653 // GetBlock fetches the block identified by "hash" into the provided
654 // buf, and returns the data size.
656 // If the block cannot be found on any volume, returns NotFoundError.
658 // If the block found does not have the correct MD5 hash, returns
661 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
662 // Attempt to read the requested hash from a keep volume.
663 errorToCaller := NotFoundError
// Try each readable volume in turn until one yields verified data.
665 for _, vol := range volmgr.AllReadable() {
666 size, err := vol.Get(ctx, hash, buf)
// A cancelled ctx means the client disconnected; stop immediately.
669 return 0, ErrClientDisconnect
673 // IsNotExist is an expected error and may be
674 // ignored. All other errors are logged. In
675 // any case we continue trying to read other
676 // volumes. If all volumes report IsNotExist,
677 // we return a NotFoundError.
678 if !os.IsNotExist(err) {
679 log.Printf("%s: Get(%s): %s", vol, hash, err)
681 // If some volume returns a transient error, return it to the caller
682 // instead of "Not found" so it can retry.
683 if err == VolumeBusyError {
684 errorToCaller = err.(*KeepError)
688 // Check the file checksum.
690 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
691 if filehash != hash {
692 // TODO: Try harder to tell a sysadmin about
694 log.Printf("%s: checksum mismatch for request %s (actual %s)",
696 errorToCaller = DiskHashError
699 if errorToCaller == DiskHashError {
700 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
705 return 0, errorToCaller
708 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
710 // PutBlock(ctx, block, hash)
711 // Stores the BLOCK (identified by the content id HASH) in Keep.
713 // The MD5 checksum of the block must be identical to the content id HASH.
714 // If not, an error is returned.
716 // PutBlock stores the BLOCK on the first Keep volume with free space.
717 // A failure code is returned to the user only if all volumes fail.
719 // On success, PutBlock returns nil.
720 // On failure, it returns a KeepError with one of the following codes:
723 // A different block with the same hash already exists on this
726 // The MD5 hash of the BLOCK does not match the argument HASH.
728 // There was not enough space left in any Keep volume to store
731 // The object could not be stored for some other reason (e.g.
732 // all writes failed). The text of the error message should
733 // provide as much detail as possible.
735 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
736 // Check that BLOCK's checksum matches HASH.
737 blockhash := fmt.Sprintf("%x", md5.Sum(block))
738 if blockhash != hash {
739 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
740 return 0, RequestHashError
743 // If we already have this data, it's intact on disk, and we
744 // can update its timestamp, return success. If we have
745 // different data with the same hash, return failure.
746 if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
748 } else if ctx.Err() != nil {
749 return 0, ErrClientDisconnect
752 // Choose a Keep volume to write to.
753 // If this volume fails, try all of the volumes in order.
754 if mnt := volmgr.NextWritable(); mnt != nil {
755 if err := mnt.Put(ctx, hash, block); err == nil {
756 return mnt.Replication, nil // success!
758 if ctx.Err() != nil {
759 return 0, ErrClientDisconnect
// Fall back to trying every writable volume in order.
763 writables := volmgr.AllWritable()
764 if len(writables) == 0 {
765 log.Print("No writable volumes.")
770 for _, vol := range writables {
771 err := vol.Put(ctx, hash, block)
772 if ctx.Err() != nil {
773 return 0, ErrClientDisconnect
776 return vol.Replication, nil // success!
778 if err != FullError {
779 // The volume is not full but the
780 // write did not succeed. Report the
781 // error and continue trying.
783 log.Printf("%s: Write(%s): %s", vol, hash, err)
788 log.Print("All volumes are full.")
791 // Already logged the non-full errors.
792 return 0, GenericError
795 // CompareAndTouch returns the current replication level if one of the
796 // volumes already has the given content and it successfully updates
797 // the relevant block's modification time in order to protect it from
798 // premature garbage collection. Otherwise, it returns a non-nil
800 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
801 var bestErr error = NotFoundError
802 for _, mnt := range volmgr.AllWritable() {
803 err := mnt.Compare(ctx, hash, buf)
// Context cancellation aborts the whole scan.
804 if ctx.Err() != nil {
806 } else if err == CollisionError {
807 // Stop if we have a block with same hash but
808 // different content. (It will be impossible
809 // to tell which one is wanted if we have
810 // both, so there's no point writing it even
811 // on a different volume.)
812 log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
814 } else if os.IsNotExist(err) {
815 // Block does not exist. This is the only
816 // "normal" error: we don't log anything.
818 } else if err != nil {
819 // Couldn't open file, data is corrupt on
820 // disk, etc.: log this abnormal condition,
821 // and try the next volume.
822 log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
// Content matched: refresh the mtime to protect it from garbage collection.
825 if err := mnt.Touch(hash); err != nil {
826 log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
830 // Compare and Touch both worked --> done.
831 return mnt.Replication, nil
// validLocatorRe matches a bare 32-hex-digit MD5 block hash.
836 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
838 // IsValidLocator returns true if the specified string is a valid Keep locator.
839 // When Keep is extended to support hash types other than MD5,
840 // this should be updated to cover those as well.
842 func IsValidLocator(loc string) bool {
843 return validLocatorRe.MatchString(loc)
// authRe matches "OAuth2 <token>" or "Bearer <token>" Authorization values.
846 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
848 // GetAPIToken returns the OAuth2 token from the Authorization
849 // header of a HTTP request, or an empty string if no matching
851 func GetAPIToken(req *http.Request) string {
852 if auth, ok := req.Header["Authorization"]; ok {
853 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
860 // IsExpired returns true if the given Unix timestamp (expressed as a
861 // hexadecimal string) is in the past, or if timestampHex cannot be
862 // parsed as a hexadecimal string.
863 func IsExpired(timestampHex string) bool {
864 ts, err := strconv.ParseInt(timestampHex, 16, 0)
// Unparseable timestamps are logged and treated as expired.
866 log.Printf("IsExpired: %s", err)
869 return time.Unix(ts, 0).Before(time.Now())
872 // canDelete returns true if the user identified by apiToken is
873 // allowed to delete blocks.
874 func (rtr *router) canDelete(apiToken string) bool {
878 // Blocks may be deleted only when Keep has been configured with a
// A system (data manager) token is always allowed.
880 if rtr.isSystemAuth(apiToken) {
883 // TODO(twp): look up apiToken with the API server
884 // return true if is_admin is true and if the token
885 // has unlimited scope
889 // isSystemAuth returns true if the given token is allowed to perform
890 // system level actions like deleting data.
// An empty token never matches, even if SystemRootToken is unset.
891 func (rtr *router) isSystemAuth(token string) bool {
892 return token != "" && token == rtr.cluster.SystemRootToken