17574: Update replication_confirmed fields after keep-balance run.
authorTom Clegg <tom@curii.com>
Mon, 26 Jul 2021 19:28:01 +0000 (15:28 -0400)
committerTom Clegg <tom@curii.com>
Mon, 26 Jul 2021 19:28:01 +0000 (15:28 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/arvadostest/fixtures.go
services/keep-balance/balance.go
services/keep-balance/block_state.go
services/keep-balance/block_state_test.go [new file with mode: 0644]
services/keep-balance/collection.go
services/keep-balance/integration_test.go

index 4b7ad6dd59fa426e8b1e71c546ee43a851d99c54..d770ca76d1876ed56274a89435c4322db536fbfa 100644 (file)
@@ -31,7 +31,10 @@ const (
        UserAgreementPDH        = "b519d9cb706a29fc7ea24dbea2f05851+93"
        HelloWorldPdh           = "55713e6a34081eb03609e7ad5fcad129+62"
 
-       MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       MultilevelCollection1                        = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
+       StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
+       EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
 
        AProjectUUID    = "zzzzz-j7d0g-v955i6s2oi1cbso"
        ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x"
index 86423a2976b1e0909470bd563c486aee894743af..a7dcf61902d830d7b389d8bc36eee7f90a178c21 100644 (file)
@@ -167,7 +167,11 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
        }
        if runOptions.CommitTrash {
                err = bal.CommitTrash(ctx, client)
+               if err != nil {
+                       return
+               }
        }
+       err = bal.updateCollections(ctx, client, cluster)
        return
 }
 
@@ -460,7 +464,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
        if coll.ReplicationDesired != nil {
                repl = *coll.ReplicationDesired
        }
-       bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+       bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
        // Pass pdh to IncreaseDesired only if LostBlocksFile is being
        // written -- otherwise it's just a waste of memory.
        pdh := ""
index 029f8c6c0790337f561d679c8c9d21cf33ff502b..e30b4ff7943d4c4a041ec71924000a48a856c4d4 100644 (file)
@@ -133,3 +133,53 @@ func (bsm *BlockStateMap) IncreaseDesired(pdh string, classes []string, n int, b
                bsm.get(blkid).increaseDesired(pdh, classes, n)
        }
 }
