Refactor the multi-host salt install page.
[arvados.git] / services / keepstore / handlers.go
index eb0ea5ad2f133f3b8a569fa354255521f08c5965..63a23687ece8f8a60213791006a78712bd19a67b 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "container/list"
@@ -18,8 +18,10 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
@@ -111,12 +113,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        locator := req.URL.Path[1:]
        if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-               rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+               rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
                return
        }
 
@@ -135,14 +134,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+       buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -157,21 +156,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-       ctx, cancel := context.WithCancel(parent)
-       if cn, ok := resp.(http.CloseNotifier); ok {
-               go func(c <-chan bool) {
-                       select {
-                       case <-c:
-                               cancel()
-                       case <-ctx.Done():
-                       }
-               }(cn.CloseNotify())
-       }
-       return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -222,9 +206,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -246,7 +227,22 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+       var wantStorageClasses []string
+       if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
+               wantStorageClasses = strings.Split(hdr, ",")
+               for i, sc := range wantStorageClasses {
+                       wantStorageClasses[i] = strings.TrimSpace(sc)
+               }
+       } else {
+               // none specified -- use configured default
+               for class, cfg := range rtr.cluster.StorageClasses {
+                       if cfg.Default {
+                               wantStorageClasses = append(wantStorageClasses, class)
+                       }
+               }
+       }
+
+       buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -259,7 +255,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       replication, err := PutBlock(ctx, rtr.volmgr, buf, hash)
+       result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
        bufs.Put(buf)
 
        if err != nil {
@@ -279,7 +275,8 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
                returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
        }
-       resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
+       resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
+       resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
        resp.Write([]byte(returnHash + "\n"))
 }
 
@@ -394,7 +391,7 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
 
 // populate the given NodeStatus struct with current values.
 func (rtr *router) readNodeStatus(st *NodeStatus) {
-       st.Version = version
+       st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
        vols := rtr.volmgr.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*volumeStatusEnt, len(vols))
@@ -712,7 +709,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
+                       log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
                        errorToCaller = DiskHashError
                        continue
                }
@@ -724,119 +721,263 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
        return 0, errorToCaller
 }
 
-// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-// PutBlock(ctx, block, hash)
-//   Stores the BLOCK (identified by the content id HASH) in Keep.
-//
-//   The MD5 checksum of the block must be identical to the content id HASH.
-//   If not, an error is returned.
+type putProgress struct {
+       classNeeded      map[string]bool
+       classTodo        map[string]bool
+       mountUsed        map[*VolumeMount]bool
+       totalReplication int
+       classDone        map[string]int
+}
+
+// Number of distinct replicas stored. "2" can mean the block was
+// stored on 2 different volumes with replication 1, or on 1 volume
+// with replication 2.
+func (pr putProgress) TotalReplication() string {
+       return strconv.Itoa(pr.totalReplication)
+}
+
+// Number of replicas satisfying each storage class, formatted like
+// "default=2; special=1".
+func (pr putProgress) ClassReplication() string {
+       s := ""
+       for k, v := range pr.classDone {
+               if len(s) > 0 {
+                       s += ", "
+               }
+               s += k + "=" + strconv.Itoa(v)
+       }
+       return s
+}
+
+func (pr *putProgress) Add(mnt *VolumeMount) {
+       if pr.mountUsed[mnt] {
+               logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
+               return
+       }
+       pr.mountUsed[mnt] = true
+       pr.totalReplication += mnt.Replication
+       for class := range mnt.StorageClasses {
+               pr.classDone[class] += mnt.Replication
+               delete(pr.classTodo, class)
+       }
+}
+
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+       if !pr.mountUsed[mnt] {
+               logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+               return
+       }
+       pr.mountUsed[mnt] = false
+       pr.totalReplication -= mnt.Replication
+       for class := range mnt.StorageClasses {
+               pr.classDone[class] -= mnt.Replication
+               if pr.classNeeded[class] {
+                       pr.classTodo[class] = true
+               }
+       }
+}
+
+func (pr *putProgress) Done() bool {
+       return len(pr.classTodo) == 0 && pr.totalReplication > 0
+}
+
+func (pr *putProgress) Want(mnt *VolumeMount) bool {
+       if pr.Done() || pr.mountUsed[mnt] {
+               return false
+       }
+       if len(pr.classTodo) == 0 {
+               // none specified == "any"
+               return true
+       }
+       for class := range mnt.StorageClasses {
+               if pr.classTodo[class] {
+                       return true
+               }
+       }
+       return false
+}
+
+func (pr *putProgress) Copy() *putProgress {
+       cp := putProgress{
+               classNeeded:      pr.classNeeded,
+               classTodo:        make(map[string]bool, len(pr.classTodo)),
+               classDone:        make(map[string]int, len(pr.classDone)),
+               mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+               totalReplication: pr.totalReplication,
+       }
+       for k, v := range pr.classTodo {
+               cp.classTodo[k] = v
+       }
+       for k, v := range pr.classDone {
+               cp.classDone[k] = v
+       }
+       for k, v := range pr.mountUsed {
+               cp.mountUsed[k] = v
+       }
+       return &cp
+}
+
+func newPutProgress(classes []string) putProgress {
+       pr := putProgress{
+               classNeeded: make(map[string]bool, len(classes)),
+               classTodo:   make(map[string]bool, len(classes)),
+               classDone:   map[string]int{},
+               mountUsed:   map[*VolumeMount]bool{},
+       }
+       for _, c := range classes {
+               if c != "" {
+                       pr.classNeeded[c] = true
+                       pr.classTodo[c] = true
+               }
+       }
+       return pr
+}
+
+// PutBlock stores the given block on one or more volumes.
 //
