1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "git.arvados.org/arvados.git/sdk/go/health"
26 "git.arvados.org/arvados.git/sdk/go/httpserver"
27 "github.com/gorilla/mux"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/sirupsen/logrus"
34 cluster *arvados.Cluster // cluster configuration: blob signing, BlobTrash, SystemRootToken, ManagementToken
35 logger logrus.FieldLogger // logger taken from the ctx passed to MakeRESTRouter
36 remoteProxy remoteProxy // serves GETs for locators with a "+R" hint but no "+A" hint (see handleGET)
38 volmgr *RRVolumeManager // volume mounts consulted by the block, index, status and trash handlers
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
//
// Block routes only match hashes of exactly 32 lowercase hex digits.
// Requests that match no route are answered by BadRequestHandler.
// Statistics for the buffer pool and for the router's pull and trash
// work queues are exported as metrics through a nodeMetrics wrapping reg.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47 Router: mux.NewRouter(),
49 logger: ctxlog.FromContext(ctx),
50 metrics: &nodeMetrics{reg: reg},
57 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59 `/{hash:[0-9a-f]{32}}+{hints}`,
60 rtr.handleGET).Methods("GET", "HEAD")
// Store a block (PUT) or move it to trash (DELETE).
62 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64 // List all blocks stored here. Privileged client only.
65 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66 // List blocks stored here whose hash has the given prefix.
67 // Privileged client only.
68 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69 // Update timestamp on existing block. Privileged client only.
70 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72 // Internals/debugging info (runtime.MemStats)
73 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
75 // List volumes: path, device number, bytes used/avail.
76 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
78 // List mounts: UUID, readonly, tier, device ID, ...
79 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
83 // Replace the current pull queue.
84 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
86 // Replace the current trash queue.
87 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
89 // Untrash moves blocks from trash back into store
90 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health checks, authorized against cluster.ManagementToken.
92 rtr.Handle("/_health/{check}", &health.Handler{
93 Token: cluster.ManagementToken,
97 // Any request which does not match any of these routes gets
99 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Export buffer pool and work queue statistics as metrics.
101 rtr.metrics.setupBufferPoolMetrics(bufs)
102 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114 ctx, cancel := contextForResponse(context.TODO(), resp)
117 locator := req.URL.Path[1:]
118 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
123 if rtr.cluster.Collections.BlobSigning {
124 locator := req.URL.Path[1:] // strip leading slash
125 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
131 // TODO: Probe volumes to check whether the block _might_
132 // exist. Some volumes/types could support a quick existence
133 // check without causing other operations to suffer. If all
134 // volumes support that, and assure us the block definitely
135 // isn't here, we can return 404 now instead of waiting for a
138 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
140 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
145 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
147 code := http.StatusInternalServerError
148 if err, ok := err.(*KeepError); ok {
151 http.Error(resp, err.Error(), code)
155 resp.Header().Set("Content-Length", strconv.Itoa(size))
156 resp.Header().Set("Content-Type", "application/octet-stream")
157 resp.Write(buf[:size])
160 // contextForResponse returns a new context, derived from parent, that also gets cancelled by resp's CloseNotifier.
//
// If resp does not implement http.CloseNotifier, the returned context
// ends only when parent ends or the returned CancelFunc is called.
// Callers should call the CancelFunc once the request is finished, to
// release the context's resources (see context.WithCancel).
//
// NOTE(review): http.CloseNotifier is deprecated; a server request's own
// Context() is already cancelled when the client disconnects, which may
// make this helper unnecessary.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162 ctx, cancel := context.WithCancel(parent)
163 if cn, ok := resp.(http.CloseNotifier); ok { // not every ResponseWriter supports close notification
164 go func(c <-chan bool) {
175 // getBufferWithContext gets a buffer from the pool -- but gives up and returns a non-nil
176 // error if ctx ends before we get a buffer.
//
// bufSize is passed through to bufs.Get. When ctx ends first, the error
// is ErrClientDisconnect, and the buffer that bufs.Get eventually
// produces is still handed back to the pool rather than leaked.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178 bufReady := make(chan []byte) // unbuffered: carries the buffer from bufs.Get to whoever receives it
180 bufReady <- bufs.Get(bufSize)
183 case buf := <-bufReady:
187 // Even if ctx ended first (e.g. the client disconnected), we
188 // need to keep waiting for our buf so we can
189 // return it to the pool.
192 return nil, ErrClientDisconnect
// handleTOUCH processes "TOUCH /{hash}" requests, which update the
// timestamp of an existing block. Only privileged clients are accepted
// (see isSystemAuth); others get UnauthorizedError.
//
// Responds 404 "no volumes" when there are no writable volumes. Touch is
// then attempted on the writable volumes; a remaining error is reported
// as 404 when os.IsNotExist(err), otherwise as 500.
// NOTE(review): confirm whether the loop stops at the first volume whose
// Touch succeeds.
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197 if !rtr.isSystemAuth(GetAPIToken(req)) {
198 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
201 hash := mux.Vars(req)["hash"]
202 vols := rtr.volmgr.AllWritable()
204 http.Error(resp, "no volumes", http.StatusNotFound)
208 for _, mnt := range vols {
209 err = mnt.Touch(hash)
217 case os.IsNotExist(err):
218 http.Error(resp, err.Error(), http.StatusNotFound)
220 http.Error(resp, err.Error(), http.StatusInternalServerError)
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225 ctx, cancel := contextForResponse(context.TODO(), resp)
228 hash := mux.Vars(req)["hash"]
230 // Detect as many error conditions as possible before reading
231 // the body: avoid transmitting data that will not end up
232 // being written anyway.
234 if req.ContentLength == -1 {
235 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
239 if req.ContentLength > BlockSize {
240 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
244 if len(rtr.volmgr.AllWritable()) == 0 {
245 http.Error(resp, FullError.Error(), FullError.HTTPCode)
249 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
251 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
255 _, err = io.ReadFull(req.Body, buf)
257 http.Error(resp, err.Error(), 500)
262 replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
266 code := http.StatusInternalServerError
267 if err, ok := err.(*KeepError); ok {
270 http.Error(resp, err.Error(), code)
274 // Success; add a size hint, sign the locator if possible, and
275 // return it to the client.
276 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
277 apiToken := GetAPIToken(req)
278 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
279 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
280 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
282 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
283 resp.Write([]byte(returnHash + "\n"))
286 // handleIndex responds to "/index", "/index/{prefix}", and
287 // "/mounts/{uuid}/blocks" requests.
//
// Privileged clients only (see isSystemAuth). The hash prefix comes
// from the {prefix} path variable or the "prefix" form value. Without
// a {uuid}, every readable volume is listed; with one, only that mount
// is listed, and an unknown mount gets 404 "mount not found".
//
// The body is the concatenation of each volume's IndexTo output,
// followed by a single empty line. If a volume fails mid-listing, the
// response is cut short without that final empty line, and the error
// is only logged.
288 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
289 if !rtr.isSystemAuth(GetAPIToken(req)) {
290 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
294 prefix := mux.Vars(req)["prefix"]
297 prefix = req.Form.Get("prefix")
300 uuid := mux.Vars(req)["uuid"]
302 var vols []*VolumeMount
304 vols = rtr.volmgr.AllReadable()
305 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
306 http.Error(resp, "mount not found", http.StatusNotFound)
309 vols = []*VolumeMount{mnt}
312 for _, v := range vols {
313 if err := v.IndexTo(prefix, resp); err != nil {
314 // We can't send an error status/message to
315 // the client because IndexTo() might have
316 // already written body content. All we can do
317 // is log the error in our own logs.
319 // The client must notice the lack of trailing
320 // newline as an indication that the response
322 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
326 // An empty line at EOF is the only way the client can be
327 // assured the entire index was received.
328 resp.Write([]byte{'\n'})
331 // MountsHandler responds to "GET /mounts" requests.
332 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
333 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
335 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus carries buffer-pool statistics in the /status.json
// response; readNodeStatus fills it from bufs.
340 type PoolStatus struct {
341 Alloc uint64 `json:"BytesAllocatedCumulative"` // bufs.Alloc(): cumulative bytes allocated
342 Cap int `json:"BuffersMax"` // bufs.Cap(): maximum number of buffers
343 Len int `json:"BuffersInUse"` // bufs.Len(): buffers currently in use
// volumeStatusEnt is one element of NodeStatus.Volumes, built per
// readable volume by readNodeStatus.
346 type volumeStatusEnt struct {
348 Status *VolumeStatus `json:",omitempty"` // vol.Status(); omitted from JSON when nil
349 VolumeStats *ioStats `json:",omitempty"` // not currently populated (the assignment in readNodeStatus is commented out)
350 InternalStats interface{} `json:",omitempty"` // set only for volumes implementing InternalStatser
// NodeStatus is the document StatusHandler encodes as JSON for
// /status.json requests; readNodeStatus fills in the fields below.
354 type NodeStatus struct {
355 Volumes []*volumeStatusEnt // one entry per readable volume
356 BufferPool PoolStatus // buffer pool statistics
357 PullQueue WorkQueueStatus // status of the pull work queue (zero if the queue is nil)
358 TrashQueue WorkQueueStatus // status of the trash work queue (zero if the queue is nil)
// stLock is a package-level mutex; presumably it serializes use of the
// NodeStatus value that StatusHandler fills and encodes — confirm
// against its Lock/Unlock call sites.
365 var stLock sync.Mutex
367 // DebugHandler addresses /debug.json requests.
368 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
369 type debugStats struct {
370 MemStats runtime.MemStats
373 runtime.ReadMemStats(&ds.MemStats)
374 data, err := json.Marshal(&ds)
376 http.Error(resp, err.Error(), http.StatusInternalServerError)
382 // StatusHandler addresses /status.json requests.
// It refreshes the NodeStatus st via readNodeStatus and replies with
// st's JSON encoding, or 500 if marshalling fails.
383 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
385 rtr.readNodeStatus(&st)
386 data, err := json.Marshal(&st)
389 http.Error(resp, err.Error(), http.StatusInternalServerError)
395 // readNodeStatus populates the given NodeStatus struct with current values.
//
// st.Volumes is rebuilt with one entry per readable volume. Its backing
// array is reused when it already has enough capacity, but the entries
// themselves are freshly allocated on every call.
396 func (rtr *router) readNodeStatus(st *NodeStatus) {
398 vols := rtr.volmgr.AllReadable()
399 if cap(st.Volumes) < len(vols) {
400 st.Volumes = make([]*volumeStatusEnt, len(vols))
402 st.Volumes = st.Volumes[:0]
403 for _, vol := range vols {
404 var internalStats interface{}
405 if vol, ok := vol.Volume.(InternalStatser); ok { // shadows vol with its InternalStatser view
406 internalStats = vol.InternalStats()
408 st.Volumes = append(st.Volumes, &volumeStatusEnt{
410 Status: vol.Status(),
411 InternalStats: internalStats,
412 //VolumeStats: rtr.volmgr.VolumeStats(vol),
415 st.BufferPool.Alloc = bufs.Alloc()
416 st.BufferPool.Cap = bufs.Cap()
417 st.BufferPool.Len = bufs.Len()
418 st.PullQueue = getWorkQueueStatus(rtr.pullq)
419 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
422 // getWorkQueueStatus returns a WorkQueueStatus for the given queue. If q is nil (which
423 // should never happen except in test suites), return a zero status
424 // value instead of crashing.
425 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
427 // This should only happen during tests.
428 return WorkQueueStatus{}
433 // handleDELETE processes DELETE requests.
435 // DELETE /{hash:[0-9a-f]{32}} moves the block with the specified hash
436 // to trash on all writable volumes (via each volume's Trash method).
438 // Only the Data Manager, or an Arvados admin with scope "all", are
439 // allowed to issue DELETE requests. If a DELETE request is not
440 // authenticated or is issued by a non-admin user, the server returns
441 // a PermissionError.
// NOTE(review): canDelete currently accepts only the system root token;
// the admin-token lookup is still a TODO there.
//
// If trash is disabled in the cluster config (Collections.BlobTrash is
// false), the server returns MethodDisabledError.
443 // Upon receiving a valid request from an authorized user,
444 // handleDELETE deletes all copies of the specified block on local
449 // If the requested block was not found on any volume, the response
450 // code is HTTP 404 Not Found.
452 // Otherwise, the response code is 200 OK, with a response body
453 // consisting of the JSON message
455 // {"copies_deleted":d,"copies_failed":f}
457 // where d and f are integers representing the number of blocks that
458 // were successfully and unsuccessfully deleted.
460 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
461 hash := mux.Vars(req)["hash"]
463 // Confirm that this user is an admin and has a token with unlimited scope.
464 var tok = GetAPIToken(req)
465 if tok == "" || !rtr.canDelete(tok) {
466 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
470 if !rtr.cluster.Collections.BlobTrash {
471 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
475 // Delete copies of this block from all available volumes.
476 // Report how many blocks were successfully deleted, and how
477 // many were found on writable volumes but not deleted.
479 Deleted int `json:"copies_deleted"`
480 Failed int `json:"copies_failed"`
482 for _, vol := range rtr.volmgr.AllWritable() {
483 if err := vol.Trash(hash); err == nil {
485 } else if os.IsNotExist(err) {
489 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
492 if result.Deleted == 0 && result.Failed == 0 {
493 resp.WriteHeader(http.StatusNotFound)
496 body, err := json.Marshal(result)
498 http.Error(resp, err.Error(), http.StatusInternalServerError)
504 /* handlePull processes "PUT /pull" requests for the data manager.
505 The request body is a JSON message containing a list of pull
506 requests in the following format:
510 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
512 "keep0.qr1hi.arvadosapi.com:25107",
513 "keep1.qr1hi.arvadosapi.com:25108"
517 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
527 Each pull request in the list consists of a block locator string
528 and an ordered list of servers. Keepstore should try to fetch the
529 block from each server in turn.
531 If the request does not carry the system root token (i.e. was not sent by the Data Manager), return 401
534 If the JSON unmarshalling fails, return 400 Bad Request.
537 // PullRequest consists of a block locator and an ordered list of servers
// from which keepstore should try to fetch the block, in turn.
538 type PullRequest struct {
539 Locator string `json:"locator"` // e.g. "e4d909c290d0fb1ca068ffaddf22cbd0+4985"
540 Servers []string `json:"servers"` // "host:port" entries, e.g. "keep0.qr1hi.arvadosapi.com:25107"
542 // Destination mount, or "" for "anywhere"
543 MountUUID string `json:"mount_uuid"`
546 // handlePull processes "PUT /pull" requests for the data manager.
//
// Privileged clients only (see isSystemAuth). The body must be a JSON
// array of PullRequest; if decoding fails, the response is 400. Otherwise
// the response is 200 "Received N pull requests", and the current pull
// queue is then replaced (not extended) by the new list.
547 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
548 // Reject unauthorized requests.
549 if !rtr.isSystemAuth(GetAPIToken(req)) {
550 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
554 // Parse the request body.
556 r := json.NewDecoder(req.Body)
557 if err := r.Decode(&pr); err != nil {
558 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
562 // We have a properly formatted pull list sent from the data
563 // manager. Report success and send the list to the pull list
564 // manager for further handling.
565 resp.WriteHeader(http.StatusOK)
567 fmt.Sprintf("Received %d pull requests\n", len(pr))))
570 for _, p := range pr {
573 rtr.pullq.ReplaceQueue(plist)
576 // TrashRequest consists of a block locator and its Mtime
577 type TrashRequest struct {
578 Locator string `json:"locator"`
579 BlockMtime int64 `json:"block_mtime"` // NOTE(review): time units are not established here — confirm with the trash worker
581 // Target mount, or "" for "everywhere"
582 MountUUID string `json:"mount_uuid"`
585 // handleTrash processes "PUT /trash" requests.
//
// Privileged clients only (see isSystemAuth). The body must be a JSON
// array of TrashRequest; if decoding fails, the response is 400.
// Otherwise the response is 200 "Received N trash requests", and the
// current trash queue is then replaced (not extended) by the new list.
586 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
587 // Reject unauthorized requests.
588 if !rtr.isSystemAuth(GetAPIToken(req)) {
589 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
593 // Parse the request body.
594 var trash []TrashRequest
595 r := json.NewDecoder(req.Body)
596 if err := r.Decode(&trash); err != nil {
597 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
601 // We have a properly formatted trash list sent from the data
602 // manager. Report success and send the list to the trash work
603 // queue for further handling.
604 resp.WriteHeader(http.StatusOK)
606 fmt.Sprintf("Received %d trash requests\n", len(trash))))
609 for _, t := range trash {
612 rtr.trashq.ReplaceQueue(tlist)
615 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
616 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
617 // Reject unauthorized requests.
618 if !rtr.isSystemAuth(GetAPIToken(req)) {
619 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
623 log := ctxlog.FromContext(req.Context())
624 hash := mux.Vars(req)["hash"]
626 if len(rtr.volmgr.AllWritable()) == 0 {
627 http.Error(resp, "No writable volumes", http.StatusNotFound)
631 var untrashedOn, failedOn []string
633 for _, vol := range rtr.volmgr.AllWritable() {
634 err := vol.Untrash(hash)
636 if os.IsNotExist(err) {
638 } else if err != nil {
639 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
640 failedOn = append(failedOn, vol.String())
642 log.Infof("Untrashed %v on volume %v", hash, vol.String())
643 untrashedOn = append(untrashedOn, vol.String())
647 if numNotFound == len(rtr.volmgr.AllWritable()) {
648 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
649 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
650 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
652 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
653 if len(failedOn) > 0 {
654 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
655 http.Error(resp, respBody, http.StatusInternalServerError)
657 fmt.Fprintln(resp, respBody)
662 // GetBlock and PutBlock implement lower-level code for handling
663 // blocks by rooting through volumes connected to the local machine.
664 // Once the handler has determined that system policy permits the
665 // request, it calls these methods to perform the actual operation.
667 // TODO(twp): this code would probably be better located in the
668 // VolumeManager interface. As an abstraction, the VolumeManager
669 // should be the only part of the code that cares about which volume a
670 // block is stored on, so it should be responsible for figuring out
671 // which volume to check for fetching blocks, storing blocks, etc.
673 // GetBlock fetches the block identified by "hash" into the provided
674 // buf, and returns the data size.
676 // If the block cannot be found on any volume, returns NotFoundError.
678 // If the block found does not have the correct MD5 hash, returns
681 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
682 log := ctxlog.FromContext(ctx)
684 // Attempt to read the requested hash from a keep volume.
685 errorToCaller := NotFoundError
687 for _, vol := range volmgr.AllReadable() {
688 size, err := vol.Get(ctx, hash, buf)
691 return 0, ErrClientDisconnect
695 // IsNotExist is an expected error and may be
696 // ignored. All other errors are logged. In
697 // any case we continue trying to read other
698 // volumes. If all volumes report IsNotExist,
699 // we return a NotFoundError.
700 if !os.IsNotExist(err) {
701 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
703 // If some volume returns a transient error, return it to the caller
704 // instead of "Not found" so it can retry.
705 if err == VolumeBusyError {
706 errorToCaller = err.(*KeepError)
710 // Check the file checksum.
711 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
712 if filehash != hash {
713 // TODO: Try harder to tell a sysadmin about
715 log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
716 errorToCaller = DiskHashError
719 if errorToCaller == DiskHashError {
720 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
724 return 0, errorToCaller
727 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
729 // PutBlock(ctx, volmgr, block, hash) returns (replication, error).
730 // If an intact copy already exists, its timestamp is refreshed instead (see CompareAndTouch).
732 // The MD5 checksum of the block must be identical to the content id HASH.
733 // If not, an error is returned.
735 // PutBlock first tries volmgr.NextWritable(), then every writable volume in turn.
736 // A failure code is returned to the user only if all volumes fail.
738 // On success, PutBlock returns the replication of the volume that holds the block, and a nil error.
739 // On failure, it returns a KeepError with one of the following codes (or ErrClientDisconnect if ctx ends first):
742 // A different block with the same hash already exists on this
745 // The MD5 hash of the BLOCK does not match the argument HASH.
747 // There was not enough space left in any Keep volume to store
750 // The object could not be stored for some other reason (e.g.
751 // all writes failed). The text of the error message should
752 // provide as much detail as possible.
754 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
755 log := ctxlog.FromContext(ctx)
757 // Check that BLOCK's checksum matches HASH.
758 blockhash := fmt.Sprintf("%x", md5.Sum(block))
759 if blockhash != hash {
760 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
761 return 0, RequestHashError
764 // If we already have this data, it's intact on disk, and we
765 // can update its timestamp, return success. If we have
766 // different data with the same hash, return failure.
767 if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
769 } else if ctx.Err() != nil {
770 return 0, ErrClientDisconnect
773 // Choose a Keep volume to write to.
774 // If this volume fails, try all of the volumes in order.
775 if mnt := volmgr.NextWritable(); mnt != nil {
776 if err := mnt.Put(ctx, hash, block); err != nil {
777 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
779 return mnt.Replication, nil // success!
782 if ctx.Err() != nil {
783 return 0, ErrClientDisconnect
786 writables := volmgr.AllWritable()
787 if len(writables) == 0 {
788 log.Error("no writable volumes")
793 for _, vol := range writables {
794 err := vol.Put(ctx, hash, block)
795 if ctx.Err() != nil {
796 return 0, ErrClientDisconnect
800 return vol.Replication, nil // success!
804 // The volume is not full but the
805 // write did not succeed. Report the
806 // error and continue trying.
808 log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
813 log.Error("all volumes are full")
816 // Already logged the non-full errors.
817 return 0, GenericError
820 // CompareAndTouch returns the current replication level if one of the
821 // volumes already has the given content and it successfully updates
822 // the relevant block's modification time in order to protect it from
823 // premature garbage collection. Otherwise, it returns a non-nil
825 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
826 log := ctxlog.FromContext(ctx)
827 var bestErr error = NotFoundError
828 for _, mnt := range volmgr.AllWritable() {
829 err := mnt.Compare(ctx, hash, buf)
830 if ctx.Err() != nil {
832 } else if err == CollisionError {
833 // Stop if we have a block with same hash but
834 // different content. (It will be impossible
835 // to tell which one is wanted if we have
836 // both, so there's no point writing it even
837 // on a different volume.)
838 log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
840 } else if os.IsNotExist(err) {
841 // Block does not exist. This is the only
842 // "normal" error: we don't log anything.
844 } else if err != nil {
845 // Couldn't open file, data is corrupt on
846 // disk, etc.: log this abnormal condition,
847 // and try the next volume.
848 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
851 if err := mnt.Touch(hash); err != nil {
852 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
856 // Compare and Touch both worked --> done.
857 return mnt.Replication, nil
// validLocatorRe matches a bare block hash: exactly 32 lowercase
// hexadecimal digits, with no size hint or other "+" suffix.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator.
//
// Only MD5 hashes are recognized. When Keep is extended to support
// other hash types, validLocatorRe should be updated to cover those
// as well.
func IsValidLocator(loc string) bool {
	matched := validLocatorRe.MatchString(loc)
	return matched
}
// authRe matches an Authorization header value of the form
// "OAuth2 <token>" or "Bearer <token>"; the token is submatch 2.
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
//
// Only the first Authorization value is examined. Header.Get is used
// instead of indexing the header map, so a Header that holds an empty
// value slice for "Authorization" yields "" instead of panicking.
func GetAPIToken(req *http.Request) string {
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[2]
}
886 // canDelete returns true if the user identified by apiToken is
887 // allowed to delete blocks.
//
// Apart from the TODO below, the only accepting path is
// isSystemAuth(apiToken), i.e. the cluster's system root token.
888 func (rtr *router) canDelete(apiToken string) bool {
892 // Blocks may be deleted only when Keep has been configured with a
894 if rtr.isSystemAuth(apiToken) {
897 // TODO(twp): look up apiToken with the API server
898 // return true if is_admin is true and if the token
899 // has unlimited scope
903 // isSystemAuth returns true if the given token is allowed to perform
904 // system level actions like deleting data.
905 func (rtr *router) isSystemAuth(token string) bool {
906 return token != "" && token == rtr.cluster.SystemRootToken