+
+// GetConfirmedReplication returns the replication level of the given
+// blocks, considering only the specified storage classes.
+//
+// If len(classes)==0, returns the replication level without regard to
+// storage classes.
+//
+// Safe to call concurrently with other calls to GetCurrent, but not
+// with different BlockStateMap methods.
+func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest, classes []string) int {
+       defaultClasses := map[string]bool{"default": true}
+       min := 0
+       for _, blkid := range blkids {
+               total := 0
+               perclass := make(map[string]int, len(classes))
+               for _, c := range classes {
+                       perclass[c] = 0
+               }
+               for _, r := range bsm.get(blkid).Replicas {
+                       total += r.KeepMount.Replication
+                       mntclasses := r.KeepMount.StorageClasses
+                       if len(mntclasses) == 0 {
+                               mntclasses = defaultClasses
+                       }
+                       for c := range mntclasses {
+                               n, ok := perclass[c]
+                               if !ok {
+                                       // Don't care about this storage class
+                                       continue
+                               }
+                               perclass[c] = n + r.KeepMount.Replication
+                       }
+               }
+               if total == 0 {
+                       return 0
+               }
+               for _, n := range perclass {
+                       if n == 0 {
+                               return 0
+                       }
+                       if n < min || min == 0 {
+                               min = n
+                       }
+               }
+               if len(perclass) == 0 && (min == 0 || min > total) {
+                       min = total
+               }
+       }
+       return min
+}
diff --git a/services/keep-balance/block_state_test.go b/services/keep-balance/block_state_test.go
new file mode 100644 (file)
index 0000000..aaf2c18
--- /dev/null
@@ -0,0 +1,94 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+// Register the suite with the gocheck runner.
+var _ = check.Suite(&confirmedReplicationSuite{})
+
+// confirmedReplicationSuite holds the fixtures shared by the
+// BlockStateMap.GetConfirmedReplication tests: blockStateMap is the
+// map under test, rebuilt by SetUpTest, and mtime is the Mtime value
+// given to every index entry the tests add.
+type confirmedReplicationSuite struct {
+       blockStateMap *BlockStateMap
+       mtime         int64
+}
+
+func (s *confirmedReplicationSuite) SetUpTest(c *check.C) {
+       t, _ := time.Parse(time.RFC3339Nano, time.RFC3339Nano)
+       s.mtime = t.UnixNano()
+       s.blockStateMap = NewBlockStateMap()
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    1,
+               StorageClasses: map[string]bool{"default": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(10), Mtime: s.mtime},
+       })
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    2,
+               StorageClasses: map[string]bool{"default": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(20), Mtime: s.mtime},
+       })
+}
+
+func (s *confirmedReplicationSuite) TestZeroReplication(c *check.C) {
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(404), knownBlkid(409)}, []string{"default"})
+       c.Check(n, check.Equals, 0)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, []string{"default"})
+       c.Check(n, check.Equals, 0)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, nil)
+       c.Check(n, check.Equals, 0)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksWithDifferentReplication(c *check.C) {
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(20)}, []string{"default"})
+       c.Check(n, check.Equals, 1)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksInDifferentClasses(c *check.C) {
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    3,
+               StorageClasses: map[string]bool{"three": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(30), Mtime: s.mtime},
+       })
+
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(30)}, []string{"three"})
+       c.Check(n, check.Equals, 3)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"default"})
+       c.Check(n, check.Equals, 0) // block 30 has repl 0 @ "default"
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"three"})
+       c.Check(n, check.Equals, 0) // block 20 has repl 0 @ "three"
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, nil)
+       c.Check(n, check.Equals, 2)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) {
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    2,
+               StorageClasses: map[string]bool{"default": true, "four": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(40), Mtime: s.mtime},
+               {SizedDigest: knownBlkid(41), Mtime: s.mtime},
+       })
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    2,
+               StorageClasses: map[string]bool{"four": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(40), Mtime: s.mtime},
+               {SizedDigest: knownBlkid(41), Mtime: s.mtime},
+       })
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default"})
+       c.Check(n, check.Equals, 2)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"four"})
+       c.Check(n, check.Equals, 4)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default", "four"})
+       c.Check(n, check.Equals, 2)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
+       c.Check(n, check.Equals, 4)
+}
index 1659918cafe20c62162abfb1e841a40f80170c0a..3afb1ccc5500ccba647940b8f31254a1cd117861 100644 (file)
@@ -6,10 +6,16 @@ package main
 
 import (
        "context"
+       "encoding/json"
        "fmt"
+       "io"
+       "runtime"
+       "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
 )
 
 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
@@ -65,7 +71,7 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
                Limit:              &limit,
                Order:              "modified_at, uuid",
                Count:              "none",
-               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
+               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }
@@ -165,3 +171,98 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
 
        return nil
 }
