1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "git.arvados.org/arvados.git/sdk/go/health"
26 "git.arvados.org/arvados.git/sdk/go/httpserver"
27 "github.com/gorilla/mux"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/sirupsen/logrus"
34 cluster *arvados.Cluster
35 logger logrus.FieldLogger
36 remoteProxy remoteProxy
38 volmgr *RRVolumeManager
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47 Router: mux.NewRouter(),
49 logger: ctxlog.FromContext(ctx),
50 metrics: &nodeMetrics{reg: reg},
57 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59 `/{hash:[0-9a-f]{32}}+{hints}`,
60 rtr.handleGET).Methods("GET", "HEAD")
62 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64 // List all blocks stored here. Privileged client only.
65 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66 // List blocks stored here whose hash has the given prefix.
67 // Privileged client only.
68 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70 // Internals/debugging info (runtime.MemStats)
71 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
73 // List volumes: path, device number, bytes used/avail.
74 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
76 // List mounts: UUID, readonly, tier, device ID, ...
77 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
78 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
79 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
81 // Replace the current pull queue.
82 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
84 // Replace the current trash queue.
85 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
87 // Untrash moves blocks from trash back into store
88 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
90 rtr.Handle("/_health/{check}", &health.Handler{
91 Token: cluster.ManagementToken,
95 // Any request which does not match any of these routes gets
97 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
99 rtr.metrics.setupBufferPoolMetrics(bufs)
100 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
101 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
106 // BadRequestHandler is a HandleFunc to address bad requests.
107 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
108 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
111 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
112 ctx, cancel := contextForResponse(context.TODO(), resp)
115 locator := req.URL.Path[1:]
116 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
117 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
121 if rtr.cluster.Collections.BlobSigning {
122 locator := req.URL.Path[1:] // strip leading slash
123 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
124 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
129 // TODO: Probe volumes to check whether the block _might_
130 // exist. Some volumes/types could support a quick existence
131 // check without causing other operations to suffer. If all
132 // volumes support that, and assure us the block definitely
133 // isn't here, we can return 404 now instead of waiting for a
136 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
138 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
143 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
145 code := http.StatusInternalServerError
146 if err, ok := err.(*KeepError); ok {
149 http.Error(resp, err.Error(), code)
153 resp.Header().Set("Content-Length", strconv.Itoa(size))
154 resp.Header().Set("Content-Type", "application/octet-stream")
155 resp.Write(buf[:size])
// contextForResponse returns a child of parent, together with its
// cancel function. If resp implements http.CloseNotifier, the child
// context is also cancelled when the close notification fires; the
// watcher goroutine exits as soon as the context is done, so callers
// must invoke cancel to release it.
func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)

	notifier, canNotify := resp.(http.CloseNotifier)
	if !canNotify {
		return ctx, cancel
	}

	closed := notifier.CloseNotify()
	go func() {
		select {
		case <-closed:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}
173 // Get a buffer from the pool -- but give up and return a non-nil
174 // error if ctx ends before we get a buffer.
175 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
176 bufReady := make(chan []byte)
178 bufReady <- bufs.Get(bufSize)
181 case buf := <-bufReady:
185 // Even if closeNotifier happened first, we
186 // need to keep waiting for our buf so we can
187 // return it to the pool.
190 return nil, ErrClientDisconnect
194 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
195 ctx, cancel := contextForResponse(context.TODO(), resp)
198 hash := mux.Vars(req)["hash"]
200 // Detect as many error conditions as possible before reading
201 // the body: avoid transmitting data that will not end up
202 // being written anyway.
204 if req.ContentLength == -1 {
205 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
209 if req.ContentLength > BlockSize {
210 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
214 if len(rtr.volmgr.AllWritable()) == 0 {
215 http.Error(resp, FullError.Error(), FullError.HTTPCode)
219 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
221 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
225 _, err = io.ReadFull(req.Body, buf)
227 http.Error(resp, err.Error(), 500)
232 replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
236 code := http.StatusInternalServerError
237 if err, ok := err.(*KeepError); ok {
240 http.Error(resp, err.Error(), code)
244 // Success; add a size hint, sign the locator if possible, and
245 // return it to the client.
246 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
247 apiToken := GetAPIToken(req)
248 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
249 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
250 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
252 resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
253 resp.Write([]byte(returnHash + "\n"))
256 // IndexHandler responds to "/index", "/index/{prefix}", and
257 // "/mounts/{uuid}/blocks" requests.
258 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
259 if !rtr.isSystemAuth(GetAPIToken(req)) {
260 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
264 prefix := mux.Vars(req)["prefix"]
267 prefix = req.Form.Get("prefix")
270 uuid := mux.Vars(req)["uuid"]
272 var vols []*VolumeMount
274 vols = rtr.volmgr.AllReadable()
275 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
276 http.Error(resp, "mount not found", http.StatusNotFound)
279 vols = []*VolumeMount{mnt}
282 for _, v := range vols {
283 if err := v.IndexTo(prefix, resp); err != nil {
284 // We can't send an error status/message to
285 // the client because IndexTo() might have
286 // already written body content. All we can do
287 // is log the error in our own logs.
289 // The client must notice the lack of trailing
290 // newline as an indication that the response
292 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
296 // An empty line at EOF is the only way the client can be
297 // assured the entire index was received.
298 resp.Write([]byte{'\n'})
301 // MountsHandler responds to "GET /mounts" requests.
302 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
303 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
305 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus is the buffer pool section of a NodeStatus report. The
// values are copied from the buffer pool's Alloc, Cap and Len
// accessors by readNodeStatus.
type PoolStatus struct {
	// Alloc is reported as "BytesAllocatedCumulative".
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	// Cap is reported as "BuffersMax".
	Cap int `json:"BuffersMax"`
	// Len is reported as "BuffersInUse".
	Len int `json:"BuffersInUse"`
}
316 type volumeStatusEnt struct {
318 Status *VolumeStatus `json:",omitempty"`
319 VolumeStats *ioStats `json:",omitempty"`
320 InternalStats interface{} `json:",omitempty"`
324 type NodeStatus struct {
325 Volumes []*volumeStatusEnt
326 BufferPool PoolStatus
327 PullQueue WorkQueueStatus
328 TrashQueue WorkQueueStatus
335 var stLock sync.Mutex
337 // DebugHandler addresses /debug.json requests.
338 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
339 type debugStats struct {
340 MemStats runtime.MemStats
343 runtime.ReadMemStats(&ds.MemStats)
344 data, err := json.Marshal(&ds)
346 http.Error(resp, err.Error(), http.StatusInternalServerError)
352 // StatusHandler addresses /status.json requests.
353 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
355 rtr.readNodeStatus(&st)
356 data, err := json.Marshal(&st)
359 http.Error(resp, err.Error(), http.StatusInternalServerError)
365 // populate the given NodeStatus struct with current values.
366 func (rtr *router) readNodeStatus(st *NodeStatus) {
368 vols := rtr.volmgr.AllReadable()
369 if cap(st.Volumes) < len(vols) {
370 st.Volumes = make([]*volumeStatusEnt, len(vols))
372 st.Volumes = st.Volumes[:0]
373 for _, vol := range vols {
374 var internalStats interface{}
375 if vol, ok := vol.Volume.(InternalStatser); ok {
376 internalStats = vol.InternalStats()
378 st.Volumes = append(st.Volumes, &volumeStatusEnt{
380 Status: vol.Status(),
381 InternalStats: internalStats,
382 //VolumeStats: rtr.volmgr.VolumeStats(vol),
385 st.BufferPool.Alloc = bufs.Alloc()
386 st.BufferPool.Cap = bufs.Cap()
387 st.BufferPool.Len = bufs.Len()
388 st.PullQueue = getWorkQueueStatus(rtr.pullq)
389 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
392 // return a WorkQueueStatus for the given queue. If q is nil (which
393 // should never happen except in test suites), return a zero status
394 // value instead of crashing.
395 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
397 // This should only happen during tests.
398 return WorkQueueStatus{}
403 // handleDELETE processes DELETE requests.
405 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
406 // from all connected volumes.
408 // Only the Data Manager, or an Arvados admin with scope "all", are
409 // allowed to issue DELETE requests. If a DELETE request is not
410 // authenticated or is issued by a non-admin user, the server returns
411 // a PermissionError.
413 // Upon receiving a valid request from an authorized user,
414 // handleDELETE deletes all copies of the specified block on local
419 // If the requested blocks was not found on any volume, the response
420 // code is HTTP 404 Not Found.
422 // Otherwise, the response code is 200 OK, with a response body
423 // consisting of the JSON message
425 // {"copies_deleted":d,"copies_failed":f}
427 // where d and f are integers representing the number of blocks that
428 // were successfully and unsuccessfully deleted.
430 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
431 hash := mux.Vars(req)["hash"]
433 // Confirm that this user is an admin and has a token with unlimited scope.
434 var tok = GetAPIToken(req)
435 if tok == "" || !rtr.canDelete(tok) {
436 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
440 if !rtr.cluster.Collections.BlobTrash {
441 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
445 // Delete copies of this block from all available volumes.
446 // Report how many blocks were successfully deleted, and how
447 // many were found on writable volumes but not deleted.
449 Deleted int `json:"copies_deleted"`
450 Failed int `json:"copies_failed"`
452 for _, vol := range rtr.volmgr.AllWritable() {
453 if err := vol.Trash(hash); err == nil {
455 } else if os.IsNotExist(err) {
459 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
462 if result.Deleted == 0 && result.Failed == 0 {
463 resp.WriteHeader(http.StatusNotFound)
466 body, err := json.Marshal(result)
468 http.Error(resp, err.Error(), http.StatusInternalServerError)
/* handlePull processes "PUT /pull" requests for the data manager.
   The request body is a JSON message containing a list of pull
   requests in the following format:

   [
      {
         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
         "servers":[
            "keep0.qr1hi.arvadosapi.com:25107",
            "keep1.qr1hi.arvadosapi.com:25108"
         ]
      },
      {
         "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
         "servers":[
            "10.0.1.5:25107",
            "10.0.1.6:25107",
            "10.0.1.7:25108"
         ]
      },
      ...
   ]

   Each pull request in the list consists of a block locator string
   and an ordered list of servers. Keepstore should try to fetch the
   block from each server in turn.

   If the request has not been sent by the Data Manager, return 401
   Unauthorized.

   If the JSON unmarshalling fails, return 400 Bad Request.
*/
// PullRequest is one entry of a "PUT /pull" request body: a block
// locator together with an ordered list of servers.
type PullRequest struct {
	// Locator of the block to pull.
	Locator string `json:"locator"`
	// Servers is the ordered list of servers for this block.
	Servers []string `json:"servers"`

	// Destination mount, or "" for "anywhere"
	MountUUID string `json:"mount_uuid"`
}
516 // PullHandler processes "PUT /pull" requests for the data manager.
517 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
518 // Reject unauthorized requests.
519 if !rtr.isSystemAuth(GetAPIToken(req)) {
520 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
524 // Parse the request body.
526 r := json.NewDecoder(req.Body)
527 if err := r.Decode(&pr); err != nil {
528 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
532 // We have a properly formatted pull list sent from the data
533 // manager. Report success and send the list to the pull list
534 // manager for further handling.
535 resp.WriteHeader(http.StatusOK)
537 fmt.Sprintf("Received %d pull requests\n", len(pr))))
540 for _, p := range pr {
543 rtr.pullq.ReplaceQueue(plist)
// TrashRequest is one entry of a "PUT /trash" request body: a block
// locator and its Mtime.
type TrashRequest struct {
	// Locator of the block to trash.
	Locator string `json:"locator"`
	// BlockMtime is the block's Mtime as sent by the client.
	BlockMtime int64 `json:"block_mtime"`

	// Target mount, or "" for "everywhere"
	MountUUID string `json:"mount_uuid"`
}
555 // TrashHandler processes /trash requests.
556 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
557 // Reject unauthorized requests.
558 if !rtr.isSystemAuth(GetAPIToken(req)) {
559 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
563 // Parse the request body.
564 var trash []TrashRequest
565 r := json.NewDecoder(req.Body)
566 if err := r.Decode(&trash); err != nil {
567 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
571 // We have a properly formatted trash list sent from the data
572 // manager. Report success and send the list to the trash work
573 // queue for further handling.
574 resp.WriteHeader(http.StatusOK)
576 fmt.Sprintf("Received %d trash requests\n", len(trash))))
579 for _, t := range trash {
582 rtr.trashq.ReplaceQueue(tlist)
585 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
586 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
587 // Reject unauthorized requests.
588 if !rtr.isSystemAuth(GetAPIToken(req)) {
589 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
593 log := ctxlog.FromContext(req.Context())
594 hash := mux.Vars(req)["hash"]
596 if len(rtr.volmgr.AllWritable()) == 0 {
597 http.Error(resp, "No writable volumes", http.StatusNotFound)
601 var untrashedOn, failedOn []string
603 for _, vol := range rtr.volmgr.AllWritable() {
604 err := vol.Untrash(hash)
606 if os.IsNotExist(err) {
608 } else if err != nil {
609 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
610 failedOn = append(failedOn, vol.String())
612 log.Infof("Untrashed %v on volume %v", hash, vol.String())
613 untrashedOn = append(untrashedOn, vol.String())
617 if numNotFound == len(rtr.volmgr.AllWritable()) {
618 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
619 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
620 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
622 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
623 if len(failedOn) > 0 {
624 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
625 http.Error(resp, respBody, http.StatusInternalServerError)
627 fmt.Fprintln(resp, respBody)
632 // GetBlock and PutBlock implement lower-level code for handling
633 // blocks by rooting through volumes connected to the local machine.
634 // Once the handler has determined that system policy permits the
635 // request, it calls these methods to perform the actual operation.
637 // TODO(twp): this code would probably be better located in the
638 // VolumeManager interface. As an abstraction, the VolumeManager
639 // should be the only part of the code that cares about which volume a
640 // block is stored on, so it should be responsible for figuring out
641 // which volume to check for fetching blocks, storing blocks, etc.
643 // GetBlock fetches the block identified by "hash" into the provided
644 // buf, and returns the data size.
646 // If the block cannot be found on any volume, returns NotFoundError.
648 // If the block found does not have the correct MD5 hash, returns
651 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
652 log := ctxlog.FromContext(ctx)
654 // Attempt to read the requested hash from a keep volume.
655 errorToCaller := NotFoundError
657 for _, vol := range volmgr.AllReadable() {
658 size, err := vol.Get(ctx, hash, buf)
661 return 0, ErrClientDisconnect
665 // IsNotExist is an expected error and may be
666 // ignored. All other errors are logged. In
667 // any case we continue trying to read other
668 // volumes. If all volumes report IsNotExist,
669 // we return a NotFoundError.
670 if !os.IsNotExist(err) {
671 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
673 // If some volume returns a transient error, return it to the caller
674 // instead of "Not found" so it can retry.
675 if err == VolumeBusyError {
676 errorToCaller = err.(*KeepError)
680 // Check the file checksum.
681 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
682 if filehash != hash {
683 // TODO: Try harder to tell a sysadmin about
685 log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
686 errorToCaller = DiskHashError
689 if errorToCaller == DiskHashError {
690 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
694 return 0, errorToCaller
697 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
699 // PutBlock(ctx, block, hash)
700 // Stores the BLOCK (identified by the content id HASH) in Keep.
702 // The MD5 checksum of the block must be identical to the content id HASH.
703 // If not, an error is returned.
705 // PutBlock stores the BLOCK on the first Keep volume with free space.
706 // A failure code is returned to the user only if all volumes fail.
708 // On success, PutBlock returns nil.
709 // On failure, it returns a KeepError with one of the following codes:
712 // A different block with the same hash already exists on this
715 // The MD5 hash of the BLOCK does not match the argument HASH.
717 // There was not enough space left in any Keep volume to store
720 // The object could not be stored for some other reason (e.g.
721 // all writes failed). The text of the error message should
722 // provide as much detail as possible.
724 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
725 log := ctxlog.FromContext(ctx)
727 // Check that BLOCK's checksum matches HASH.
728 blockhash := fmt.Sprintf("%x", md5.Sum(block))
729 if blockhash != hash {
730 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
731 return 0, RequestHashError
734 // If we already have this data, it's intact on disk, and we
735 // can update its timestamp, return success. If we have
736 // different data with the same hash, return failure.
737 if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
739 } else if ctx.Err() != nil {
740 return 0, ErrClientDisconnect
743 // Choose a Keep volume to write to.
744 // If this volume fails, try all of the volumes in order.
745 if mnt := volmgr.NextWritable(); mnt != nil {
746 if err := mnt.Put(ctx, hash, block); err != nil {
747 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
749 return mnt.Replication, nil // success!
752 if ctx.Err() != nil {
753 return 0, ErrClientDisconnect
756 writables := volmgr.AllWritable()
757 if len(writables) == 0 {
758 log.Error("no writable volumes")
763 for _, vol := range writables {
764 err := vol.Put(ctx, hash, block)
765 if ctx.Err() != nil {
766 return 0, ErrClientDisconnect
770 return vol.Replication, nil // success!
774 // The volume is not full but the
775 // write did not succeed. Report the
776 // error and continue trying.
778 log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
783 log.Error("all volumes are full")
786 // Already logged the non-full errors.
787 return 0, GenericError
790 // CompareAndTouch returns the current replication level if one of the
791 // volumes already has the given content and it successfully updates
792 // the relevant block's modification time in order to protect it from
793 // premature garbage collection. Otherwise, it returns a non-nil
795 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
796 log := ctxlog.FromContext(ctx)
797 var bestErr error = NotFoundError
798 for _, mnt := range volmgr.AllWritable() {
799 err := mnt.Compare(ctx, hash, buf)
800 if ctx.Err() != nil {
802 } else if err == CollisionError {
803 // Stop if we have a block with same hash but
804 // different content. (It will be impossible
805 // to tell which one is wanted if we have
806 // both, so there's no point writing it even
807 // on a different volume.)
808 log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
810 } else if os.IsNotExist(err) {
811 // Block does not exist. This is the only
812 // "normal" error: we don't log anything.
814 } else if err != nil {
815 // Couldn't open file, data is corrupt on
816 // disk, etc.: log this abnormal condition,
817 // and try the next volume.
818 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
821 if err := mnt.Touch(hash); err != nil {
822 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
826 // Compare and Touch both worked --> done.
827 return mnt.Replication, nil
// validLocatorRe matches exactly 32 lowercase hexadecimal digits.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator, i.e. a bare 32-digit lowercase hex hash with no hints.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	valid := validLocatorRe.MatchString(loc)
	return valid
}
// authRe matches an Authorization header value using the OAuth2 or
// Bearer scheme; the second submatch is the token.
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// header is present. Only the first Authorization value is
// considered.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" both when the header is absent and when
	// the key is present with an empty value list, so there is no
	// slice indexing that could panic.
	if match := authRe.FindStringSubmatch(req.Header.Get("Authorization")); match != nil {
		return match[2]
	}
	return ""
}
856 // canDelete returns true if the user identified by apiToken is
857 // allowed to delete blocks.
858 func (rtr *router) canDelete(apiToken string) bool {
862 // Blocks may be deleted only when Keep has been configured with a
864 if rtr.isSystemAuth(apiToken) {
867 // TODO(twp): look up apiToken with the API server
868 // return true if is_admin is true and if the token
869 // has unlimited scope
873 // isSystemAuth returns true if the given token is allowed to perform
874 // system level actions like deleting data.
875 func (rtr *router) isSystemAuth(token string) bool {
876 return token != "" && token == rtr.cluster.SystemRootToken