17698: Do concurrent writes when multiple volumes are needed.
authorTom Clegg <tom@curii.com>
Fri, 13 Aug 2021 15:06:34 +0000 (11:06 -0400)
committerTom Clegg <tom@curii.com>
Fri, 13 Aug 2021 15:06:34 +0000 (11:06 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/volume.go

index 897447dd11c7a95a5b113d867fb0de28cbed6844..cbc83929de10eaa20edcfff8fdc35894e6fd043d 100644 (file)
@@ -23,6 +23,7 @@ import (
        "os"
        "sort"
        "strings"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -367,6 +368,74 @@ func (s *HandlerSuite) TestReadsOrderedByStorageClassPriority(c *check.C) {
        }
 }
 
+func (s *HandlerSuite) TestConcurrentWritesToMultipleStorageClasses(c *check.C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-111111111111111": {
+                       Driver:         "mock",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class1": true}},
+               "zzzzz-nyw5e-121212121212121": {
+                       Driver:         "mock",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class1": true, "class2": true}},
+               "zzzzz-nyw5e-222222222222222": {
+                       Driver:         "mock",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class2": true}},
+       }
+
+       for _, trial := range []struct {
+               setCounter uint32 // value to stuff vm.counter, to control offset
+               classes    string // desired classes
+               put111     int    // expected number of "put" ops on 11111... after 2x put reqs
+               put121     int    // expected number of "put" ops on 12121...
+               put222     int    // expected number of "put" ops on 22222...
+               cmp111     int    // expected number of "compare" ops on 11111... after 2x put reqs
+               cmp121     int    // expected number of "compare" ops on 12121...
+               cmp222     int    // expected number of "compare" ops on 22222...
+       }{
+               {0, "class1",
+                       1, 0, 0,
+                       2, 1, 0}, // first put compares on all vols with class2; second put succeeds after checking 121
+               {0, "class2",
+                       0, 1, 0,
+                       0, 2, 1}, // first put compares on all vols with class2; second put succeeds after checking 121
+               {0, "class1,class2",
+                       1, 1, 0,
+                       2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
+               {1, "class1,class2",
+                       0, 1, 0, // vm.counter offset is 1 so the first volume attempted is 121
+                       2, 2, 1}, // first put compares on all vols; second put succeeds after checking 111 and 121
+               {0, "class1,class2,class404",
+                       1, 1, 0,
+                       2, 2, 1}, // first put compares on all vols; second put doesn't compare on 222 because it already satisfied class2 on 121
+       } {
+               c.Logf("%+v", trial)
+               s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+                       "class1": {},
+                       "class2": {},
+                       "class3": {},
+               }
+               c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+               atomic.StoreUint32(&s.handler.volmgr.counter, trial.setCounter)
+               for i := 0; i < 2; i++ {
+                       IssueRequest(s.handler,
+                               &RequestTester{
+                                       method:         "PUT",
+                                       uri:            "/" + TestHash,
+                                       requestBody:    TestBlock,
+                                       storageClasses: trial.classes,
+                               })
+               }
+               c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put111)
+               c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put121)
+               c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Put"), check.Equals, trial.put222)
+               c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-111111111111111"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp111)
+               c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-121212121212121"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp121)
+               c.Check(s.handler.volmgr.mountMap["zzzzz-nyw5e-222222222222222"].Volume.(*MockVolume).CallCount("Compare"), check.Equals, trial.cmp222)
+       }
+}
+
 // Test TOUCH requests.
 func (s *HandlerSuite) TestTouchHandler(c *check.C) {
        c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
index 2b469a13eb993e0827bac8ae1ebe4db46bc8c4df..81f7fcd123704f53f6399f75023bf82768ba387e 100644 (file)
@@ -18,6 +18,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -741,6 +742,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 }
 
 type putProgress struct {
+       classNeeded      map[string]bool
        classTodo        map[string]bool
        mountUsed        map[*VolumeMount]bool
        totalReplication int
@@ -769,7 +771,7 @@ func (pr putProgress) ClassReplication() string {
 
 func (pr *putProgress) Add(mnt *VolumeMount) {
        if pr.mountUsed[mnt] {
-               logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
+               logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
                return
        }
        pr.mountUsed[mnt] = true
@@ -780,6 +782,21 @@ func (pr *putProgress) Add(mnt *VolumeMount) {
        }
 }
 
+func (pr *putProgress) Sub(mnt *VolumeMount) {
+       if !pr.mountUsed[mnt] {
+               logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
+               return
+       }
+       pr.mountUsed[mnt] = false
+       pr.totalReplication -= mnt.Replication
+       for class := range mnt.StorageClasses {
+               pr.classDone[class] -= mnt.Replication
+               if pr.classNeeded[class] {
+                       pr.classTodo[class] = true
+               }
+       }
+}
+
 func (pr *putProgress) Done() bool {
        return len(pr.classTodo) == 0 && pr.totalReplication > 0
 }
@@ -800,14 +817,36 @@ func (pr *putProgress) Want(mnt *VolumeMount) bool {
        return false
 }
 
+func (pr *putProgress) Copy() *putProgress {
+       cp := putProgress{
+               classNeeded:      pr.classNeeded,
+               classTodo:        make(map[string]bool, len(pr.classTodo)),
+               classDone:        make(map[string]int, len(pr.classDone)),
+               mountUsed:        make(map[*VolumeMount]bool, len(pr.mountUsed)),
+               totalReplication: pr.totalReplication,
+       }
+       for k, v := range pr.classTodo {
+               cp.classTodo[k] = v
+       }
+       for k, v := range pr.classDone {
+               cp.classDone[k] = v
+       }
+       for k, v := range pr.mountUsed {
+               cp.mountUsed[k] = v
+       }
+       return &cp
+}
+
 func newPutResult(classes []string) putProgress {
        pr := putProgress{
-               classTodo: make(map[string]bool, len(classes)),
-               classDone: map[string]int{},
-               mountUsed: map[*VolumeMount]bool{},
+               classNeeded: make(map[string]bool, len(classes)),
+               classTodo:   make(map[string]bool, len(classes)),
+               classDone:   map[string]int{},
+               mountUsed:   map[*VolumeMount]bool{},
        }
        for _, c := range classes {
                if c != "" {
+                       pr.classNeeded[c] = true
                        pr.classTodo[c] = true
                }
        }
@@ -856,67 +895,79 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
        // If we already have this data, it's intact on disk, and we
        // can update its timestamp, return success. If we have
        // different data with the same hash, return failure.
-       if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
+       if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
                return result, err
        }
        if ctx.Err() != nil {
                return result, ErrClientDisconnect
        }
 
-       // Choose a Keep volume to write to.
-       // If this volume fails, try all of the volumes in order.
-       if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
-               // fall through to "try all volumes" below
-       } else if err := mnt.Put(ctx, hash, block); err != nil {
-               log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-       } else {
-               result.Add(mnt)
-               if result.Done() {
-                       return result, nil
-               }
-       }
-       if ctx.Err() != nil {
-               return putProgress{}, ErrClientDisconnect
-       }
-
-       writables := volmgr.AllWritable()
+       writables := volmgr.NextWritable()
        if len(writables) == 0 {
                log.Error("no writable volumes")
-               return putProgress{}, FullError
-       }
-
-       allFull := true
+               return result, FullError
+       }
+
+       var wg sync.WaitGroup
+       var mtx sync.Mutex
+       cond := sync.Cond{L: &mtx}
+       // pending predicts what result will be if all pending writes
+       // succeed.
+       pending := result.Copy()
+       var allFull atomic.Value
+       allFull.Store(true)
+       mtx.Lock()
        for _, mnt := range writables {
+               // Wait until our decision to use this mount does not
+               // depend on the outcome of pending writes.
+               for result.Want(mnt) && !pending.Want(mnt) {
+                       cond.Wait()
+               }
                if !result.Want(mnt) {
                        continue
                }
-               err := mnt.Put(ctx, hash, block)
-               if ctx.Err() != nil {
-                       return result, ErrClientDisconnect
-               }
-               switch err {
-               case nil:
-                       result.Add(mnt)
-                       if result.Done() {
-                               return result, nil
+               mnt := mnt
+               pending.Add(mnt)
+               wg.Add(1)
+               go func() {
+                       log.Debugf("PutBlock: start write to %s", mnt.UUID)
+                       defer wg.Done()
+                       err := mnt.Put(ctx, hash, block)
+
+                       mtx.Lock()
+                       if err != nil {
+                               log.Debugf("PutBlock: write to %s failed", mnt.UUID)
+                               pending.Sub(mnt)
+                       } else {
+                               log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
+                               result.Add(mnt)
                        }
-                       continue
-               case FullError:
-                       continue
-               default:
-                       // The volume is not full but the
-                       // write did not succeed.  Report the
-                       // error and continue trying.
-                       allFull = false
-                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
-               }
+                       cond.Broadcast()
+                       mtx.Unlock()
+
+                       if err != nil && err != FullError && ctx.Err() == nil {
+                               // The volume is not full but the
+                               // write did not succeed.  Report the
+                               // error and continue trying.
+                               allFull.Store(false)
+                               log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+                       }
+               }()
+       }
+       mtx.Unlock()
+       wg.Wait()
+       if ctx.Err() != nil {
+               return result, ErrClientDisconnect
+       }
+       if result.Done() {
+               return result, nil
        }
 
        if result.totalReplication > 0 {
                // Some, but not all, of the storage classes were
                // satisfied. This qualifies as success.
                return result, nil
-       } else if allFull {
+       } else if allFull.Load().(bool) {
                log.Error("all volumes with qualifying storage classes are full")
                return putProgress{}, FullError
        } else {
index 9bfc6ca3e5191d2953ceac75f915a07cab19c69f..878f690c9bff7a27f8f82e4516326598008c220c 100644 (file)
@@ -344,11 +344,11 @@ func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, my
                        vm.writables = append(vm.writables, mnt)
                }
        }
