X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f0553505e32ee00999d1d680da14260a9a0f6b99..f04693da1811e670d4cbb981debeecf14d79137c:/services/keepstore/handlers.go diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index e4f025d6b1..86504422d5 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "os" "regexp" @@ -21,28 +22,36 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" "git.curoverse.com/arvados.git/sdk/go/health" "git.curoverse.com/arvados.git/sdk/go/httpserver" "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) type router struct { *mux.Router - limiter httpserver.RequestCounter cluster *arvados.Cluster + logger logrus.FieldLogger remoteProxy remoteProxy - registry *prometheus.Registry - metrics nodeMetrics + metrics *nodeMetrics + volmgr *RRVolumeManager + pullq *WorkQueue + trashq *WorkQueue } // MakeRESTRouter returns a new router that forwards all Keep requests // to the appropriate handlers. -func MakeRESTRouter(cluster *arvados.Cluster) http.Handler { +func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler { rtr := &router{ - Router: mux.NewRouter(), - cluster: cluster, - registry: prometheus.NewRegistry(), + Router: mux.NewRouter(), + cluster: cluster, + logger: ctxlog.FromContext(ctx), + metrics: &nodeMetrics{reg: reg}, + volmgr: volmgr, + pullq: pullq, + trashq: trashq, } rtr.HandleFunc( @@ -52,12 +61,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler { rtr.handleGET).Methods("GET", "HEAD") rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT") - rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE") + rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE") // List all blocks stored here. Privileged client only. - rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD") + rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD") // List blocks stored here whose hash has the given prefix. // Privileged client only. - rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD") + rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD") // Internals/debugging info (runtime.MemStats) rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD") @@ -67,20 +76,20 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler { // List mounts: UUID, readonly, tier, device ID, ... rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET") - rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET") - rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET") + rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET") + rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET") // Replace the current pull queue. - rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT") + rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT") // Replace the current trash queue. - rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT") + rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT") // Untrash moves blocks from trash back into store - rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT") + rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT") rtr.Handle("/_health/{check}", &health.Handler{ - Token: theConfig.ManagementToken, + Token: cluster.ManagementToken, Prefix: "/_health/", }).Methods("GET") @@ -88,16 +97,11 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler { // 400 Bad Request. rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler) - rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr) - rtr.metrics = nodeMetrics{ - reg: rtr.registry, - rc: rtr.limiter, - } - rtr.metrics.setup() + rtr.metrics.setupBufferPoolMetrics(bufs) + rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull") + rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash") - instrumented := httpserver.Instrument(rtr.registry, nil, - httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter))) - return instrumented.ServeAPI(theConfig.ManagementToken, instrumented) + return rtr } // BadRequestHandler is a HandleFunc to address bad requests. @@ -111,13 +115,13 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { locator := req.URL.Path[1:] if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") { - rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster) + rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr) return } - if theConfig.RequireSignatures { + if rtr.cluster.Collections.BlobSigning { locator := req.URL.Path[1:] // strip leading slash - if err := VerifySignature(locator, GetAPIToken(req)); err != nil { + if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil { http.Error(resp, err.Error(), err.(*KeepError).HTTPCode) return } @@ -137,7 +141,7 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { } defer bufs.Put(buf) - size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp) + size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp) if err != nil { code := http.StatusInternalServerError if err, ok := err.(*KeepError); ok { @@ -159,7 +163,6 @@ func contextForResponse(parent context.Context, resp http.ResponseWriter) (conte go func(c <-chan bool) { select { case <-c: - theConfig.debugLogf("cancel context") cancel() case <-ctx.Done(): } @@ -209,7 +212,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { return } - if len(KeepVM.AllWritable()) == 0 { + if len(rtr.volmgr.AllWritable()) == 0 { http.Error(resp, FullError.Error(), FullError.HTTPCode) return } @@ -227,7 +230,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { return } - replication, err := PutBlock(ctx, buf, hash) + replication, err := PutBlock(ctx, rtr.volmgr, buf, hash) bufs.Put(buf) if err != nil { @@ -243,9 +246,9 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { // return it to the client. returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength) apiToken := GetAPIToken(req) - if theConfig.blobSigningKey != nil && apiToken != "" { - expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration()) - returnHash = SignLocator(returnHash, apiToken, expiry) + if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" { + expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration()) + returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry) } resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication)) resp.Write([]byte(returnHash + "\n")) @@ -253,8 +256,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { // IndexHandler responds to "/index", "/index/{prefix}", and // "/mounts/{uuid}/blocks" requests. -func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) { - if !IsSystemAuth(GetAPIToken(req)) { +func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) { + if !rtr.isSystemAuth(GetAPIToken(req)) { http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) return } @@ -267,25 +270,31 @@ func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) { uuid := mux.Vars(req)["uuid"] - var vols []Volume + var vols []*VolumeMount if uuid == "" { - vols = KeepVM.AllReadable() - } else if v := KeepVM.Lookup(uuid, false); v == nil { + vols = rtr.volmgr.AllReadable() + } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil { http.Error(resp, "mount not found", http.StatusNotFound) return } else { - vols = []Volume{v} + vols = []*VolumeMount{mnt} } for _, v := range vols { if err := v.IndexTo(prefix, resp); err != nil { - // The only errors returned by IndexTo are - // write errors returned by resp.Write(), - // which probably means the client has - // disconnected and this error will never be - // reported to the client -- but it will - // appear in our own error log. - http.Error(resp, err.Error(), http.StatusInternalServerError) + // We can't send an error message to the + // client because we might have already sent + // headers and index content. All we can do is + // log the error in our own logs, and (in + // cases where headers haven't been sent yet) + // set a 500 status. + // + // If headers have already been sent, the + // client must notice the lack of trailing + // newline as an indication that the response + // is incomplete. + log.Printf("index error from volume %s: %s", v, err) + http.Error(resp, "", http.StatusInternalServerError) return } } @@ -296,9 +305,9 @@ func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) { // MountsHandler responds to "GET /mounts" requests. func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) { - err := json.NewEncoder(resp).Encode(KeepVM.Mounts()) + err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts()) if err != nil { - http.Error(resp, err.Error(), http.StatusInternalServerError) + httpserver.Error(resp, err.Error(), http.StatusInternalServerError) } } @@ -361,32 +370,28 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) { // populate the given NodeStatus struct with current values. func (rtr *router) readNodeStatus(st *NodeStatus) { st.Version = version - vols := KeepVM.AllReadable() + vols := rtr.volmgr.AllReadable() if cap(st.Volumes) < len(vols) { st.Volumes = make([]*volumeStatusEnt, len(vols)) } st.Volumes = st.Volumes[:0] for _, vol := range vols { var internalStats interface{} - if vol, ok := vol.(InternalStatser); ok { + if vol, ok := vol.Volume.(InternalStatser); ok { internalStats = vol.InternalStats() } st.Volumes = append(st.Volumes, &volumeStatusEnt{ Label: vol.String(), Status: vol.Status(), InternalStats: internalStats, - //VolumeStats: KeepVM.VolumeStats(vol), + //VolumeStats: rtr.volmgr.VolumeStats(vol), }) } st.BufferPool.Alloc = bufs.Alloc() st.BufferPool.Cap = bufs.Cap() st.BufferPool.Len = bufs.Len() - st.PullQueue = getWorkQueueStatus(pullq) - st.TrashQueue = getWorkQueueStatus(trashq) - if rtr.limiter != nil { - st.RequestsCurrent = rtr.limiter.Current() - st.RequestsMax = rtr.limiter.Max() - } + st.PullQueue = getWorkQueueStatus(rtr.pullq) + st.TrashQueue = getWorkQueueStatus(rtr.trashq) } // return a WorkQueueStatus for the given queue. If q is nil (which @@ -400,7 +405,7 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus { return q.Status() } -// DeleteHandler processes DELETE requests. +// handleDELETE processes DELETE requests. // // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash // from all connected volumes. @@ -411,7 +416,7 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus { // a PermissionError. // // Upon receiving a valid request from an authorized user, -// DeleteHandler deletes all copies of the specified block on local +// handleDELETE deletes all copies of the specified block on local // writable volumes. // // Response format: @@ -427,17 +432,17 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus { // where d and f are integers representing the number of blocks that // were successfully and unsuccessfully deleted. // -func DeleteHandler(resp http.ResponseWriter, req *http.Request) { +func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) { hash := mux.Vars(req)["hash"] // Confirm that this user is an admin and has a token with unlimited scope. var tok = GetAPIToken(req) - if tok == "" || !CanDelete(tok) { + if tok == "" || !rtr.canDelete(tok) { http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode) return } - if !theConfig.EnableDelete { + if !rtr.cluster.Collections.BlobTrash { http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode) return } @@ -449,7 +454,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) { Deleted int `json:"copies_deleted"` Failed int `json:"copies_failed"` } - for _, vol := range KeepVM.AllWritable() { + for _, vol := range rtr.volmgr.AllWritable() { if err := vol.Trash(hash); err == nil { result.Deleted++ } else if os.IsNotExist(err) { @@ -523,9 +528,9 @@ type PullRequest struct { } // PullHandler processes "PUT /pull" requests for the data manager. -func PullHandler(resp http.ResponseWriter, req *http.Request) { +func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) { // Reject unauthorized requests. - if !IsSystemAuth(GetAPIToken(req)) { + if !rtr.isSystemAuth(GetAPIToken(req)) { http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) return } @@ -549,7 +554,7 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) { for _, p := range pr { plist.PushBack(p) } - pullq.ReplaceQueue(plist) + rtr.pullq.ReplaceQueue(plist) } // TrashRequest consists of a block locator and its Mtime @@ -562,9 +567,9 @@ type TrashRequest struct { } // TrashHandler processes /trash requests. -func TrashHandler(resp http.ResponseWriter, req *http.Request) { +func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) { // Reject unauthorized requests. - if !IsSystemAuth(GetAPIToken(req)) { + if !rtr.isSystemAuth(GetAPIToken(req)) { http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) return } @@ -588,27 +593,27 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) { for _, t := range trash { tlist.PushBack(t) } - trashq.ReplaceQueue(tlist) + rtr.trashq.ReplaceQueue(tlist) } // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager. -func UntrashHandler(resp http.ResponseWriter, req *http.Request) { +func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { // Reject unauthorized requests. - if !IsSystemAuth(GetAPIToken(req)) { + if !rtr.isSystemAuth(GetAPIToken(req)) { http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode) return } hash := mux.Vars(req)["hash"] - if len(KeepVM.AllWritable()) == 0 { + if len(rtr.volmgr.AllWritable()) == 0 { http.Error(resp, "No writable volumes", http.StatusNotFound) return } var untrashedOn, failedOn []string var numNotFound int - for _, vol := range KeepVM.AllWritable() { + for _, vol := range rtr.volmgr.AllWritable() { err := vol.Untrash(hash) if os.IsNotExist(err) { @@ -622,12 +627,12 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) { } } - if numNotFound == len(KeepVM.AllWritable()) { + if numNotFound == len(rtr.volmgr.AllWritable()) { http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound) return } - if len(failedOn) == len(KeepVM.AllWritable()) { + if len(failedOn) == len(rtr.volmgr.AllWritable()) { http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError) } else { respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",") @@ -657,11 +662,11 @@ func UntrashHandler(resp http.ResponseWriter, req *http.Request) { // If the block found does not have the correct MD5 hash, returns // DiskHashError. // -func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) { +func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) { // Attempt to read the requested hash from a keep volume. errorToCaller := NotFoundError - for _, vol := range KeepVM.AllReadable() { + for _, vol := range volmgr.AllReadable() { size, err := vol.Get(ctx, hash, buf) select { case <-ctx.Done(): @@ -677,6 +682,11 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr if !os.IsNotExist(err) { log.Printf("%s: Get(%s): %s", vol, hash, err) } + // If some volume returns a transient error, return it to the caller + // instead of "Not found" so it can retry. + if err == VolumeBusyError { + errorToCaller = err.(*KeepError) + } continue } // Check the file checksum. @@ -726,7 +736,7 @@ func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWr // all writes failed). The text of the error message should // provide as much detail as possible. // -func PutBlock(ctx context.Context, block []byte, hash string) (int, error) { +func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) { // Check that BLOCK's checksum matches HASH. blockhash := fmt.Sprintf("%x", md5.Sum(block)) if blockhash != hash { @@ -737,7 +747,7 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) { // If we already have this data, it's intact on disk, and we // can update its timestamp, return success. If we have // different data with the same hash, return failure. - if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError { + if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError { return n, err } else if ctx.Err() != nil { return 0, ErrClientDisconnect @@ -745,16 +755,16 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) { // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. - if vol := KeepVM.NextWritable(); vol != nil { - if err := vol.Put(ctx, hash, block); err == nil { - return vol.Replication(), nil // success! + if mnt := volmgr.NextWritable(); mnt != nil { + if err := mnt.Put(ctx, hash, block); err == nil { + return mnt.Replication, nil // success! } if ctx.Err() != nil { return 0, ErrClientDisconnect } } - writables := KeepVM.AllWritable() + writables := volmgr.AllWritable() if len(writables) == 0 { log.Print("No writable volumes.") return 0, FullError @@ -767,7 +777,7 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) { return 0, ErrClientDisconnect } if err == nil { - return vol.Replication(), nil // success! + return vol.Replication, nil // success! } if err != FullError { // The volume is not full but the @@ -791,10 +801,10 @@ func PutBlock(ctx context.Context, block []byte, hash string) (int, error) { // the relevant block's modification time in order to protect it from // premature garbage collection. Otherwise, it returns a non-nil // error. -func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) { +func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) { var bestErr error = NotFoundError - for _, vol := range KeepVM.AllWritable() { - err := vol.Compare(ctx, hash, buf) + for _, mnt := range volmgr.AllWritable() { + err := mnt.Compare(ctx, hash, buf) if ctx.Err() != nil { return 0, ctx.Err() } else if err == CollisionError { @@ -803,7 +813,7 @@ func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) // to tell which one is wanted if we have // both, so there's no point writing it even // on a different volume.) - log.Printf("%s: Compare(%s): %s", vol, hash, err) + log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err) return 0, err } else if os.IsNotExist(err) { // Block does not exist. This is the only @@ -813,16 +823,16 @@ func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) // Couldn't open file, data is corrupt on // disk, etc.: log this abnormal condition, // and try the next volume. - log.Printf("%s: Compare(%s): %s", vol, hash, err) + log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err) continue } - if err := vol.Touch(hash); err != nil { - log.Printf("%s: Touch %s failed: %s", vol, hash, err) + if err := mnt.Touch(hash); err != nil { + log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err) bestErr = err continue } // Compare and Touch both worked --> done. - return vol.Replication(), nil + return mnt.Replication, nil } return 0, bestErr } @@ -863,15 +873,15 @@ func IsExpired(timestampHex string) bool { return time.Unix(ts, 0).Before(time.Now()) } -// CanDelete returns true if the user identified by apiToken is +// canDelete returns true if the user identified by apiToken is // allowed to delete blocks. -func CanDelete(apiToken string) bool { +func (rtr *router) canDelete(apiToken string) bool { if apiToken == "" { return false } // Blocks may be deleted only when Keep has been configured with a // data manager. - if IsSystemAuth(apiToken) { + if rtr.isSystemAuth(apiToken) { return true } // TODO(twp): look up apiToken with the API server @@ -880,8 +890,8 @@ func CanDelete(apiToken string) bool { return false } -// IsSystemAuth returns true if the given token is allowed to perform +// isSystemAuth returns true if the given token is allowed to perform // system level actions like deleting data. -func IsSystemAuth(token string) bool { - return token != "" && token == theConfig.systemAuthToken +func (rtr *router) isSystemAuth(token string) bool { + return token != "" && token == rtr.cluster.SystemRootToken }