X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6911361cec42d4858fb8e345e07bcf14d5e163b6..da83807d6bcef1c1f0bb78479c5ec17f150f5eda:/services/keepstore/handlers.go diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 910033ebb1..abeb20fe86 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "container/list" @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/health" @@ -112,12 +113,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) { } func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { - ctx, cancel := contextForResponse(context.TODO(), resp) - defer cancel() - locator := req.URL.Path[1:] if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") { - rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr) + rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr) return } @@ -136,14 +134,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { // isn't here, we can return 404 now instead of waiting for a // buffer. - buf, err := getBufferWithContext(ctx, bufs, BlockSize) + buf, err := getBufferWithContext(req.Context(), bufs, BlockSize) if err != nil { http.Error(resp, err.Error(), http.StatusServiceUnavailable) return } defer bufs.Put(buf) - size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp) + size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp) if err != nil { code := http.StatusInternalServerError if err, ok := err.(*KeepError); ok { @@ -158,21 +156,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { resp.Write(buf[:size]) } -// Return a new context that gets cancelled by resp's CloseNotifier. -func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(parent) - if cn, ok := resp.(http.CloseNotifier); ok { - go func(c <-chan bool) { - select { - case <-c: - cancel() - case <-ctx.Done(): - } - }(cn.CloseNotify()) - } - return ctx, cancel -} - // Get a buffer from the pool -- but give up and return a non-nil // error if ctx ends before we get a buffer. func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) { @@ -223,9 +206,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) { } func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { - ctx, cancel := contextForResponse(context.TODO(), resp) - defer cancel() - hash := mux.Vars(req)["hash"] // Detect as many error conditions as possible before reading @@ -262,7 +242,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { } } - buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength)) + buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength)) if err != nil { http.Error(resp, err.Error(), http.StatusServiceUnavailable) return @@ -275,7 +255,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { return } - result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses) + result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses) bufs.Put(buf) if err != nil { @@ -411,7 +391,7 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) { // populate the given NodeStatus struct with current values. func (rtr *router) readNodeStatus(st *NodeStatus) { - st.Version = version + st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0] vols := rtr.volmgr.AllReadable() if cap(st.Volumes) < len(vols) { st.Volumes = make([]*volumeStatusEnt, len(vols)) @@ -469,11 +449,10 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus { // Otherwise, the response code is 200 OK, with a response body // consisting of the JSON message // -// {"copies_deleted":d,"copies_failed":f} +// {"copies_deleted":d,"copies_failed":f} // // where d and f are integers representing the number of blocks that // were successfully and unsuccessfully deleted. -// func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) { hash := mux.Vars(req)["hash"] @@ -496,8 +475,10 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) { Deleted int `json:"copies_deleted"` Failed int `json:"copies_failed"` } - for _, vol := range rtr.volmgr.AllWritable() { - if err := vol.Trash(hash); err == nil { + for _, vol := range rtr.volmgr.Mounts() { + if !vol.KeepMount.AllowTrash { + continue + } else if err := vol.Trash(hash); err == nil { result.Deleted++ } else if os.IsNotExist(err) { continue @@ -694,7 +675,6 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { // // If the block found does not have the correct MD5 hash, returns // DiskHashError. -// func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) { log := ctxlog.FromContext(ctx) @@ -729,7 +709,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. - log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol) + log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol) errorToCaller = DiskHashError continue } @@ -865,17 +845,24 @@ func newPutProgress(classes []string) putProgress { // following codes: // // 500 Collision -// A different block with the same hash already exists on this -// Keep server. +// +// A different block with the same hash already exists on this +// Keep server. +// // 422 MD5Fail -// The MD5 hash of the BLOCK does not match the argument HASH. +// +// The MD5 hash of the BLOCK does not match the argument HASH. +// // 503 Full -// There was not enough space left in any Keep volume to store -// the object. +// +// There was not enough space left in any Keep volume to store +// the object. +// // 500 Fail -// The object could not be stored for some other reason (e.g. -// all writes failed). The text of the error message should -// provide as much detail as possible. +// +// The object could not be stored for some other reason (e.g. +// all writes failed). The text of the error message should +// provide as much detail as possible. func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) { log := ctxlog.FromContext(ctx) @@ -996,7 +983,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // to tell which one is wanted if we have // both, so there's no point writing it even // on a different volume.) - log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume) + log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume) return CollisionError } else if os.IsNotExist(err) { // Block does not exist. This is the only @@ -1024,10 +1011,9 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`) -// IsValidLocator returns true if the specified string is a valid Keep locator. -// When Keep is extended to support hash types other than MD5, -// this should be updated to cover those as well. -// +// IsValidLocator returns true if the specified string is a valid Keep +// locator. When Keep is extended to support hash types other than +// MD5, this should be updated to cover those as well. func IsValidLocator(loc string) bool { return validLocatorRe.MatchString(loc) }