20735: Update to go 1.20.
[arvados.git] / services / keepstore / handlers.go
index 4459adfa1c1a6ffc20866fc9622344b300d2eb75..60fdde89c758c764df43c0e76e48277cf967a2f8 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "container/list"
@@ -21,6 +21,7 @@ import (
        "sync/atomic"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
@@ -112,12 +113,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        locator := req.URL.Path[1:]
        if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-               rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+               rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
                return
        }
 
@@ -136,14 +134,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+       buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -158,21 +156,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-       ctx, cancel := context.WithCancel(parent)
-       if cn, ok := resp.(http.CloseNotifier); ok {
-               go func(c <-chan bool) {
-                       select {
-                       case <-c:
-                               cancel()
-                       case <-ctx.Done():
-                       }
-               }(cn.CloseNotify())
-       }
-       return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -223,9 +206,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -262,7 +242,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+       buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -275,7 +255,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+       result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
        bufs.Put(buf)
 
        if err != nil {
@@ -411,7 +391,7 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
 
 // populate the given NodeStatus struct with current values.
 func (rtr *router) readNodeStatus(st *NodeStatus) {
-       st.Version = version
+       st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
        vols := rtr.volmgr.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*volumeStatusEnt, len(vols))
@@ -469,11 +449,10 @@ func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
 // Otherwise, the response code is 200 OK, with a response body
 // consisting of the JSON message
 //
-//    {"copies_deleted":d,"copies_failed":f}
+//     {"copies_deleted":d,"copies_failed":f}
 //
 // where d and f are integers representing the number of blocks that
 // were successfully and unsuccessfully deleted.
-//
 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
        hash := mux.Vars(req)["hash"]
 
@@ -694,7 +673,6 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 //
 // If the block found does not have the correct MD5 hash, returns
 // DiskHashError.
-//
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
        log := ctxlog.FromContext(ctx)
 
@@ -729,7 +707,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
+                       log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
                        errorToCaller = DiskHashError
                        continue
                }
@@ -853,33 +831,36 @@ func newPutProgress(classes []string) putProgress {
        return pr
 }
 
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
+// PutBlock stores the given block on one or more volumes.
 //
-// PutBlock(ctx, block, hash)
-//   Stores the BLOCK (identified by the content id HASH) in Keep.
+// The MD5 checksum of the block must match the given hash.
 //
-//   The MD5 checksum of the block must be identical to the content id HASH.
-//   If not, an error is returned.
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
 //
-//   PutBlock stores the BLOCK on the first Keep volume with free space.
-//   A failure code is returned to the user only if all volumes fail.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
 //
-//   On success, PutBlock returns nil.
-//   On failure, it returns a KeepError with one of the following codes:
+// 500 Collision
 //
-//   500 Collision
-//          A different block with the same hash already exists on this
-//          Keep server.
-//   422 MD5Fail
-//          The MD5 hash of the BLOCK does not match the argument HASH.
-//   503 Full
-//          There was not enough space left in any Keep volume to store
-//          the object.
-//   500 Fail
-//          The object could not be stored for some other reason (e.g.
-//          all writes failed). The text of the error message should
-//          provide as much detail as possible.
+//     A different block with the same hash already exists on this
+//     Keep server.
 //
+// 422 MD5Fail
+//
+//     The MD5 hash of the BLOCK does not match the argument HASH.
+//
+// 503 Full
+//
+//     There was not enough space left in any Keep volume to store
+//     the object.
+//
+// 500 Fail
+//
+//     The object could not be stored for some other reason (e.g.
+//     all writes failed). The text of the error message should
+//     provide as much detail as possible.
 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
        log := ctxlog.FromContext(ctx)
 
@@ -916,7 +897,11 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
        pending := result.Copy()
        var allFull atomic.Value
        allFull.Store(true)
+
+       // We hold the lock for the duration of the "each volume" loop
+       // below, except when it is released during cond.Wait().
        mtx.Lock()
+
        for _, mnt := range writables {
                // Wait until our decision to use this mount does not
                // depend on the outcome of pending writes.
@@ -996,7 +981,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+                       log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
                        return CollisionError
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
@@ -1024,10 +1009,9 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-// IsValidLocator returns true if the specified string is a valid Keep locator.
-//   When Keep is extended to support hash types other than MD5,
-//   this should be updated to cover those as well.
-//
+// IsValidLocator returns true if the specified string is a valid Keep
+// locator.  When Keep is extended to support hash types other than
+// MD5, this should be updated to cover those as well.
 func IsValidLocator(loc string) bool {
        return validLocatorRe.MatchString(loc)
 }