From bf4166939c77771642af846cb5372efc8a78659a Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 26 Jul 2021 15:28:01 -0400 Subject: [PATCH] 17574: Update replication_confirmed fields after keep-balance run. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- sdk/go/arvadostest/fixtures.go | 5 +- services/keep-balance/balance.go | 6 +- services/keep-balance/block_state.go | 50 +++++++++++ services/keep-balance/block_state_test.go | 94 ++++++++++++++++++++ services/keep-balance/collection.go | 103 +++++++++++++++++++++- services/keep-balance/integration_test.go | 22 ++++- 6 files changed, 276 insertions(+), 4 deletions(-) create mode 100644 services/keep-balance/block_state_test.go diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go index 4b7ad6dd59..d770ca76d1 100644 --- a/sdk/go/arvadostest/fixtures.go +++ b/sdk/go/arvadostest/fixtures.go @@ -31,7 +31,10 @@ const ( UserAgreementPDH = "b519d9cb706a29fc7ea24dbea2f05851+93" HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62" - MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn" + MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn" + StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga" + StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa" + EmptyCollectionUUID = "zzzzz-4zz18-gs9ooj1h9sd5mde" AProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso" ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x" diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 86423a2976..a7dcf61902 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -167,7 +167,11 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp } if runOptions.CommitTrash { err = bal.CommitTrash(ctx, client) + if err != nil { + return + } } + err = bal.updateCollections(ctx, client, cluster) return } @@ -460,7 +464,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error { if coll.ReplicationDesired != nil { repl = *coll.ReplicationDesired } - bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl) + bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl) // Pass pdh to IncreaseDesired only if LostBlocksFile is being // written -- otherwise it's just a waste of memory. pdh := "" diff --git a/services/keep-balance/block_state.go b/services/keep-balance/block_state.go index 029f8c6c07..e30b4ff794 100644 --- a/services/keep-balance/block_state.go +++ b/services/keep-balance/block_state.go @@ -133,3 +133,53 @@ func (bsm *BlockStateMap) IncreaseDesired(pdh string, classes []string, n int, b bsm.get(blkid).increaseDesired(pdh, classes, n) } } + +// GetConfirmedReplication returns the replication level of the given +// blocks, considering only the specified storage classes. +// +// If len(classes)==0, returns the replication level without regard to +// storage classes. +// +// Safe to call concurrently with other calls to GetCurrent, but not +// with different BlockStateMap methods. +func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest, classes []string) int { + defaultClasses := map[string]bool{"default": true} + min := 0 + for _, blkid := range blkids { + total := 0 + perclass := make(map[string]int, len(classes)) + for _, c := range classes { + perclass[c] = 0 + } + for _, r := range bsm.get(blkid).Replicas { + total += r.KeepMount.Replication + mntclasses := r.KeepMount.StorageClasses + if len(mntclasses) == 0 { + mntclasses = defaultClasses + } + for c := range mntclasses { + n, ok := perclass[c] + if !ok { + // Don't care about this storage class + continue + } + perclass[c] = n + r.KeepMount.Replication + } + } + if total == 0 { + return 0 + } + for _, n := range perclass { + if n == 0 { + return 0 + } + if n < min || min == 0 { + min = n + } + } + if len(perclass) == 0 && (min == 0 || min > total) { + min = total + } + } + return min +} diff --git a/services/keep-balance/block_state_test.go b/services/keep-balance/block_state_test.go new file mode 100644 index 0000000000..aaf2c18e29 --- /dev/null +++ b/services/keep-balance/block_state_test.go @@ -0,0 +1,94 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package main + +import ( + "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&confirmedReplicationSuite{}) + +type confirmedReplicationSuite struct { + blockStateMap *BlockStateMap + mtime int64 +} + +func (s *confirmedReplicationSuite) SetUpTest(c *check.C) { + t, _ := time.Parse(time.RFC3339Nano, time.RFC3339Nano) + s.mtime = t.UnixNano() + s.blockStateMap = NewBlockStateMap() + s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{ + Replication: 1, + StorageClasses: map[string]bool{"default": true}, + }}, []arvados.KeepServiceIndexEntry{ + {SizedDigest: knownBlkid(10), Mtime: s.mtime}, + }) + s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{ + Replication: 2, + StorageClasses: map[string]bool{"default": true}, + }}, []arvados.KeepServiceIndexEntry{ + {SizedDigest: knownBlkid(20), Mtime: s.mtime}, + }) +} + +func (s *confirmedReplicationSuite) TestZeroReplication(c *check.C) { + n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(404), knownBlkid(409)}, []string{"default"}) + c.Check(n, check.Equals, 0) + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, []string{"default"}) + c.Check(n, check.Equals, 0) + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, nil) + c.Check(n, check.Equals, 0) +} + +func (s *confirmedReplicationSuite) TestBlocksWithDifferentReplication(c *check.C) { + n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(20)}, []string{"default"}) + c.Check(n, check.Equals, 1) +} + +func (s *confirmedReplicationSuite) TestBlocksInDifferentClasses(c *check.C) { + s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{ + Replication: 3, + StorageClasses: map[string]bool{"three": true}, + }}, []arvados.KeepServiceIndexEntry{ + {SizedDigest: knownBlkid(30), Mtime: s.mtime}, + }) + + n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(30)}, []string{"three"}) + c.Check(n, check.Equals, 3) + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"default"}) + c.Check(n, check.Equals, 0) // block 30 has repl 0 @ "default" + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"three"}) + c.Check(n, check.Equals, 0) // block 20 has repl 0 @ "three" + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, nil) + c.Check(n, check.Equals, 2) +} + +func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) { + s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{ + Replication: 2, + StorageClasses: map[string]bool{"default": true, "four": true}, + }}, []arvados.KeepServiceIndexEntry{ + {SizedDigest: knownBlkid(40), Mtime: s.mtime}, + {SizedDigest: knownBlkid(41), Mtime: s.mtime}, + }) + s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{ + Replication: 2, + StorageClasses: map[string]bool{"four": true}, + }}, []arvados.KeepServiceIndexEntry{ + {SizedDigest: knownBlkid(40), Mtime: s.mtime}, + {SizedDigest: knownBlkid(41), Mtime: s.mtime}, + }) + n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default"}) + c.Check(n, check.Equals, 2) + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"four"}) + c.Check(n, check.Equals, 4) + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default", "four"}) + c.Check(n, check.Equals, 2) + n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil) + c.Check(n, check.Equals, 4) +} diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go index 1659918caf..3afb1ccc55 100644 --- a/services/keep-balance/collection.go +++ b/services/keep-balance/collection.go @@ -6,10 +6,16 @@ package main import ( "context" + "encoding/json" "fmt" + "io" + "runtime" + "sync" + "sync/atomic" "time" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/jmoiron/sqlx" ) func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) { @@ -65,7 +71,7 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func Limit: &limit, Order: "modified_at, uuid", Count: "none", - Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"}, + Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired", "storage_classes_desired", "is_trashed"}, IncludeTrash: true, IncludeOldVersions: true, } @@ -165,3 +171,98 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func return nil } + +func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + defer bal.time("update_collections", "wall clock time to update collections")() + threshold := time.Now() + thresholdStr := threshold.Format(time.RFC3339Nano) + + var err error + collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers) + go func() { + defer close(collQ) + err = EachCollection(ctx, c, cluster.Collections.BalanceCollectionBatch, func(coll arvados.Collection) error { + if coll.ModifiedAt.After(threshold) { + return io.EOF + } + if coll.IsTrashed { + return nil + } + collQ <- coll + return nil + }, func(done, total int) { + bal.logf("update collections: %d/%d", done, total) + }) + if err == io.EOF { + err = nil + } else if err != nil { + bal.logf("error updating collections: %s", err) + } + }() + + db, err := bal.db(cluster) + if err != nil { + return err + } + + var updated int64 + var wg sync.WaitGroup + for i := 0; i < runtime.NumCPU(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + for coll := range collQ { + blkids, err := coll.SizedDigests() + if err != nil { + bal.logf("%s: %s", coll.UUID, err) + continue + } + repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired) + tx, err := db.Beginx() + if err != nil { + bal.logf("error opening transaction: %s", coll.UUID, err) + cancel() + continue + } + classes, _ := json.Marshal(coll.StorageClassesDesired) + _, err = tx.ExecContext(ctx, `update collections set + replication_confirmed=$1, + replication_confirmed_at=$2, + storage_classes_confirmed=$3, + storage_classes_confirmed_at=$2 + where uuid=$4`, + repl, thresholdStr, classes, coll.UUID) + if err != nil { + tx.Rollback() + } else { + err = tx.Commit() + } + if err != nil { + bal.logf("%s: update failed: %s", coll.UUID, err) + continue + } + atomic.AddInt64(&updated, 1) + } + }() + } + wg.Wait() + bal.logf("updated %d collections", updated) + return err +} + +func (bal *Balancer) db(cluster *arvados.Cluster) (*sqlx.DB, error) { + db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String()) + if err != nil { + return nil, err + } + if p := cluster.PostgreSQL.ConnectionPool; p > 0 { + db.SetMaxOpenConns(p) + } + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("postgresql connect succeeded but ping failed: %s", err) + } + return db, nil +} diff --git a/services/keep-balance/integration_test.go b/services/keep-balance/integration_test.go index defabd9a10..e4297bfe6d 100644 --- a/services/keep-balance/integration_test.go +++ b/services/keep-balance/integration_test.go @@ -6,6 +6,7 @@ package main import ( "bytes" + "io" "os" "strings" "testing" @@ -81,7 +82,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) { for iter := 0; iter < 20; iter++ { logBuf.Reset() logger := logrus.New() - logger.Out = &logBuf + logger.Out = io.MultiWriter(&logBuf, os.Stderr) opts := RunOptions{ CommitPulls: true, CommitTrash: true, @@ -105,4 +106,23 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) { time.Sleep(200 * time.Millisecond) } c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`) + + for _, trial := range []struct { + uuid string + repl int + classes []string + }{ + {arvadostest.EmptyCollectionUUID, 0, []string{"default"}}, + {arvadostest.FooCollection, 4, []string{"default"}}, // "foo" blk + {arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk + {arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{"archive"}}, // "bar" blk + } { + c.Logf("%#v", trial) + var coll arvados.Collection + s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil) + if c.Check(coll.ReplicationConfirmed, check.NotNil) { + c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl) + } + c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes) + } } -- 2.30.2