-       // pri(i): return highest priority of any storage class
-       // offered by vm.readables[i]
-       pri := func(i int) int {
+       // pri(mnt): return highest priority of any storage class
+       // offered by mnt
+       pri := func(mnt *VolumeMount) int {
                any, best := false, 0
-               for class := range vm.readables[i].KeepMount.StorageClasses {
+               for class := range mnt.KeepMount.StorageClasses {
                        if p := cluster.StorageClasses[class].Priority; !any || best < p {
                                best = p
                                any = true
@@ -356,14 +356,20 @@ func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, my
                }
                return best
        }
-       // sort vm.readables, first by highest priority of any offered
+       // less(a,b): sort first by highest priority of any offered
        // storage class (highest->lowest), then by volume UUID
-       sort.Slice(vm.readables, func(i, j int) bool {
-               if pi, pj := pri(i), pri(j); pi != pj {
-                       return pi > pj
+       less := func(a, b *VolumeMount) bool {
+               if pa, pb := pri(a), pri(b); pa != pb {
+                       return pa > pb
                } else {
-                       return vm.readables[i].KeepMount.UUID < vm.readables[j].KeepMount.UUID
+                       return a.KeepMount.UUID < b.KeepMount.UUID
                }
+       }
+       sort.Slice(vm.readables, func(i, j int) bool {
+               return less(vm.readables[i], vm.readables[j])
+       })
+       sort.Slice(vm.writables, func(i, j int) bool {
+               return less(vm.writables[i], vm.writables[j])
        })
        return vm, nil
 }
@@ -384,18 +390,19 @@ func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
        return vm.readables
 }
 
-// AllWritable returns an array of all writable volumes
+// AllWritable returns writable volumes, sorted by priority/uuid. Used
+// by CompareAndTouch to ensure higher-priority volumes are checked
+// first.
 func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
        return vm.writables
 }
 
-// NextWritable returns the next writable
-func (vm *RRVolumeManager) NextWritable() *VolumeMount {
-       if len(vm.writables) == 0 {
-               return nil
-       }
-       i := atomic.AddUint32(&vm.counter, 1)
-       return vm.writables[i%uint32(len(vm.writables))]
+// NextWritable returns writable volumes, rotated by vm.counter so
+// each volume gets a turn to be first. Used by PutBlock to distribute
+// new data across available volumes.
+func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
+       offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
+       return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
 }
 
 // VolumeStats returns an ioStats for the given volume.