-//   PutBlock stores the BLOCK on the first Keep volume with free space.
-//   A failure code is returned to the user only if all volumes fail.
+// The MD5 checksum of the block must match the given hash.
 //
-//   On success, PutBlock returns nil.
-//   On failure, it returns a KeepError with one of the following codes:
+// The block is written to each writable volume (ordered by priority
+// and then UUID, see volume.go) until at least one replica has been
+// stored in each of the requested storage classes.
 //
-//   500 Collision
-//          A different block with the same hash already exists on this
-//          Keep server.
-//   422 MD5Fail
-//          The MD5 hash of the BLOCK does not match the argument HASH.
-//   503 Full
-//          There was not enough space left in any Keep volume to store
-//          the object.
-//   500 Fail
-//          The object could not be stored for some other reason (e.g.
-//          all writes failed). The text of the error message should
-//          provide as much detail as possible.
+// The returned error, if any, is a KeepError with one of the
+// following codes:
 //
-func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+// 500 Collision
+//        A different block with the same hash already exists on this
+//        Keep server.
+// 422 MD5Fail
+//        The MD5 hash of the BLOCK does not match the argument HASH.
+// 503 Full
+//        There was not enough space left in any Keep volume to store
+//        the object.
+// 500 Fail
+//        The object could not be stored for some other reason (e.g.
+//        all writes failed). The text of the error message should
+//        provide as much detail as possible.
+func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
        log := ctxlog.FromContext(ctx)
 
        // Check that BLOCK's checksum matches HASH.
        blockhash := fmt.Sprintf("%x", md5.Sum(block))
        if blockhash != hash {
                log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
-               return 0, RequestHashError
+               return putProgress{}, RequestHashError
        }
 
+       result := newPutProgress(wantStorageClasses)
+
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if n, err := CompareAndTouch(ctx, volmgr, hash, block); err == nil || err == CollisionError {
-               return n, err
-       } else if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
-       }
-
-       // Choose a Keep volume to write to.
-       // If this volume fails, try all of the volumes in order.
-       if mnt := volmgr.NextWritable(); mnt != nil {
-               if err := mnt.Put(ctx, hash, block); err != nil {
-                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-               } else {
-                       return mnt.Replication, nil // success!
-               }
+       if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
+               return result, err
        }
        if ctx.Err() != nil {
-               return 0, ErrClientDisconnect
+               return result, ErrClientDisconnect
        }
 