+
+// updateCollections sets the replication_confirmed,
+// replication_confirmed_at, storage_classes_confirmed, and
+// storage_classes_confirmed_at columns of each non-trashed collection
+// that was last modified before this call started, using the replicas
+// recorded in bal.BlockStateMap.
+//
+// Collections are listed with EachCollection and updated by one
+// worker goroutine per CPU, each collection in its own transaction.
+// A collection whose manifest cannot be parsed or whose update fails
+// is logged and skipped. The returned error is non-nil only if the
+// database connection or the collection listing fails.
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
+       defer bal.time("update_collections", "wall clock time to update collections")()
+       threshold := time.Now()
+       thresholdStr := threshold.Format(time.RFC3339Nano)
+
+       // Connect before starting the producer goroutine, so a
+       // connection failure cannot leave that goroutine blocked on
+       // collQ with nobody receiving.
+       db, err := bal.db(cluster)
+       if err != nil {
+               return err
+       }
+       // All workers have finished by the time we return, so the
+       // pool opened for this run can be released.
+       defer db.Close()
+
+       // listErr is written only by the producer goroutine, before
+       // it closes collQ, and read only after every worker has
+       // observed that close.
+       var listErr error
+       collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+       go func() {
+               defer close(collQ)
+               listErr = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error {
+                       if coll.ModifiedAt.After(threshold) {
+                               // Modified after we started: stop
+                               // here. io.EOF is turned back into
+                               // nil below.
+                               return io.EOF
+                       }
+                       if coll.IsTrashed {
+                               return nil
+                       }
+                       select {
+                       case collQ <- coll:
+                               return nil
+                       case <-ctx.Done():
+                               return ctx.Err()
+                       }
+               }, func(done, total int) {
+                       bal.logf("update collections: %d/%d", done, total)
+               })
+               if listErr == io.EOF {
+                       listErr = nil
+               } else if listErr != nil {
+                       bal.logf("error updating collections: %s", listErr)
+               }
+       }()
+
+       var updated int64
+       var wg sync.WaitGroup
+       for i := 0; i < runtime.NumCPU(); i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for coll := range collQ {
+                               blkids, err := coll.SizedDigests()
+                               if err != nil {
+                                       bal.logf("%s: %s", coll.UUID, err)
+                                       continue
+                               }
+                               repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+                               tx, err := db.Beginx()
+                               if err != nil {
+                                       bal.logf("%s: error opening transaction: %s", coll.UUID, err)
+                                       // Stop the producer; keep
+                                       // receiving so collQ drains.
+                                       cancel()
+                                       continue
+                               }
+                               // Marshaling a []string cannot fail.
+                               classes, _ := json.Marshal(coll.StorageClassesDesired)
+                               _, err = tx.ExecContext(ctx, `update collections set
+                                       replication_confirmed=$1,
+                                       replication_confirmed_at=$2,
+                                       storage_classes_confirmed=$3,
+                                       storage_classes_confirmed_at=$2
+                                       where uuid=$4`,
+                                       repl, thresholdStr, classes, coll.UUID)
+                               if err != nil {
+                                       tx.Rollback()
+                               } else {
+                                       err = tx.Commit()
+                               }
+                               if err != nil {
+                                       bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       continue
+                               }
+                               atomic.AddInt64(&updated, 1)
+                       }
+               }()
+       }
+       wg.Wait()
+       bal.logf("updated %d collections", updated)
+       return listErr
+}
+
+func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) {
+       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+       if err != nil {
+               return nil, err
+       }
+       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+               db.SetMaxOpenConns(p)
+       }
+       if err := db.Ping(); err != nil {
+               return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err)
+       }
+       return db, nil
+}
index defabd9a109f27b348b61aeccb2ed822d2fa862e..e4297bfe6d353f00c741545747f1a4b520d5ba66 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "bytes"
+       "io"
        "os"
        "strings"
        "testing"
@@ -81,7 +82,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
        for iter := 0; iter < 20; iter++ {
                logBuf.Reset()
                logger := logrus.New()
-               logger.Out = &logBuf
+               logger.Out = io.MultiWriter(&logBuf, os.Stderr)
                opts := RunOptions{
                        CommitPulls: true,
                        CommitTrash: true,
@@ -105,4 +106,23 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                time.Sleep(200 * time.Millisecond)
        }
        c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+
+       for _, trial := range []struct {
+               uuid    string
+               repl    int
+               classes []string
+       }{
+               {arvadostest.EmptyCollectionUUID, 0, []string{"default"}},
+               {arvadostest.FooCollection, 4, []string{"default"}},                                // "foo" blk
+               {arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk
+               {arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{"archive"}}, // "bar" blk
+       } {
+               c.Logf("%#v", trial)
+               var coll arvados.Collection
+               s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil)
+               if c.Check(coll.ReplicationConfirmed, check.NotNil) {
+                       c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl)
+               }
+               c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes)
+       }
 }