18264: Merge branch 'main' into 18264-cwl-test-running-improvements
[arvados.git] / services / keepstore / handlers.go
index 4459adfa1c1a6ffc20866fc9622344b300d2eb75..2a90705a56cbf0c1eec78a03dcd5b45463f84851 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "container/list"
@@ -112,12 +112,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        locator := req.URL.Path[1:]
        if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-               rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+               rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
                return
        }
 
@@ -136,14 +133,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+       buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -158,21 +155,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-       ctx, cancel := context.WithCancel(parent)
-       if cn, ok := resp.(http.CloseNotifier); ok {
-               go func(c <-chan bool) {
-                       select {
-                       case <-c:
-                               cancel()
-                       case <-ctx.Done():
-                       }
-               }(cn.CloseNotify())
-       }
-       return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -223,9 +205,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -262,7 +241,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+       buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -275,7 +254,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+       result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
        bufs.Put(buf)
 
        if err != nil {
@@ -853,33 +832,29 @@ func newPutProgress(classes []string) putProgress {
        return pr
 }
 
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// PutBlock(ctx, block, hash)
-//   Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-//   The MD5 checksum of the block must be identical to the content id HASH.
-//   If not, an error is returned.
+// PutBlock stores the given block on one or more volumes.
 //
-//   PutBlock stores the BLOCK on the first Keep volume with free space.
-//   A failure code is returned to the user only if all volumes fail.
+// The MD5 checksum of the block must match the given hash.
 //
-//   On success, PutBlock returns nil.
-//   On failure, it returns a KeepError with one of the following codes:
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
 //
-//   500 Collision
-//          A different block with the same hash already exists on this
-//          Keep server.
-//   422 MD5Fail
-//          The MD5 hash of the BLOCK does not match the argument HASH.
-//   503 Full
-//          There was not enough space left in any Keep volume to store
-//          the object.
-//   500 Fail
-//          The object could not be stored for some other reason (e.g.
-//          all writes failed). The text of the error message should
-//          provide as much detail as possible.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
 //
+// 500 Collision
+//        A different block with the same hash already exists on this
+//        Keep server.
+// 422 MD5Fail
+//        The MD5 hash of the BLOCK does not match the argument HASH.
+// 503 Full
+//        There was not enough space left in any Keep volume to store
+//        the object.
+// 500 Fail
+//        The object could not be stored for some other reason (e.g.
+//        all writes failed). The text of the error message should
+//        provide as much detail as possible.
 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
        log := ctxlog.FromContext(ctx)
 
@@ -916,7 +891,11 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
        pending := result.Copy()
        var allFull atomic.Value
        allFull.Store(true)
+
+       // We hold the lock for the duration of the "each volume" loop
+       // below, except when it is released during cond.Wait().
        mtx.Lock()
+
        for _, mnt := range writables {
                // Wait until our decision to use this mount does not
                // depend on the outcome of pending writes.