-       writables := volmgr.AllWritable()
+       writables := volmgr.NextWritable()
        if len(writables) == 0 {
                log.Error("no writable volumes")
-               return 0, FullError
+               return result, FullError
        }
 
-       allFull := true
-       for _, vol := range writables {
-               err := vol.Put(ctx, hash, block)
-               if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
+       var wg sync.WaitGroup
+       var mtx sync.Mutex
+       cond := sync.Cond{L: &mtx}
+       // pending predicts what result will be if all pending writes
+       // succeed.
+       pending := result.Copy()
+       var allFull atomic.Value
+       allFull.Store(true)
+
+       // We hold the lock for the duration of the "each volume" loop
+       // below, except when it is released during cond.Wait().
+       mtx.Lock()
+
+       for _, mnt := range writables {
+               // Wait until our decision to use this mount does not
+               // depend on the outcome of pending writes.
+               for result.Want(mnt) && !pending.Want(mnt) {
+                       cond.Wait()
                }
-               switch err {
-               case nil:
-                       return vol.Replication, nil // success!
-               case FullError:
+               if !result.Want(mnt) {
                        continue
-               default:
-                       // The volume is not full but the
-                       // write did not succeed.  Report the
-                       // error and continue trying.
-                       allFull = false
-                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
                }
+               mnt := mnt
+               pending.Add(mnt)
+               wg.Add(1)
+               go func() {
+                       log.Debugf("PutBlock: start write to %s", mnt.UUID)
+                       defer wg.Done()
+                       err := mnt.Put(ctx, hash, block)
+
+                       mtx.Lock()
+                       if err != nil {
+                               log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+                               pending.Sub(mnt)
+                       } else {
+                               log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+                               result.Add(mnt)
+                       }
+                       cond.Broadcast()
+                       mtx.Unlock()
+
+                       if err != nil && err != FullError && ctx.Err() == nil {
+                               // The volume is not full but the
+                               // write did not succeed.  Report the
+                               // error and continue trying.
+                               allFull.Store(false)
+                               log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+                       }
+               }()
+       }
+       mtx.Unlock()
+       wg.Wait()
+       if ctx.Err() != nil {
+               return result, ErrClientDisconnect
+       }
+       if result.Done() {
+               return result, nil
        }
 
-       if allFull {
-               log.Error("all volumes are full")
-               return 0, FullError
+       if result.totalReplication > 0 {
+               // Some, but not all, of the storage classes were
+               // satisfied. This qualifies as success.
+               return result, nil
+       } else if allFull.Load().(bool) {
+               log.Error("all volumes with qualifying storage classes are full")
+               return putProgress{}, FullError
+       } else {
+               // Already logged the non-full errors.
+               return putProgress{}, GenericError
        }
-       // Already logged the non-full errors.
-       return 0, GenericError
 }
 
-// CompareAndTouch returns the current replication level if one of the
-// volumes already has the given content and it successfully updates
-// the relevant block's modification time in order to protect it from
-// premature garbage collection. Otherwise, it returns a non-nil
-// error.
-func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+// CompareAndTouch looks for volumes where the given content already
+// exists and its modification time can be updated (i.e., it is
+// protected from garbage collection), and updates result accordingly.
+// It returns when the result is Done() or all volumes have been
+// checked.
+func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
        log := ctxlog.FromContext(ctx)
-       var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
+               if !result.Want(mnt) {
+                       continue
+               }
                err := mnt.Compare(ctx, hash, buf)
                if ctx.Err() != nil {
-                       return 0, ctx.Err()
+                       return nil
                } else if err == CollisionError {
                        // Stop if we have a block with same hash but
                        // different content. (It will be impossible
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
-                       return 0, err
+                       log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
+                       return CollisionError
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
                        // "normal" error: we don't log anything.
@@ -850,13 +991,15 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                }
                if err := mnt.Touch(hash); err != nil {
                        log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
-                       bestErr = err
                        continue
                }
                // Compare and Touch both worked --> done.
-               return mnt.Replication, nil
+               result.Add(mnt)
+               if result.Done() {
+                       return nil
+               }
        }
-       return 0, bestErr
+       return nil
 }
 
 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)