X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e8dd5c58fdb7d07badb9389f8524e506c9c66f68..66a089434f38163273c3a5b9138f9c4347873b69:/services/keepstore/handlers.go diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 4459adfa1c..2a90705a56 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "container/list" @@ -112,12 +112,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) { } func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { - ctx, cancel := contextForResponse(context.TODO(), resp) - defer cancel() - locator := req.URL.Path[1:] if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") { - rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr) + rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr) return } @@ -136,14 +133,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { // isn't here, we can return 404 now instead of waiting for a // buffer. - buf, err := getBufferWithContext(ctx, bufs, BlockSize) + buf, err := getBufferWithContext(req.Context(), bufs, BlockSize) if err != nil { http.Error(resp, err.Error(), http.StatusServiceUnavailable) return } defer bufs.Put(buf) - size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp) + size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp) if err != nil { code := http.StatusInternalServerError if err, ok := err.(*KeepError); ok { @@ -158,21 +155,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) { resp.Write(buf[:size]) } -// Return a new context that gets cancelled by resp's CloseNotifier. -func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) { - ctx, cancel := context.WithCancel(parent) - if cn, ok := resp.(http.CloseNotifier); ok { - go func(c <-chan bool) { - select { - case <-c: - cancel() - case <-ctx.Done(): - } - }(cn.CloseNotify()) - } - return ctx, cancel -} - // Get a buffer from the pool -- but give up and return a non-nil // error if ctx ends before we get a buffer. func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) { @@ -223,9 +205,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) { } func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { - ctx, cancel := contextForResponse(context.TODO(), resp) - defer cancel() - hash := mux.Vars(req)["hash"] // Detect as many error conditions as possible before reading @@ -262,7 +241,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { } } - buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength)) + buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength)) if err != nil { http.Error(resp, err.Error(), http.StatusServiceUnavailable) return @@ -275,7 +254,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) { return } - result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses) + result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses) bufs.Put(buf) if err != nil { @@ -853,33 +832,29 @@ func newPutProgress(classes []string) putProgress { return pr } -// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep. -// -// PutBlock(ctx, block, hash) -// Stores the BLOCK (identified by the content id HASH) in Keep. -// -// The MD5 checksum of the block must be identical to the content id HASH. -// If not, an error is returned. +// PutBlock stores the given block on one or more volumes. // -// PutBlock stores the BLOCK on the first Keep volume with free space. -// A failure code is returned to the user only if all volumes fail. +// The MD5 checksum of the block must match the given hash. // -// On success, PutBlock returns nil. -// On failure, it returns a KeepError with one of the following codes: +// The block is written to each writable volume (ordered by priority +// and then UUID, see volume.go) until at least one replica has been +// stored in each of the requested storage classes. // -// 500 Collision -// A different block with the same hash already exists on this -// Keep server. -// 422 MD5Fail -// The MD5 hash of the BLOCK does not match the argument HASH. -// 503 Full -// There was not enough space left in any Keep volume to store -// the object. -// 500 Fail -// The object could not be stored for some other reason (e.g. -// all writes failed). The text of the error message should -// provide as much detail as possible. +// The returned error, if any, is a KeepError with one of the +// following codes: // +// 500 Collision +// A different block with the same hash already exists on this +// Keep server. +// 422 MD5Fail +// The MD5 hash of the BLOCK does not match the argument HASH. +// 503 Full +// There was not enough space left in any Keep volume to store +// the object. +// 500 Fail +// The object could not be stored for some other reason (e.g. +// all writes failed). The text of the error message should +// provide as much detail as possible. func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) { log := ctxlog.FromContext(ctx) @@ -916,7 +891,11 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s pending := result.Copy() var allFull atomic.Value allFull.Store(true) + + // We hold the lock for the duration of the "each volume" loop + // below, except when it is released during cond.Wait(). mtx.Lock() + for _, mnt := range writables { // Wait until our decision to use this mount does not // depend on the outcome of pending writes.