X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bc14c62ad1528dbddc26781c5cea6a7968c93f2e..8a27fe370239ecb8e50d53f46b45ed61203a35ca:/services/keepstore/handlers.go

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 72088e2b5e..63a23687ec 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
 	"container/list"
@@ -18,31 +18,41 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
-	"git.curoverse.com/arvados.git/sdk/go/health"
-	"git.curoverse.com/arvados.git/sdk/go/httpserver"
+	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
+	"git.arvados.org/arvados.git/sdk/go/health"
+	"git.arvados.org/arvados.git/sdk/go/httpserver"
 	"github.com/gorilla/mux"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
 )
 
 type router struct {
 	*mux.Router
-	limiter     httpserver.RequestCounter
 	cluster     *arvados.Cluster
+	logger      logrus.FieldLogger
 	remoteProxy remoteProxy
 	metrics     *nodeMetrics
+	volmgr      *RRVolumeManager
+	pullq       *WorkQueue
+	trashq      *WorkQueue
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
+func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
 	rtr := &router{
 		Router:  mux.NewRouter(),
 		cluster: cluster,
+		logger:  ctxlog.FromContext(ctx),
 		metrics: &nodeMetrics{reg: reg},
+		volmgr:  volmgr,
+		pullq:   pullq,
+		trashq:  trashq,
 	}
 
 	rtr.HandleFunc(
@@ -52,12 +62,14 @@ func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Han
 		rtr.handleGET).Methods("GET", "HEAD")
 	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
-	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
 	// List all blocks stored here. Privileged client only.
-	rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
+	rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
 	// List blocks stored here whose hash has the given prefix.
 	// Privileged client only.
-	rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
+	rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
+	// Update timestamp on existing block. Privileged client only.
+	rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
 
 	// Internals/debugging info (runtime.MemStats)
 	rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
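The new TOUCH route uses a nonstandard HTTP method, which gorilla/mux matches like any other verb. A minimal sketch of how a privileged client might exercise it; the host, port, hash, and token below are placeholders, and it assumes the server accepts a Bearer-style Authorization header (see GetAPIToken later in this file):

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// http.NewRequest accepts arbitrary method names, so a custom
	// TOUCH verb needs no special client-side support.
	req, err := http.NewRequest("TOUCH",
		"http://keep0.example:25107/acbd18db4cc2f85cedef654fccc4a4d8", nil)
	if err != nil {
		panic(err)
	}
	// handleTOUCH only accepts the cluster's system root token
	// (placeholder value here).
	req.Header.Set("Authorization", "Bearer "+"placeholder-system-root-token")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // 200 on success, 404 if the block is absent
}
```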
@@ -67,20 +79,20 @@ func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Han
 	// List mounts: UUID, readonly, tier, device ID, ...
 	rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
-	rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
-	rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
+	rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
+	rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
 
 	// Replace the current pull queue.
-	rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+	rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
 
 	// Replace the current trash queue.
-	rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+	rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
 
 	// Untrash moves blocks from trash back into store
-	rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+	rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
 
 	rtr.Handle("/_health/{check}", &health.Handler{
-		Token:  theConfig.ManagementToken,
+		Token:  cluster.ManagementToken,
 		Prefix: "/_health/",
 	}).Methods("GET")
 
@@ -88,17 +100,11 @@ func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Han
 	// 400 Bad Request.
 	rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
-	rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
 	rtr.metrics.setupBufferPoolMetrics(bufs)
-	rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
-	rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
-	rtr.metrics.setupRequestMetrics(rtr.limiter)
+	rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
+	rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
 
-	instrumented := httpserver.Instrument(rtr.metrics.reg, log,
-		httpserver.HandlerWithContext(
-			ctxlog.Context(context.Background(), log),
-			httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
-	return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
+	return rtr
 }
 
 // BadRequestHandler is a HandleFunc to address bad requests.
@@ -107,18 +113,15 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-	ctx, cancel := contextForResponse(context.TODO(), resp)
-	defer cancel()
-
 	locator := req.URL.Path[1:]
 	if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-		rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
+		rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
		return
 	}
 
-	if theConfig.RequireSignatures {
+	if rtr.cluster.Collections.BlobSigning {
 		locator := req.URL.Path[1:] // strip leading slash
-		if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
+		if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
 			http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
 			return
 		}
@@ -131,14 +134,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	// isn't here, we can return 404 now instead of waiting for a
 	// buffer.
 
-	buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+	buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
 	}
 	defer bufs.Put(buf)
 
-	size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
+	size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
 	if err != nil {
 		code := http.StatusInternalServerError
 		if err, ok := err.(*KeepError); ok {
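The handlers now pass req.Context() down the call chain instead of the hand-rolled contextForResponse helper deleted in the next hunk. Since Go 1.8 the net/http server cancels a request's context automatically when the client disconnects, which is exactly what the old http.CloseNotifier plumbing emulated. A standalone illustration of the pattern:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

func handler(w http.ResponseWriter, req *http.Request) {
	// Cancelled by net/http itself when the client goes away.
	ctx := req.Context()
	select {
	case <-time.After(10 * time.Second):
		fmt.Fprintln(w, "finished")
	case <-ctx.Done():
		// Client disconnected; stop doing expensive work.
		fmt.Println("request cancelled:", ctx.Err())
	}
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}
```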
@@ -153,22 +156,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
 	resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-	ctx, cancel := context.WithCancel(parent)
-	if cn, ok := resp.(http.CloseNotifier); ok {
-		go func(c <-chan bool) {
-			select {
-			case <-c:
-				theConfig.debugLogf("cancel context")
-				cancel()
-			case <-ctx.Done():
-			}
-		}(cn.CloseNotify())
-	}
-	return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -190,10 +177,35 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
 	}
 }
 
-func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-	ctx, cancel := contextForResponse(context.TODO(), resp)
-	defer cancel()
+func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
+		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+		return
+	}
+	hash := mux.Vars(req)["hash"]
+	vols := rtr.volmgr.AllWritable()
+	if len(vols) == 0 {
+		http.Error(resp, "no volumes", http.StatusNotFound)
+		return
+	}
+	var err error
+	for _, mnt := range vols {
+		err = mnt.Touch(hash)
+		if err == nil {
+			break
+		}
+	}
+	switch {
+	case err == nil:
+		return
+	case os.IsNotExist(err):
+		http.Error(resp, err.Error(), http.StatusNotFound)
+	default:
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+	}
+}
 
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 	hash := mux.Vars(req)["hash"]
 
 	// Detect as many error conditions as possible before reading
@@ -210,12 +222,27 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	if len(KeepVM.AllWritable()) == 0 {
+	if len(rtr.volmgr.AllWritable()) == 0 {
 		http.Error(resp, FullError.Error(), FullError.HTTPCode)
 		return
 	}
 
-	buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+	var wantStorageClasses []string
+	if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+		wantStorageClasses = strings.Split(hdr, ",")
+		for i, sc := range wantStorageClasses {
+			wantStorageClasses[i] = strings.TrimSpace(sc)
+		}
+	} else {
+		// none specified -- use configured default
+		for class, cfg := range rtr.cluster.StorageClasses {
+			if cfg.Default {
+				wantStorageClasses = append(wantStorageClasses, class)
+			}
+		}
+	}
+
+	buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
 	if err != nil {
 		http.Error(resp, err.Error(), http.StatusServiceUnavailable)
 		return
@@ -228,7 +255,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	replication, err := PutBlock(ctx, buf, hash)
+	result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
 	bufs.Put(buf)
 
 	if err != nil {
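The X-Keep-Storage-Classes handling above splits the request header on commas and trims whitespace, falling back to the cluster's default classes when the header is absent. The same parsing, runnable in isolation:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// A client asking for two storage classes in one PUT request.
	hdr := "hot, archive"
	classes := strings.Split(hdr, ",")
	for i, sc := range classes {
		classes[i] = strings.TrimSpace(sc)
	}
	fmt.Printf("%q\n", classes) // ["hot" "archive"]
}
```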
@@ -244,18 +271,19 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
 	// return it to the client.
 	returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
 	apiToken := GetAPIToken(req)
-	if theConfig.blobSigningKey != nil && apiToken != "" {
-		expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
-		returnHash = SignLocator(returnHash, apiToken, expiry)
+	if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
+		expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
+		returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
 	}
-	resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+	resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+	resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
 	resp.Write([]byte(returnHash + "\n"))
 }
 
 // IndexHandler responds to "/index", "/index/{prefix}", and
 // "/mounts/{uuid}/blocks" requests.
-func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
-	if !IsSystemAuth(GetAPIToken(req)) {
+func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
@@ -268,31 +296,27 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
 
 	uuid := mux.Vars(req)["uuid"]
 
-	var vols []Volume
+	var vols []*VolumeMount
 	if uuid == "" {
-		vols = KeepVM.AllReadable()
-	} else if v := KeepVM.Lookup(uuid, false); v == nil {
+		vols = rtr.volmgr.AllReadable()
+	} else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
 		http.Error(resp, "mount not found", http.StatusNotFound)
 		return
 	} else {
-		vols = []Volume{v}
+		vols = []*VolumeMount{mnt}
 	}
 
 	for _, v := range vols {
 		if err := v.IndexTo(prefix, resp); err != nil {
-			// We can't send an error message to the
-			// client because we might have already sent
-			// headers and index content. All we can do is
-			// log the error in our own logs, and (in
-			// cases where headers haven't been sent yet)
-			// set a 500 status.
+			// We can't send an error status/message to
+			// the client because IndexTo() might have
+			// already written body content. All we can do
+			// is log the error in our own logs.
 			//
-			// If headers have already been sent, the
-			// client must notice the lack of trailing
+			// The client must notice the lack of trailing
 			// newline as an indication that the response
 			// is incomplete.
-			log.Printf("index error from volume %s: %s", v, err)
-			http.Error(resp, "", http.StatusInternalServerError)
+			ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
 			return
 		}
 	}
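As the rewritten comment above notes, a mid-stream volume error silently truncates the /index listing, and the only completeness signal is the trailing empty line the handler writes at EOF. A client-side check might look like this sketch; the endpoint and token are placeholders:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	req, _ := http.NewRequest("GET", "http://keep0.example:25107/index", nil)
	req.Header.Set("Authorization", "Bearer "+"placeholder-system-root-token")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// A complete index ends with an empty line (a bare "\n" for an
	// empty index); anything else means the listing was cut short by
	// a server-side error and should be retried.
	s := string(body)
	if s == "\n" || strings.HasSuffix(s, "\n\n") {
		fmt.Println("index complete")
	} else {
		fmt.Println("index truncated -- retry")
	}
}
```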
@@ -303,9 +327,9 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
 
 // MountsHandler responds to "GET /mounts" requests.
 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
-	err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
+	err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
 	if err != nil {
-		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
 	}
 }
 
@@ -344,56 +368,52 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 	var ds debugStats
 	runtime.ReadMemStats(&ds.MemStats)
-	err := json.NewEncoder(resp).Encode(&ds)
+	data, err := json.Marshal(&ds)
 	if err != nil {
-		http.Error(resp, err.Error(), 500)
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		return
 	}
+	resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
 	stLock.Lock()
 	rtr.readNodeStatus(&st)
-	jstat, err := json.Marshal(&st)
+	data, err := json.Marshal(&st)
 	stLock.Unlock()
-	if err == nil {
-		resp.Write(jstat)
-	} else {
-		log.Printf("json.Marshal: %s", err)
-		log.Printf("NodeStatus = %v", &st)
-		http.Error(resp, err.Error(), 500)
+	if err != nil {
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		return
 	}
+	resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
 func (rtr *router) readNodeStatus(st *NodeStatus) {
-	st.Version = version
-	vols := KeepVM.AllReadable()
+	st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
+	vols := rtr.volmgr.AllReadable()
 	if cap(st.Volumes) < len(vols) {
 		st.Volumes = make([]*volumeStatusEnt, len(vols))
 	}
 	st.Volumes = st.Volumes[:0]
 	for _, vol := range vols {
 		var internalStats interface{}
-		if vol, ok := vol.(InternalStatser); ok {
+		if vol, ok := vol.Volume.(InternalStatser); ok {
 			internalStats = vol.InternalStats()
 		}
 		st.Volumes = append(st.Volumes, &volumeStatusEnt{
 			Label:         vol.String(),
 			Status:        vol.Status(),
 			InternalStats: internalStats,
-			//VolumeStats: KeepVM.VolumeStats(vol),
+			//VolumeStats: rtr.volmgr.VolumeStats(vol),
 		})
 	}
 	st.BufferPool.Alloc = bufs.Alloc()
 	st.BufferPool.Cap = bufs.Cap()
 	st.BufferPool.Len = bufs.Len()
-	st.PullQueue = getWorkQueueStatus(pullq)
-	st.TrashQueue = getWorkQueueStatus(trashq)
-	if rtr.limiter != nil {
-		st.RequestsCurrent = rtr.limiter.Current()
-		st.RequestsMax = rtr.limiter.Max()
-	}
+	st.PullQueue = getWorkQueueStatus(rtr.pullq)
+	st.TrashQueue = getWorkQueueStatus(rtr.trashq)
 }
 
 // return a WorkQueueStatus for the given queue. If q is nil (which
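readNodeStatus now derives the version reported in /status.json from the shared cmd.Version value, keeping only the first space-delimited token. Assuming a version string of the form "2.2.0 (go1.16.3)" (the exact format is an assumption for illustration), the trimming works like this:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical cmd.Version.String() output; only the first
	// space-delimited token is reported.
	v := "2.2.0 (go1.16.3)"
	fmt.Println(strings.SplitN(v, " ", 2)[0]) // 2.2.0
}
```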
@@ -407,7 +427,7 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 	return q.Status()
 }
 
-// DeleteHandler processes DELETE requests.
+// handleDELETE processes DELETE requests.
 //
 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
 // from all connected volumes.
@@ -418,7 +438,7 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 // a PermissionError.
 //
 // Upon receiving a valid request from an authorized user,
-// DeleteHandler deletes all copies of the specified block on local
+// handleDELETE deletes all copies of the specified block on local
 // writable volumes.
 //
 // Response format:
@@ -434,17 +454,17 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 // where d and f are integers representing the number of blocks that
 // were successfully and unsuccessfully deleted.
 //
-func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
 	hash := mux.Vars(req)["hash"]
 
 	// Confirm that this user is an admin and has a token with unlimited scope.
 	var tok = GetAPIToken(req)
-	if tok == "" || !CanDelete(tok) {
+	if tok == "" || !rtr.canDelete(tok) {
 		http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
 		return
 	}
 
-	if !theConfig.EnableDelete {
+	if !rtr.cluster.Collections.BlobTrash {
 		http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
 		return
 	}
@@ -456,35 +476,26 @@
 		Deleted int `json:"copies_deleted"`
 		Failed  int `json:"copies_failed"`
 	}
-	for _, vol := range KeepVM.AllWritable() {
+	for _, vol := range rtr.volmgr.AllWritable() {
 		if err := vol.Trash(hash); err == nil {
 			result.Deleted++
 		} else if os.IsNotExist(err) {
 			continue
 		} else {
 			result.Failed++
-			log.Println("DeleteHandler:", err)
+			ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
 		}
 	}
-
-	var st int
-
 	if result.Deleted == 0 && result.Failed == 0 {
-		st = http.StatusNotFound
-	} else {
-		st = http.StatusOK
+		resp.WriteHeader(http.StatusNotFound)
+		return
 	}
-
-	resp.WriteHeader(st)
-
-	if st == http.StatusOK {
-		if body, err := json.Marshal(result); err == nil {
-			resp.Write(body)
-		} else {
-			log.Printf("json.Marshal: %s (result = %v)", err, result)
-			http.Error(resp, err.Error(), 500)
-		}
+	body, err := json.Marshal(result)
+	if err != nil {
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		return
 	}
+	resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -530,9 +541,9 @@ type PullRequest struct {
 }
 
 // PullHandler processes "PUT /pull" requests for the data manager.
-func PullHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsSystemAuth(GetAPIToken(req)) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
@@ -556,7 +567,7 @@
 	for _, p := range pr {
 		plist.PushBack(p)
 	}
-	pullq.ReplaceQueue(plist)
+	rtr.pullq.ReplaceQueue(plist)
 }
 
 // TrashRequest consists of a block locator and its Mtime
@@ -569,9 +580,9 @@ type TrashRequest struct {
 }
 
 // TrashHandler processes /trash requests.
-func TrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsSystemAuth(GetAPIToken(req)) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
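The pull and trash handlers replace their work queues wholesale from a JSON array in the request body. A sketch of submitting a trash list follows; the struct below mirrors keepstore's TrashRequest ("a block locator and its Mtime"), but its JSON keys, along with the host and token, are assumptions for illustration:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// trashRequest is a hypothetical client-side mirror of keepstore's
// TrashRequest; field names and JSON keys are assumed here.
type trashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
}

func main() {
	payload, _ := json.Marshal([]trashRequest{
		{Locator: "acbd18db4cc2f85cedef654fccc4a4d8", BlockMtime: 1600000000},
	})
	// PUT /trash replaces the entire trash work queue.
	req, _ := http.NewRequest("PUT", "http://keep0.example:25107/trash",
		bytes.NewReader(payload))
	req.Header.Set("Authorization", "Bearer "+"placeholder-system-root-token")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```

PUT /pull works the same way, with PullRequest entries instead.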
@@ -595,53 +606,53 @@
 	for _, t := range trash {
 		tlist.PushBack(t)
 	}
-	trashq.ReplaceQueue(tlist)
+	rtr.trashq.ReplaceQueue(tlist)
 }
 
 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
-func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 	// Reject unauthorized requests.
-	if !IsSystemAuth(GetAPIToken(req)) {
+	if !rtr.isSystemAuth(GetAPIToken(req)) {
 		http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
 		return
 	}
 
+	log := ctxlog.FromContext(req.Context())
 	hash := mux.Vars(req)["hash"]
 
-	if len(KeepVM.AllWritable()) == 0 {
+	if len(rtr.volmgr.AllWritable()) == 0 {
 		http.Error(resp, "No writable volumes", http.StatusNotFound)
 		return
 	}
 
 	var untrashedOn, failedOn []string
 	var numNotFound int
-	for _, vol := range KeepVM.AllWritable() {
+	for _, vol := range rtr.volmgr.AllWritable() {
 		err := vol.Untrash(hash)
 		if os.IsNotExist(err) {
 			numNotFound++
 		} else if err != nil {
-			log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+			log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
 			failedOn = append(failedOn, vol.String())
 		} else {
-			log.Printf("Untrashed %v on volume %v", hash, vol.String())
+			log.Infof("Untrashed %v on volume %v", hash, vol.String())
 			untrashedOn = append(untrashedOn, vol.String())
 		}
 	}
 
-	if numNotFound == len(KeepVM.AllWritable()) {
+	if numNotFound == len(rtr.volmgr.AllWritable()) {
 		http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-		return
-	}
-
-	if len(failedOn) == len(KeepVM.AllWritable()) {
+	} else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
 		http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
 	} else {
-		respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+		respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
 		if len(failedOn) > 0 {
-			respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+			respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+			http.Error(resp, respBody, http.StatusInternalServerError)
+		} else {
+			fmt.Fprintln(resp, respBody)
 		}
-		resp.Write([]byte(respBody))
 	}
 }
 
@@ -664,11 +675,13 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
 //
-func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+	log := ctxlog.FromContext(ctx)
+
 	// Attempt to read the requested hash from a keep volume.
 	errorToCaller := NotFoundError
 
-	for _, vol := range KeepVM.AllReadable() {
+	for _, vol := range volmgr.AllReadable() {
 		size, err := vol.Get(ctx, hash, buf)
 		select {
 		case <-ctx.Done():
@@ -682,7 +695,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 			// volumes. If all volumes report IsNotExist,
 			// we return a NotFoundError.
 			if !os.IsNotExist(err) {
-				log.Printf("%s: Get(%s): %s", vol, hash, err)
+				log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
 			}
 			// If some volume returns a transient error, return it to the caller
 			// instead of "Not found" so it can retry.
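GetBlock's checksum verification, continued in the next hunk, works because the 32-hex-digit hash in a Keep locator is simply the MD5 digest of the block contents. For example:

```go
package main

import (
	"crypto/md5"
	"fmt"
)

func main() {
	block := []byte("foo")
	// The bare hash in a locator is the MD5 of the data.
	fmt.Printf("%x\n", md5.Sum(block)) // acbd18db4cc2f85cedef654fccc4a4d8
}
```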
@@ -692,131 +705,279 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr
 			continue
 		}
 		// Check the file checksum.
-		//
 		filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
 		if filehash != hash {
 			// TODO: Try harder to tell a sysadmin about
 			// this.
-			log.Printf("%s: checksum mismatch for request %s (actual %s)",
-				vol, hash, filehash)
+			log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
 			errorToCaller = DiskHashError
 			continue
 		}
 		if errorToCaller == DiskHashError {
-			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-				vol, hash)
+			log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
 		}
 		return size, nil
 	}
 	return 0, errorToCaller
 }
 
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// PutBlock(ctx, block, hash)
-//   Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// The MD5 checksum of the block must be identical to the content id HASH.
-// If not, an error is returned.
+type putProgress struct {
+	classNeeded      map[string]bool
+	classTodo        map[string]bool
+	mountUsed        map[*VolumeMount]bool
+	totalReplication int
+	classDone        map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putProgress) TotalReplication() string {
+	return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2, special=1".
+func (pr putProgress) ClassReplication() string {
+	s := ""
+	for k, v := range pr.classDone {
+		if len(s) > 0 {
+			s += ", "
+		}
+		s += k + "=" + strconv.Itoa(v)
+	}
+	return s
+}
+
+func (pr *putProgress) Add(mnt *VolumeMount) {
+	if pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
+		return
+	}
+	pr.mountUsed[mnt] = true
+	pr.totalReplication += mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] += mnt.Replication
+		delete(pr.classTodo, class)
+	}
+}
+
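ClassReplication above produces the X-Keep-Storage-Classes-Confirmed response header. Assuming the ", "-joined key=value format it emits, a client could decode the header back into a map like this:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	hdr := "default=2, special=1" // as emitted by ClassReplication
	confirmed := map[string]int{}
	for _, kv := range strings.Split(hdr, ",") {
		k, v, ok := strings.Cut(strings.TrimSpace(kv), "=")
		if !ok {
			continue
		}
		n, err := strconv.Atoi(v)
		if err != nil {
			continue
		}
		confirmed[k] = n
	}
	fmt.Println(confirmed) // map[default:2 special:1]
}
```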
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+	if !pr.mountUsed[mnt] {
+		logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+		return
+	}
+	pr.mountUsed[mnt] = false
+	pr.totalReplication -= mnt.Replication
+	for class := range mnt.StorageClasses {
+		pr.classDone[class] -= mnt.Replication
+		if pr.classNeeded[class] {
+			pr.classTodo[class] = true
+		}
+	}
+}
+
+func (pr *putProgress) Done() bool {
+	return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putProgress) Want(mnt *VolumeMount) bool {
+	if pr.Done() || pr.mountUsed[mnt] {
+		return false
+	}
+	if len(pr.classTodo) == 0 {
+		// none specified == "any"
+		return true
+	}
+	for class := range mnt.StorageClasses {
+		if pr.classTodo[class] {
+			return true
+		}
+	}
+	return false
+}
+
+func (pr *putProgress) Copy() *putProgress {
+	cp := putProgress{
+		classNeeded:      pr.classNeeded,
+		classTodo:        make(map[string]bool, len(pr.classTodo)),
+		classDone:        make(map[string]int, len(pr.classDone)),
+		mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+		totalReplication: pr.totalReplication,
+	}
+	for k, v := range pr.classTodo {
+		cp.classTodo[k] = v
+	}
+	for k, v := range pr.classDone {
+		cp.classDone[k] = v
+	}
+	for k, v := range pr.mountUsed {
+		cp.mountUsed[k] = v
+	}
+	return &cp
+}
+
+func newPutProgress(classes []string) putProgress {
+	pr := putProgress{
+		classNeeded: make(map[string]bool, len(classes)),
+		classTodo:   make(map[string]bool, len(classes)),
+		classDone:   map[string]int{},
+		mountUsed:   map[*VolumeMount]bool{},
+	}
+	for _, c := range classes {
+		if c != "" {
+			pr.classNeeded[c] = true
+			pr.classTodo[c] = true
+		}
+	}
+	return pr
+}
+
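newPutProgress seeds classNeeded and classTodo from the request, and each Add moves a mount's storage classes from "todo" to "done". The bookkeeping reduces to simple map operations, simulated here with plain maps (a sketch, not the real types):

```go
package main

import "fmt"

func main() {
	// The request wants one replica in each of these classes.
	todo := map[string]bool{"default": true, "archive": true}
	done := map[string]int{}

	// Simulate a successful write to a mount with replication 2
	// that carries only the "default" storage class.
	mountClasses := []string{"default"}
	replication := 2
	for _, class := range mountClasses {
		done[class] += replication
		delete(todo, class)
	}

	fmt.Println(done) // map[default:2]
	// "archive" is still in todo, so a real PutBlock would keep
	// writing to further mounts.
	fmt.Println(len(todo) == 0) // false
}
```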
+// PutBlock stores the given block on one or more volumes.
 //
-// PutBlock stores the BLOCK on the first Keep volume with free space.
-// A failure code is returned to the user only if all volumes fail.
+// The MD5 checksum of the block must match the given hash.
 //
-// On success, PutBlock returns nil.
-// On failure, it returns a KeepError with one of the following codes:
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
 //
-//   500 Collision
-//        A different block with the same hash already exists on this
-//        Keep server.
-//   422 MD5Fail
-//        The MD5 hash of the BLOCK does not match the argument HASH.
-//   503 Full
-//        There was not enough space left in any Keep volume to store
-//        the object.
-//   500 Fail
-//        The object could not be stored for some other reason (e.g.
-//        all writes failed). The text of the error message should
-//        provide as much detail as possible.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
 //
-func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
+// 500 Collision
+//	A different block with the same hash already exists on this
+//	Keep server.
+// 422 MD5Fail
+//	The MD5 hash of the BLOCK does not match the argument HASH.
+// 503 Full
+//	There was not enough space left in any Keep volume to store
+//	the object.
+// 500 Fail
+//	The object could not be stored for some other reason (e.g.
+//	all writes failed). The text of the error message should
+//	provide as much detail as possible.
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
+	log := ctxlog.FromContext(ctx)
+
 	// Check that BLOCK's checksum matches HASH.
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
 		log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-		return 0, RequestHashError
+		return putProgress{}, RequestHashError
 	}
 
+	result := newPutProgress(wantStorageClasses)
+
 	// If we already have this data, it's intact on disk, and we
 	// can update its timestamp, return success. If we have
 	// different data with the same hash, return failure.
-	if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
-		return n, err
-	} else if ctx.Err() != nil {
-		return 0, ErrClientDisconnect
+	if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
+		return result, err
 	}
-
-	// Choose a Keep volume to write to.
-	// If this volume fails, try all of the volumes in order.
-	if vol := KeepVM.NextWritable(); vol != nil {
-		if err := vol.Put(ctx, hash, block); err == nil {
-			return vol.Replication(), nil // success!
-		}
-		if ctx.Err() != nil {
-			return 0, ErrClientDisconnect
-		}
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
 	}
 
-	writables := KeepVM.AllWritable()
+	writables := volmgr.NextWritable()
 	if len(writables) == 0 {
-		log.Print("No writable volumes.")
-		return 0, FullError
+		log.Error("no writable volumes")
+		return result, FullError
 	}
 
-	allFull := true
-	for _, vol := range writables {
-		err := vol.Put(ctx, hash, block)
-		if ctx.Err() != nil {
-			return 0, ErrClientDisconnect
+	var wg sync.WaitGroup
+	var mtx sync.Mutex
+	cond := sync.Cond{L: &mtx}
+	// pending predicts what result will be if all pending writes
+	// succeed.
+	pending := result.Copy()
+	var allFull atomic.Value
+	allFull.Store(true)
+
+	// We hold the lock for the duration of the "each volume" loop
+	// below, except when it is released during cond.Wait().
+	mtx.Lock()
+
+	for _, mnt := range writables {
+		// Wait until our decision to use this mount does not
+		// depend on the outcome of pending writes.
+		for result.Want(mnt) && !pending.Want(mnt) {
+			cond.Wait()
 		}
-		if err == nil {
-			return vol.Replication(), nil // success!
-		}
-		if err != FullError {
-			// The volume is not full but the
-			// write did not succeed. Report the
-			// error and continue trying.
-			allFull = false
-			log.Printf("%s: Write(%s): %s", vol, hash, err)
+		if !result.Want(mnt) {
+			continue
 		}
+		mnt := mnt
+		pending.Add(mnt)
+		wg.Add(1)
+		go func() {
+			log.Debugf("PutBlock: start write to %s", mnt.UUID)
+			defer wg.Done()
+			err := mnt.Put(ctx, hash, block)
+
+			mtx.Lock()
+			if err != nil {
+				log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+				pending.Sub(mnt)
+			} else {
+				log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+				result.Add(mnt)
+			}
+			cond.Broadcast()
+			mtx.Unlock()
+
+			if err != nil && err != FullError && ctx.Err() == nil {
+				// The volume is not full but the
+				// write did not succeed. Report the
+				// error and continue trying.
+				allFull.Store(false)
+				log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+			}
+		}()
+	}
+	mtx.Unlock()
+	wg.Wait()
+	if ctx.Err() != nil {
+		return result, ErrClientDisconnect
+	}
+	if result.Done() {
+		return result, nil
 	}
-
-	if allFull {
-		log.Print("All volumes are full.")
-		return 0, FullError
+	if result.totalReplication > 0 {
+		// Some, but not all, of the storage classes were
+		// satisfied. This qualifies as success.
+		return result, nil
+	} else if allFull.Load().(bool) {
+		log.Error("all volumes with qualifying storage classes are full")
+		return putProgress{}, FullError
+	} else {
+		// Already logged the non-full errors.
+		return putProgress{}, GenericError
 	}
-	// Already logged the non-full errors.
-	return 0, GenericError
 }
 
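The rewritten PutBlock overlaps writes to multiple mounts while keeping the decision loop deterministic: `pending` is the optimistic forecast, and the loop blocks on a sync.Cond until a failed write makes the next decision unambiguous. The core wait/broadcast discipline, reduced to a self-contained sketch (not the production code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mtx sync.Mutex
	cond := sync.Cond{L: &mtx}
	pendingWrites := 2

	// Writers report their outcome under the lock, then wake the
	// decision loop -- the role cond.Broadcast() plays above.
	for i := 0; i < 2; i++ {
		go func(i int) {
			time.Sleep(time.Duration(100*(i+1)) * time.Millisecond)
			mtx.Lock()
			pendingWrites--
			cond.Broadcast()
			mtx.Unlock()
		}(i)
	}

	// The decision loop holds the lock except while cond.Wait()
	// releases it, mirroring PutBlock's "each volume" loop.
	mtx.Lock()
	for pendingWrites > 0 {
		cond.Wait()
	}
	mtx.Unlock()
	fmt.Println("all pending writes resolved")
}
```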
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
-	var bestErr error = NotFoundError
-	for _, vol := range KeepVM.AllWritable() {
-		err := vol.Compare(ctx, hash, buf)
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
+	log := ctxlog.FromContext(ctx)
+	for _, mnt := range volmgr.AllWritable() {
+		if !result.Want(mnt) {
+			continue
+		}
+		err := mnt.Compare(ctx, hash, buf)
 		if ctx.Err() != nil {
-			return 0, ctx.Err()
+			return nil
 		} else if err == CollisionError {
 			// Stop if we have a block with same hash but
 			// different content. (It will be impossible
 			// to tell which one is wanted if we have
 			// both, so there's no point writing it even
 			// on a different volume.)
-			log.Printf("%s: Compare(%s): %s", vol, hash, err)
-			return 0, err
+			log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+			return CollisionError
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
 			// "normal" error: we don't log anything.
@@ -825,18 +986,20 @@ func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error)
 			// Couldn't open file, data is corrupt on
 			// disk, etc.: log this abnormal condition,
 			// and try the next volume.
-			log.Printf("%s: Compare(%s): %s", vol, hash, err)
+			log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
 			continue
 		}
-		if err := vol.Touch(hash); err != nil {
-			log.Printf("%s: Touch %s failed: %s", vol, hash, err)
-			bestErr = err
+		if err := mnt.Touch(hash); err != nil {
+			log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
 			continue
 		}
 		// Compare and Touch both worked --> done.
-		return vol.Replication(), nil
+		result.Add(mnt)
+		if result.Done() {
+			return nil
+		}
 	}
-	return 0, bestErr
+	return nil
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
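The validLocatorRe context line above encodes the rule that a bare block hash is exactly 32 lowercase hex digits; size and permission hints are validated separately. For example:

```go
package main

import (
	"fmt"
	"regexp"
)

var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

func main() {
	fmt.Println(validLocatorRe.MatchString("acbd18db4cc2f85cedef654fccc4a4d8")) // true
	// A locator with hints is not a bare hash:
	fmt.Println(validLocatorRe.MatchString("acbd18db4cc2f85cedef654fccc4a4d8+3")) // false
}
```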
@@ -863,27 +1026,15 @@ func GetAPIToken(req *http.Request) string {
 	return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-	ts, err := strconv.ParseInt(timestampHex, 16, 0)
-	if err != nil {
-		log.Printf("IsExpired: %s", err)
-		return true
-	}
-	return time.Unix(ts, 0).Before(time.Now())
-}
-
-// CanDelete returns true if the user identified by apiToken is
+// canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
-func CanDelete(apiToken string) bool {
+func (rtr *router) canDelete(apiToken string) bool {
 	if apiToken == "" {
 		return false
 	}
 	// Blocks may be deleted only when Keep has been configured with a
 	// data manager.
-	if IsSystemAuth(apiToken) {
+	if rtr.isSystemAuth(apiToken) {
 		return true
 	}
 	// TODO(twp): look up apiToken with the API server
@@ -892,8 +1043,8 @@
 	return false
 }
 
-// IsSystemAuth returns true if the given token is allowed to perform
+// isSystemAuth returns true if the given token is allowed to perform
 // system level actions like deleting data.
-func IsSystemAuth(token string) bool {
-	return token != "" && token == theConfig.systemAuthToken
+func (rtr *router) isSystemAuth(token string) bool {
+	return token != "" && token == rtr.cluster.SystemRootToken
 }
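isSystemAuth now compares against the cluster-wide SystemRootToken instead of a keepstore-specific config entry, so the privileged endpoints in this file (/index, /pull, /trash, untrash, TOUCH) all accept the same token. GetAPIToken, whose body is outside this diff, extracts that token from the Authorization header; a simplified sketch of the parsing it performs (the exact implementation may differ):

```go
package main

import (
	"fmt"
	"strings"
)

// getAPIToken is a hypothetical stand-in for keepstore's GetAPIToken:
// both "OAuth2 <token>" and "Bearer <token>" schemes are accepted.
func getAPIToken(authHeader string) string {
	for _, scheme := range []string{"OAuth2 ", "Bearer "} {
		if strings.HasPrefix(authHeader, scheme) {
			return strings.TrimPrefix(authHeader, scheme)
		}
	}
	return ""
}

func main() {
	fmt.Println(getAPIToken("Bearer zzzzz-placeholder-token"))
	fmt.Println(getAPIToken("OAuth2 zzzzz-placeholder-token"))
}
```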