Merge branch '16265-security-updates' into dependabot/bundler/apps/workbench/loofah...
[arvados.git] / services / keepstore / handlers.go
index fb327a386b0f33fdae30f1e0d3e4f880c8d0bfa1..3d0f893d82fc20c8a01a833aa050f6203a1c70c7 100644 (file)
@@ -4,13 +4,6 @@
 
 package main
 
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler    (GET /index, GET /index/prefix)
-// StatusHandler   (GET /status.json)
-
 import (
        "container/list"
        "context"
@@ -27,35 +20,52 @@ import (
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "git.arvados.org/arvados.git/sdk/go/httpserver"
        "github.com/gorilla/mux"
-
-       "git.curoverse.com/arvados.git/sdk/go/health"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
 type router struct {
        *mux.Router
-       limiter httpserver.RequestCounter
+       cluster     *arvados.Cluster
+       logger      logrus.FieldLogger
+       remoteProxy remoteProxy
+       metrics     *nodeMetrics
+       volmgr      *RRVolumeManager
+       pullq       *WorkQueue
+       trashq      *WorkQueue
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter() http.Handler {
-       rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
+       rtr := &router{
+               Router:  mux.NewRouter(),
+               cluster: cluster,
+               logger:  ctxlog.FromContext(ctx),
+               metrics: &nodeMetrics{reg: reg},
+               volmgr:  volmgr,
+               pullq:   pullq,
+               trashq:  trashq,
+       }
 
        rtr.HandleFunc(
-               `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+               `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
        rtr.HandleFunc(
                `/{hash:[0-9a-f]{32}}+{hints}`,
-               GetBlockHandler).Methods("GET", "HEAD")
+               rtr.handleGET).Methods("GET", "HEAD")
 
-       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
-       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
        // List all blocks stored here. Privileged client only.
-       rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
+       rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
        // List blocks stored here whose hash has the given prefix.
        // Privileged client only.
-       rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
+       rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
 
        // Internals/debugging info (runtime.MemStats)
        rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
@@ -65,20 +75,20 @@ func MakeRESTRouter() http.Handler {
 
        // List mounts: UUID, readonly, tier, device ID, ...
        rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
-       rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
-       rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
+       rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
+       rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
 
        // Replace the current pull queue.
-       rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+       rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
 
        // Replace the current trash queue.
-       rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+       rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
 
        // Untrash moves blocks from trash back into store
-       rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+       rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
 
        rtr.Handle("/_health/{check}", &health.Handler{
-               Token:  theConfig.ManagementToken,
+               Token:  cluster.ManagementToken,
                Prefix: "/_health/",
        }).Methods("GET")
 
@@ -86,17 +96,11 @@ func MakeRESTRouter() http.Handler {
        // 400 Bad Request.
        rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
-       theConfig.metrics.setup()
+       rtr.metrics.setupBufferPoolMetrics(bufs)
+       rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
+       rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
 
-       rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
-
-       mux := http.NewServeMux()
-       mux.Handle("/", theConfig.metrics.Instrument(
-               httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter))))
-       mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
-       mux.Handle("/metrics", theConfig.metrics.exportProm)
-
-       return mux
+       return rtr
 }
 
 // BadRequestHandler is a HandleFunc to address bad requests.
@@ -104,14 +108,19 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
        http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
 }
 
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
 
-       if theConfig.RequireSignatures {
+       locator := req.URL.Path[1:]
+       if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+               rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+               return
+       }
+
+       if rtr.cluster.Collections.BlobSigning {
                locator := req.URL.Path[1:] // strip leading slash
-               if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
+               if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
                        http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
                        return
                }
@@ -131,7 +140,7 @@ func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -153,7 +162,6 @@ func contextForResponse(parent context.Context, resp http.ResponseWriter) (conte
                go func(c <-chan bool) {
                        select {
                        case <-c:
-                               theConfig.debugLogf("cancel context")
                                cancel()
                        case <-ctx.Done():
                        }
@@ -183,8 +191,7 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
        }
 }
 
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
 
@@ -204,7 +211,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       if len(KeepVM.AllWritable()) == 0 {
+       if len(rtr.volmgr.AllWritable()) == 0 {
                http.Error(resp, FullError.Error(), FullError.HTTPCode)
                return
        }
@@ -222,7 +229,7 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(ctx, buf, hash)
+       replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
        bufs.Put(buf)
 
        if err != nil {
@@ -238,9 +245,9 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
        // return it to the client.
        returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
        apiToken := GetAPIToken(req)
-       if theConfig.blobSigningKey != nil && apiToken != "" {
-               expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
-               returnHash = SignLocator(returnHash, apiToken, expiry)
+       if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
+               expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
+               returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
        }
        resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
        resp.Write([]byte(returnHash + "\n"))
@@ -248,8 +255,8 @@ func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
 
 // IndexHandler responds to "/index", "/index/{prefix}", and
 // "/mounts/{uuid}/blocks" requests.
-func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
-       if !IsSystemAuth(GetAPIToken(req)) {
+func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
+       if !rtr.isSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -262,25 +269,27 @@ func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
 
        uuid := mux.Vars(req)["uuid"]
 
-       var vols []Volume
+       var vols []*VolumeMount
        if uuid == "" {
-               vols = KeepVM.AllReadable()
-       } else if v := KeepVM.Lookup(uuid, false); v == nil {
+               vols = rtr.volmgr.AllReadable()
+       } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
                http.Error(resp, "mount not found", http.StatusNotFound)
                return
        } else {
-               vols = []Volume{v}
+               vols = []*VolumeMount{mnt}
        }
 
        for _, v := range vols {
                if err := v.IndexTo(prefix, resp); err != nil {
-                       // The only errors returned by IndexTo are
-                       // write errors returned by resp.Write(),
-                       // which probably means the client has
-                       // disconnected and this error will never be
-                       // reported to the client -- but it will
-                       // appear in our own error log.
-                       http.Error(resp, err.Error(), http.StatusInternalServerError)
+                       // We can't send an error status/message to
+                       // the client because IndexTo() might have
+                       // already written body content. All we can do
+                       // is log the error in our own logs.
+                       //
+                       // The client must notice the lack of trailing
+                       // newline as an indication that the response
+                       // is incomplete.
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
                        return
                }
        }
@@ -291,9 +300,9 @@ func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
 
 // MountsHandler responds to "GET /mounts" requests.
 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
-       err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
+       err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
        if err != nil {
-               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
        }
 }
 
@@ -332,56 +341,52 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
        }
        var ds debugStats
        runtime.ReadMemStats(&ds.MemStats)
-       err := json.NewEncoder(resp).Encode(&ds)
+       data, err := json.Marshal(&ds)
        if err != nil {
-               http.Error(resp, err.Error(), 500)
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
        rtr.readNodeStatus(&st)
-       jstat, err := json.Marshal(&st)
+       data, err := json.Marshal(&st)
        stLock.Unlock()
-       if err == nil {
-               resp.Write(jstat)
-       } else {
-               log.Printf("json.Marshal: %s", err)
-               log.Printf("NodeStatus = %v", &st)
-               http.Error(resp, err.Error(), 500)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
 func (rtr *router) readNodeStatus(st *NodeStatus) {
        st.Version = version
-       vols := KeepVM.AllReadable()
+       vols := rtr.volmgr.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*volumeStatusEnt, len(vols))
        }
        st.Volumes = st.Volumes[:0]
        for _, vol := range vols {
                var internalStats interface{}
-               if vol, ok := vol.(InternalStatser); ok {
+               if vol, ok := vol.Volume.(InternalStatser); ok {
                        internalStats = vol.InternalStats()
                }
                st.Volumes = append(st.Volumes, &volumeStatusEnt{
                        Label:         vol.String(),
                        Status:        vol.Status(),
                        InternalStats: internalStats,
-                       //VolumeStats: KeepVM.VolumeStats(vol),
+                       //VolumeStats: rtr.volmgr.VolumeStats(vol),
                })
        }
        st.BufferPool.Alloc = bufs.Alloc()
        st.BufferPool.Cap = bufs.Cap()
        st.BufferPool.Len = bufs.Len()
-       st.PullQueue = getWorkQueueStatus(pullq)
-       st.TrashQueue = getWorkQueueStatus(trashq)
-       if rtr.limiter != nil {
-               st.RequestsCurrent = rtr.limiter.Current()
-               st.RequestsMax = rtr.limiter.Max()
-       }
+       st.PullQueue = getWorkQueueStatus(rtr.pullq)
+       st.TrashQueue = getWorkQueueStatus(rtr.trashq)
 }
 
 // return a WorkQueueStatus for the given queue. If q is nil (which
@@ -395,7 +400,7 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
        return q.Status()
 }
 
-// DeleteHandler processes DELETE requests.
+// handleDELETE processes DELETE requests.
 //
 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
 // from all connected volumes.
@@ -406,7 +411,7 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 // a PermissionError.
 //
 // Upon receiving a valid request from an authorized user,
-// DeleteHandler deletes all copies of the specified block on local
+// handleDELETE deletes all copies of the specified block on local
 // writable volumes.
 //
 // Response format:
@@ -422,17 +427,17 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 // where d and f are integers representing the number of blocks that
 // were successfully and unsuccessfully deleted.
 //
-func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
        hash := mux.Vars(req)["hash"]
 
        // Confirm that this user is an admin and has a token with unlimited scope.
        var tok = GetAPIToken(req)
-       if tok == "" || !CanDelete(tok) {
+       if tok == "" || !rtr.canDelete(tok) {
                http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
                return
        }
 
-       if !theConfig.EnableDelete {
+       if !rtr.cluster.Collections.BlobTrash {
                http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
                return
        }
@@ -444,35 +449,26 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                Deleted int `json:"copies_deleted"`
                Failed  int `json:"copies_failed"`
        }
-       for _, vol := range KeepVM.AllWritable() {
+       for _, vol := range rtr.volmgr.AllWritable() {
                if err := vol.Trash(hash); err == nil {
                        result.Deleted++
                } else if os.IsNotExist(err) {
                        continue
                } else {
                        result.Failed++
-                       log.Println("DeleteHandler:", err)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
                }
        }
-
-       var st int
-
        if result.Deleted == 0 && result.Failed == 0 {
-               st = http.StatusNotFound
-       } else {
-               st = http.StatusOK
+               resp.WriteHeader(http.StatusNotFound)
+               return
        }
-
-       resp.WriteHeader(st)
-
-       if st == http.StatusOK {
-               if body, err := json.Marshal(result); err == nil {
-                       resp.Write(body)
-               } else {
-                       log.Printf("json.Marshal: %s (result = %v)", err, result)
-                       http.Error(resp, err.Error(), 500)
-               }
+       body, err := json.Marshal(result)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -518,9 +514,9 @@ type PullRequest struct {
 }
 
 // PullHandler processes "PUT /pull" requests for the data manager.
-func PullHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsSystemAuth(GetAPIToken(req)) {
+       if !rtr.isSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -544,7 +540,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
        for _, p := range pr {
                plist.PushBack(p)
        }
-       pullq.ReplaceQueue(plist)
+       rtr.pullq.ReplaceQueue(plist)
 }
 
 // TrashRequest consists of a block locator and its Mtime
@@ -557,9 +553,9 @@ type TrashRequest struct {
 }
 
 // TrashHandler processes /trash requests.
-func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsSystemAuth(GetAPIToken(req)) {
+       if !rtr.isSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
@@ -583,53 +579,53 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        for _, t := range trash {
                tlist.PushBack(t)
        }
-       trashq.ReplaceQueue(tlist)
+       rtr.trashq.ReplaceQueue(tlist)
 }
 
 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
-func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
-       if !IsSystemAuth(GetAPIToken(req)) {
+       if !rtr.isSystemAuth(GetAPIToken(req)) {
                http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
                return
        }
 
+       log := ctxlog.FromContext(req.Context())
        hash := mux.Vars(req)["hash"]
 
-       if len(KeepVM.AllWritable()) == 0 {
+       if len(rtr.volmgr.AllWritable()) == 0 {
                http.Error(resp, "No writable volumes", http.StatusNotFound)
                return
        }
 
        var untrashedOn, failedOn []string
        var numNotFound int
-       for _, vol := range KeepVM.AllWritable() {
+       for _, vol := range rtr.volmgr.AllWritable() {
                err := vol.Untrash(hash)
 
                if os.IsNotExist(err) {
                        numNotFound++
                } else if err != nil {
-                       log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+                       log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
                        failedOn = append(failedOn, vol.String())
                } else {
-                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       log.Infof("Untrashed %v on volume %v", hash, vol.String())
                        untrashedOn = append(untrashedOn, vol.String())
                }
        }
 
-       if numNotFound == len(KeepVM.AllWritable()) {
+       if numNotFound == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-               return
-       }
-
-       if len(failedOn) == len(KeepVM.AllWritable()) {
+       } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
        } else {
-               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
                if len(failedOn) > 0 {
-                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+                       http.Error(resp, respBody, http.StatusInternalServerError)
+               } else {
+                       fmt.Fprintln(resp, respBody)
                }
-               resp.Write([]byte(respBody))
        }
 }
 
@@ -652,11 +648,13 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
-       for _, vol := range KeepVM.AllReadable() {
+       for _, vol := range volmgr.AllReadable() {
                size, err := vol.Get(ctx, hash, buf)
                select {
                case <-ctx.Done():
@@ -670,24 +668,26 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
                        // volumes. If all volumes report IsNotExist,
                        // we return a NotFoundError.
                        if !os.IsNotExist(err) {
-                               log.Printf("%s: Get(%s): %s", vol, hash, err)
+                               log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
+                       }
+                       // If some volume returns a transient error, return it to the caller
+                       // instead of "Not found" so it can retry.
+                       if err == VolumeBusyError {
+                               errorToCaller = err.(*KeepError)
                        }
                        continue
                }
                // Check the file checksum.
-               //
                filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Printf("%s: checksum mismatch for request %s (actual %s)",
-                               vol, hash, filehash)
+                       log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
                        errorToCaller = DiskHashError
                        continue
                }
                if errorToCaller == DiskHashError {
-                       log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-                               vol, hash)
+                       log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
                }
                return size, nil
        }
@@ -721,7 +721,9 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 //          all writes failed). The text of the error message should
 //          provide as much detail as possible.
 //
-func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
@@ -732,7 +734,7 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
+       if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
                return n, err
        } else if ctx.Err() != nil {
                return 0, ErrClientDisconnect
@@ -740,18 +742,20 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
-       if vol := KeepVM.NextWritable(); vol != nil {
-               if err := vol.Put(ctx, hash, block); err == nil {
-                       return vol.Replication(), nil // success!
-               }
-               if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
+       if mnt := volmgr.NextWritable(); mnt != nil {
+               if err := mnt.Put(ctx, hash, block); err != nil {
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+               } else {
+                       return mnt.Replication, nil // success!
                }
        }
+       if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
+       }
 
-       writables := KeepVM.AllWritable()
+       writables := volmgr.AllWritable()
        if len(writables) == 0 {
-               log.Print("No writable volumes.")
+               log.Error("no writable volumes")
                return 0, FullError
        }
 
@@ -761,20 +765,22 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
                if ctx.Err() != nil {
                        return 0, ErrClientDisconnect
                }
-               if err == nil {
-                       return vol.Replication(), nil // success!
-               }
-               if err != FullError {
+               switch err {
+               case nil:
+                       return vol.Replication, nil // success!
+               case FullError:
+                       continue
+               default:
                        // The volume is not full but the
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.Printf("%s: Write(%s): %s", vol, hash, err)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
                }
        }
 
        if allFull {
-               log.Print("All volumes are full.")
+               log.Error("all volumes are full")
                return 0, FullError
        }
        // Already logged the non-full errors.
@@ -786,10 +792,11 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
 // the relevant block's modification time in order to protect it from
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
-func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+       log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
-       for _, vol := range KeepVM.AllWritable() {
-               err := vol.Compare(ctx, hash, buf)
+       for _, mnt := range volmgr.AllWritable() {
+               err := mnt.Compare(ctx, hash, buf)
                if ctx.Err() != nil {
                        return 0, ctx.Err()
                } else if err == CollisionError {
@@ -798,7 +805,7 @@ func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error)
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Printf("%s: Compare(%s): %s", vol, hash, err)
+                       log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
                        return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
@@ -808,16 +815,16 @@ func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error)
                        // Couldn't open file, data is corrupt on
                        // disk, etc.: log this abnormal condition,
                        // and try the next volume.
-                       log.Printf("%s: Compare(%s): %s", vol, hash, err)
+                       log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
                        continue
                }
-               if err := vol.Touch(hash); err != nil {
-                       log.Printf("%s: Touch %s failed: %s", vol, hash, err)
+               if err := mnt.Touch(hash); err != nil {
+                       log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
                        bestErr = err
                        continue
                }
                // Compare and Touch both worked --> done.
-               return vol.Replication(), nil
+               return mnt.Replication, nil
        }
        return 0, bestErr
 }
@@ -832,7 +839,7 @@ func IsValidLocator(loc string) bool {
        return validLocatorRe.MatchString(loc)
 }
 
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
 
 // GetAPIToken returns the OAuth2 token from the Authorization
 // header of a HTTP request, or an empty string if no matching
@@ -840,33 +847,21 @@ var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
 func GetAPIToken(req *http.Request) string {
        if auth, ok := req.Header["Authorization"]; ok {
                if match := authRe.FindStringSubmatch(auth[0]); match != nil {
-                       return match[1]
+                       return match[2]
                }
        }
        return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-       ts, err := strconv.ParseInt(timestampHex, 16, 0)
-       if err != nil {
-               log.Printf("IsExpired: %s", err)
-               return true
-       }
-       return time.Unix(ts, 0).Before(time.Now())
-}
-
-// CanDelete returns true if the user identified by apiToken is
+// canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
-func CanDelete(apiToken string) bool {
+func (rtr *router) canDelete(apiToken string) bool {
        if apiToken == "" {
                return false
        }
        // Blocks may be deleted only when Keep has been configured with a
        // data manager.
-       if IsSystemAuth(apiToken) {
+       if rtr.isSystemAuth(apiToken) {
                return true
        }
        // TODO(twp): look up apiToken with the API server
@@ -875,8 +870,8 @@ func CanDelete(apiToken string) bool {
        return false
 }
 
-// IsSystemAuth returns true if the given token is allowed to perform
+// isSystemAuth returns true if the given token is allowed to perform
 // system level actions like deleting data.
-func IsSystemAuth(token string) bool {
-       return token != "" && token == theConfig.systemAuthToken
+func (rtr *router) isSystemAuth(token string) bool {
+       return token != "" && token == rtr.cluster.SystemRootToken
 }