17574: Merge branch 'main'
authorTom Clegg <tom@tomclegg.ca>
Wed, 4 Aug 2021 14:02:15 +0000 (10:02 -0400)
committerTom Clegg <tom@tomclegg.ca>
Wed, 4 Aug 2021 14:02:15 +0000 (10:02 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

19 files changed:
doc/admin/upgrading.html.textile.liquid
doc/install/install-keep-balance.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/service/cmd.go
sdk/go/arvados/collection.go
sdk/go/arvados/config.go
sdk/go/arvadostest/fixtures.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/block_state.go
services/keep-balance/block_state_test.go [new file with mode: 0644]
services/keep-balance/collection.go
services/keep-balance/collection_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/main_test.go
services/keep-balance/server.go

index 3c283c354cf2f6e48dd4f3f2dfb2e732276f169e..dfb6a0ad6e00d9e58cd2fe33bea7f32efc70cadf 100644 (file)
@@ -39,6 +39,10 @@ h2(#main). development main (as of 2021-07-15)
 
 "Upgrading from 2.2.0":#v2_2_0
 
+h3. keep-balance requires access to PostgreSQL
+
+Make sure the keep-balance process can connect to your PostgreSQL server using the settings in your config file. (In previous versions, keep-balance accessed the database through controller instead of connecting to the database server directly.)
+
 h3. crunch-dispatch-local now requires config.yml
 
 The @crunch-dispatch-local@ dispatcher now reads the API host and token from the system-wide @/etc/arvados/config.yml@. It will fail to start if that file is not found or not readable.
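
Regarding the new keep-balance PostgreSQL requirement above: a minimal sketch (not part of this commit) of a standalone connectivity check. It uses the same config loader and PostgreSQL connection string that keep-balance itself now uses at startup (see the services/keep-balance/main.go hunk below); the program is illustrative only.

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/lib/config"
	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"
	"github.com/sirupsen/logrus"
)

func main() {
	logger := logrus.New()
	// Load the cluster config the same way keep-balance does
	// (defaults to /etc/arvados/config.yml).
	cfg, err := config.NewLoader(nil, logger).Load()
	if err != nil {
		logger.Fatal(err)
	}
	cluster, err := cfg.GetCluster("")
	if err != nil {
		logger.Fatal(err)
	}
	// Same connection settings keep-balance now uses directly.
	db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
	if err != nil {
		logger.Fatal(err)
	}
	if err := db.Ping(); err != nil {
		logger.Fatal(err)
	}
	fmt.Println("PostgreSQL connection OK")
}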
index 1d9b654b25c285d13ba51b843fb0c53b4ad26b4a..bb4ae7b3d8ef1b9b5f810d2cfecfe8901f7a2be0 100644 (file)
@@ -18,13 +18,13 @@ h2(#introduction). Introduction
 
 Keep-balance deletes unreferenced and overreplicated blocks from Keep servers, makes additional copies of underreplicated blocks, and moves blocks into optimal locations as needed (e.g., after adding new servers). See "Balancing Keep servers":{{site.baseurl}}/admin/keep-balance.html for usage details.
 
-Keep-balance can be installed anywhere with network access to Keep services. Typically it runs on the same host as keepproxy.
+Keep-balance can be installed anywhere with network access to Keep services, arvados-controller, and PostgreSQL. Typically it runs on the same host as keepproxy.
 
 *A cluster should have only one instance of keep-balance running at a time.*
 
 {% include 'notebox_begin' %}
 
-If you are installing keep-balance on an existing system with valuable data, you can run keep-balance in "dry run" mode first and review its logs as a precaution. To do this, edit your keep-balance startup script to use the flags @-commit-pulls=false -commit-trash=false@.
+If you are installing keep-balance on an existing system with valuable data, you can run keep-balance in "dry run" mode first and review its logs as a precaution. To do this, edit your keep-balance startup script to use the flags @-commit-pulls=false -commit-trash=false -commit-confirmed-fields=false@.
 
 {% include 'notebox_end' %}
 
index 3ce160776ec7bbd4a39530567930ded21c4babbe..66f508b5adf2c3c10f2bca8ce2d72f0e44a0f327 100644 (file)
@@ -459,6 +459,13 @@ Clusters:
       # long-running balancing operation.
       BalanceTimeout: 6h
 
+      # Maximum number of replication_confirmed /
+      # storage_classes_confirmed updates to write to the database
+      # after a rebalancing run. When many updates are needed, this
+      # spreads them over a few runs rather than applying them all at
+      # once.
+      BalanceUpdateLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index da5495352a53e7450ada8dabdbb578f9ec13647c..bbc5ea6c55b885244fc0c33e51a50f36c0f64ca1 100644 (file)
@@ -84,6 +84,7 @@ var whitelist = map[string]bool{
        "Collections.BalanceCollectionBuffers":                false,
        "Collections.BalancePeriod":                           false,
        "Collections.BalanceTimeout":                          false,
+       "Collections.BalanceUpdateLimit":                      false,
        "Collections.BlobDeleteConcurrency":                   false,
        "Collections.BlobMissingReport":                       false,
        "Collections.BlobReplicateConcurrency":                false,
index 4a726d1ec7e5c5ce615fdfa23ffb64db43826917..ee230841354522ad155e25ce794dbca6f549b58f 100644 (file)
@@ -465,6 +465,13 @@ Clusters:
       # long-running balancing operation.
       BalanceTimeout: 6h
 
+      # Maximum number of replication_confirmed /
+      # storage_classes_confirmed updates to write to the database
+      # after a rebalancing run. When many updates are needed, this
+      # spreads them over a few runs rather than applying them all at
+      # once.
+      BalanceUpdateLimit: 100000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 9ca24312582060d42f6ed878f600c780ae9d5872..40db4f9c7c7f80f744ab3c44da874794d925c9c1 100644 (file)
@@ -12,6 +12,7 @@ import (
        "io"
        "net"
        "net/http"
+       _ "net/http/pprof"
        "net/url"
        "os"
        "strings"
@@ -70,6 +71,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        loader := config.NewLoader(stdin, log)
        loader.SetupFlags(flags)
        versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+       pprofAddr := flags.String("pprof", "", "Serve Go profile data at `[addr]:port`")
        err = flags.Parse(args)
        if err == flag.ErrHelp {
                err = nil
@@ -80,6 +82,12 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
        }
 
+       if *pprofAddr != "" {
+               go func() {
+                       log.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
+       }
+
        if strings.HasSuffix(prog, "controller") {
                // Some config-loader checks try to make API calls via
                // controller. Those can't be expected to work if this
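
For context, a minimal sketch (not part of this commit) of reading the new profiling endpoint. It assumes a service was started with -pprof localhost:6060 (an arbitrary example address); the blank net/http/pprof import added above registers its handlers under /debug/pprof/ on http.DefaultServeMux, which is what ListenAndServe(*pprofAddr, nil) serves.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Fetch a plain-text goroutine dump from the (hypothetical)
	// -pprof localhost:6060 listener.
	resp, err := http.Get("http://localhost:6060/debug/pprof/goroutine?debug=1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s", body)
}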
index cec20279d1fdc81cb56936ed731dea6c6a4c0e8c..785c18d4a75090a9847be18122b418b392c45474 100644 (file)
@@ -5,11 +5,10 @@
 package arvados
 
 import (
-       "bufio"
+       "bytes"
        "crypto/md5"
        "fmt"
        "regexp"
-       "strings"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/blockdigest"
@@ -56,40 +55,40 @@ func (c Collection) resourceName() string {
 //
 // Zero-length blocks are not included.
 func (c *Collection) SizedDigests() ([]SizedDigest, error) {
-       manifestText := c.ManifestText
-       if manifestText == "" {
-               manifestText = c.UnsignedManifestText
+       manifestText := []byte(c.ManifestText)
+       if len(manifestText) == 0 {
+               manifestText = []byte(c.UnsignedManifestText)
        }
-       if manifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+       if len(manifestText) == 0 && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
                // TODO: Check more subtle forms of corruption, too
                return nil, fmt.Errorf("manifest is missing")
        }
-       var sds []SizedDigest
-       scanner := bufio.NewScanner(strings.NewReader(manifestText))
-       scanner.Buffer(make([]byte, 1048576), len(manifestText))
-       for scanner.Scan() {
-               line := scanner.Text()
-               tokens := strings.Split(line, " ")
+       sds := make([]SizedDigest, 0, len(manifestText)/40)
+       for _, line := range bytes.Split(manifestText, []byte{'\n'}) {
+               if len(line) == 0 {
+                       continue
+               }
+               tokens := bytes.Split(line, []byte{' '})
                if len(tokens) < 3 {
                        return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
                }
                for _, token := range tokens[1:] {
-                       if !blockdigest.LocatorPattern.MatchString(token) {
+                       if !blockdigest.LocatorPattern.Match(token) {
                                // FIXME: ensure it's a file token
                                break
                        }
-                       if strings.HasPrefix(token, "d41d8cd98f00b204e9800998ecf8427e+0") {
+                       if bytes.HasPrefix(token, []byte("d41d8cd98f00b204e9800998ecf8427e+0")) {
                                // Exclude "empty block" placeholder
                                continue
                        }
                        // FIXME: shouldn't assume 32 char hash
-                       if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+                       if i := bytes.IndexRune(token[33:], '+'); i >= 0 {
                                token = token[:33+i]
                        }
-                       sds = append(sds, SizedDigest(token))
+                       sds = append(sds, SizedDigest(string(token)))
                }
        }
-       return sds, scanner.Err()
+       return sds, nil
 }
 
 type CollectionList struct {
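
For context, a minimal standalone sketch (not part of this commit) of what the rewritten SizedDigests returns for a hypothetical one-file manifest; the manifest text and file name below are illustrative only.

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	// A one-stream, one-file manifest: the block is the 3-byte "foo"
	// (md5 acbd18db4cc2f85cedef654fccc4a4d8).
	coll := arvados.Collection{
		ManifestText: ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n",
	}
	sds, err := coll.SizedDigests()
	if err != nil {
		panic(err)
	}
	for _, sd := range sds {
		fmt.Println(sd) // prints "acbd18db4cc2f85cedef654fccc4a4d8+3"
	}
}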
index 844991f41e03b219c5b1d1c0c537d5125d9b2413..9e7eb521eec079a145c94a840a14b35e502b6f18 100644 (file)
@@ -138,6 +138,7 @@ type Cluster struct {
                BalanceCollectionBatch   int
                BalanceCollectionBuffers int
                BalanceTimeout           Duration
+               BalanceUpdateLimit       int
 
                WebDAVCache WebDAVCacheConfig
 
index 3de4225d568a95324e5f81ecc34e8b2486433f8e..9281f51d0cf0ee2b46ca97c2e59fde9f68051d4d 100644 (file)
@@ -31,7 +31,10 @@ const (
        UserAgreementPDH        = "b519d9cb706a29fc7ea24dbea2f05851+93"
        HelloWorldPdh           = "55713e6a34081eb03609e7ad5fcad129+62"
 
-       MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       MultilevelCollection1                        = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+       StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
+       StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
+       EmptyCollectionUUID                          = "zzzzz-4zz18-gs9ooj1h9sd5mde"
 
        AProjectUUID    = "zzzzz-j7d0g-v955i6s2oi1cbso"
        ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x"
index 86423a2976b1e0909470bd563c486aee894743af..e69d941b1eaf6eadf52b5c48871fea02c0d3f5bb 100644 (file)
@@ -18,11 +18,13 @@ import (
        "sort"
        "strings"
        "sync"
+       "sync/atomic"
        "syscall"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
 )
 
@@ -36,6 +38,7 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+       DB      *sqlx.DB
        Logger  logrus.FieldLogger
        Dumper  logrus.FieldLogger
        Metrics *metrics
@@ -50,7 +53,7 @@ type Balancer struct {
        classes       []string
        mounts        int
        mountsByClass map[string]map[*KeepMount]bool
-       collScanned   int
+       collScanned   int64
        serviceRoots  map[string]string
        errors        []error
        stats         balancerStats
@@ -167,6 +170,15 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
        }
        if runOptions.CommitTrash {
                err = bal.CommitTrash(ctx, client)
+               if err != nil {
+                       return
+               }
+       }
+       if runOptions.CommitConfirmedFields {
+               err = bal.updateCollections(ctx, client, cluster)
+               if err != nil {
+                       return
+               }
        }
        return
 }
@@ -388,39 +400,14 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
                }(mounts)
        }
 
-       // collQ buffers incoming collections so we can start fetching
-       // the next page without waiting for the current page to
-       // finish processing.
        collQ := make(chan arvados.Collection, bufs)
 
-       // Start a goroutine to process collections. (We could use a
-       // worker pool here, but even with a single worker we already
-       // process collections much faster than we can retrieve them.)
-       wg.Add(1)
-       go func() {
-               defer wg.Done()
-               for coll := range collQ {
-                       err := bal.addCollection(coll)
-                       if err != nil || len(errs) > 0 {
-                               select {
-                               case errs <- err:
-                               default:
-                               }
-                               for range collQ {
-                               }
-                               cancel()
-                               return
-                       }
-                       bal.collScanned++
-               }
-       }()
-
-       // Start a goroutine to retrieve all collections from the
-       // Arvados database and send them to collQ for processing.
+       // Retrieve all collections from the database and send them to
+       // collQ.
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(ctx, c, pageSize,
+               err = EachCollection(ctx, bal.DB, c,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
@@ -444,6 +431,27 @@ func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pag
                }
        }()
 
+       // Parse manifests from collQ and pass the block hashes to
+       // BlockStateMap to track desired replication.
+       for i := 0; i < runtime.NumCPU(); i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for coll := range collQ {
+                               err := bal.addCollection(coll)
+                               if err != nil || len(errs) > 0 {
+                                       select {
+                                       case errs <- err:
+                                       default:
+                                       }
+                                       cancel()
+                                       continue
+                               }
+                               atomic.AddInt64(&bal.collScanned, 1)
+                       }
+               }()
+       }
+
        wg.Wait()
        if len(errs) > 0 {
                return <-errs
@@ -460,7 +468,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
        if coll.ReplicationDesired != nil {
                repl = *coll.ReplicationDesired
        }
-       bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+       bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
        // Pass pdh to IncreaseDesired only if LostBlocksFile is being
        // written -- otherwise it's just a waste of memory.
        pdh := ""
index cbdde595e8b4b797c70422ae71a5dc7affb33dcf..18a8bdcf47b111b0c6ea6469d58b60d2a10ea5f5 100644 (file)
@@ -21,6 +21,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/common/expfmt"
        check "gopkg.in/check.v1"
@@ -309,6 +310,7 @@ func (s *stubServer) serveKeepstorePull() *reqTracker {
 type runSuite struct {
        stub   stubServer
        config *arvados.Cluster
+       db     *sqlx.DB
        client *arvados.Client
 }
 
@@ -320,6 +322,7 @@ func (s *runSuite) newServer(options *RunOptions) *Server {
                Metrics:    newMetrics(prometheus.NewRegistry()),
                Logger:     options.Logger,
                Dumper:     options.Dumper,
+               DB:         s.db,
        }
        return srv
 }
@@ -329,6 +332,8 @@ func (s *runSuite) SetUpTest(c *check.C) {
        c.Assert(err, check.Equals, nil)
        s.config, err = cfg.GetCluster("")
        c.Assert(err, check.Equals, nil)
+       s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+       c.Assert(err, check.IsNil)
 
        s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
        arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
@@ -347,6 +352,9 @@ func (s *runSuite) TearDownTest(c *check.C) {
 }
 
 func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+       defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+       _, err := s.db.Exec(`delete from collections`)
+       c.Assert(err, check.IsNil)
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
@@ -360,7 +368,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       _, err := srv.runOnce()
+       _, err = srv.runOnce()
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -385,26 +393,6 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        c.Check(pullReqs.Count(), check.Equals, 0)
 }
 
-func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
-       opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      ctxlog.TestLogger(c),
-       }
-       s.stub.serveCurrentUserAdmin()
-       s.stub.serveCollectionsButSkipOne()
-       s.stub.serveKeepServices(stubServices)
-       s.stub.serveKeepstoreMounts()
-       s.stub.serveKeepstoreIndexFoo4Bar1()
-       trashReqs := s.stub.serveKeepstoreTrash()
-       pullReqs := s.stub.serveKeepstorePull()
-       srv := s.newServer(&opts)
-       _, err := srv.runOnce()
-       c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
-       c.Check(trashReqs.Count(), check.Equals, 4)
-       c.Check(pullReqs.Count(), check.Equals, 0)
-}
-
 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
        c.Assert(err, check.IsNil)
@@ -428,7 +416,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        c.Check(err, check.IsNil)
        lost, err := ioutil.ReadFile(lostf.Name())
        c.Assert(err, check.IsNil)
-       c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
+       c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
 }
 
 func (s *runSuite) TestDryRun(c *check.C) {
@@ -459,11 +447,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
-       lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
-       c.Assert(err, check.IsNil)
-       s.config.Collections.BlobMissingReport = lostf.Name()
-       defer os.Remove(lostf.Name())
-
+       s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                CommitPulls: true,
@@ -489,17 +473,18 @@ func (s *runSuite) TestCommit(c *check.C) {
        // in a poor rendezvous position
        c.Check(bal.stats.pulls, check.Equals, 2)
 
-       lost, err := ioutil.ReadFile(lostf.Name())
+       lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
        c.Assert(err, check.IsNil)
-       c.Check(string(lost), check.Equals, "")
+       c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
 
        buf, err := s.getMetrics(c, srv)
        c.Check(err, check.IsNil)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
-       c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+       bufstr := buf.String()
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+       c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
index 029f8c6c0790337f561d679c8c9d21cf33ff502b..e30b4ff7943d4c4a041ec71924000a48a856c4d4 100644 (file)
@@ -133,3 +133,53 @@ func (bsm *BlockStateMap) IncreaseDesired(pdh string, classes []string, n int, b
                bsm.get(blkid).increaseDesired(pdh, classes, n)
        }
 }
+
+// GetConfirmedReplication returns the replication level of the given
+// blocks, considering only the specified storage classes.
+//
+// If len(classes)==0, returns the replication level without regard to
+// storage classes.
+//
+// Safe to call concurrently with other calls to GetConfirmedReplication, but not
+// with different BlockStateMap methods.
+func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest, classes []string) int {
+       defaultClasses := map[string]bool{"default": true}
+       min := 0
+       for _, blkid := range blkids {
+               total := 0
+               perclass := make(map[string]int, len(classes))
+               for _, c := range classes {
+                       perclass[c] = 0
+               }
+               for _, r := range bsm.get(blkid).Replicas {
+                       total += r.KeepMount.Replication
+                       mntclasses := r.KeepMount.StorageClasses
+                       if len(mntclasses) == 0 {
+                               mntclasses = defaultClasses
+                       }
+                       for c := range mntclasses {
+                               n, ok := perclass[c]
+                               if !ok {
+                                       // Don't care about this storage class
+                                       continue
+                               }
+                               perclass[c] = n + r.KeepMount.Replication
+                       }
+               }
+               if total == 0 {
+                       return 0
+               }
+               for _, n := range perclass {
+                       if n == 0 {
+                               return 0
+                       }
+                       if n < min || min == 0 {
+                               min = n
+                       }
+               }
+               if len(perclass) == 0 && (min == 0 || min > total) {
+                       min = total
+               }
+       }
+       return min
+}
diff --git a/services/keep-balance/block_state_test.go b/services/keep-balance/block_state_test.go
new file mode 100644 (file)
index 0000000..aaf2c18
--- /dev/null
@@ -0,0 +1,94 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&confirmedReplicationSuite{})
+
+type confirmedReplicationSuite struct {
+       blockStateMap *BlockStateMap
+       mtime         int64
+}
+
+func (s *confirmedReplicationSuite) SetUpTest(c *check.C) {
+       t, _ := time.Parse(time.RFC3339Nano, time.RFC3339Nano)
+       s.mtime = t.UnixNano()
+       s.blockStateMap = NewBlockStateMap()
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    1,
+               StorageClasses: map[string]bool{"default": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(10), Mtime: s.mtime},
+       })
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    2,
+               StorageClasses: map[string]bool{"default": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(20), Mtime: s.mtime},
+       })
+}
+
+func (s *confirmedReplicationSuite) TestZeroReplication(c *check.C) {
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(404), knownBlkid(409)}, []string{"default"})
+       c.Check(n, check.Equals, 0)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, []string{"default"})
+       c.Check(n, check.Equals, 0)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, nil)
+       c.Check(n, check.Equals, 0)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksWithDifferentReplication(c *check.C) {
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(20)}, []string{"default"})
+       c.Check(n, check.Equals, 1)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksInDifferentClasses(c *check.C) {
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    3,
+               StorageClasses: map[string]bool{"three": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(30), Mtime: s.mtime},
+       })
+
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(30)}, []string{"three"})
+       c.Check(n, check.Equals, 3)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"default"})
+       c.Check(n, check.Equals, 0) // block 30 has repl 0 @ "default"
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"three"})
+       c.Check(n, check.Equals, 0) // block 20 has repl 0 @ "three"
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, nil)
+       c.Check(n, check.Equals, 2)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) {
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    2,
+               StorageClasses: map[string]bool{"default": true, "four": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(40), Mtime: s.mtime},
+               {SizedDigest: knownBlkid(41), Mtime: s.mtime},
+       })
+       s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+               Replication:    2,
+               StorageClasses: map[string]bool{"four": true},
+       }}, []arvados.KeepServiceIndexEntry{
+               {SizedDigest: knownBlkid(40), Mtime: s.mtime},
+               {SizedDigest: knownBlkid(41), Mtime: s.mtime},
+       })
+       n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default"})
+       c.Check(n, check.Equals, 2)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"four"})
+       c.Check(n, check.Equals, 4)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default", "four"})
+       c.Check(n, check.Equals, 2)
+       n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
+       c.Check(n, check.Equals, 4)
+}
index 1659918cafe20c62162abfb1e841a40f80170c0a..1e1e51abe7ebcd55e5d0ed7ca55eae02a8b56504 100644 (file)
@@ -6,10 +6,15 @@ package main
 
 import (
        "context"
+       "encoding/json"
        "fmt"
+       "runtime"
+       "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
 )
 
 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
@@ -28,10 +33,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 // The progress function is called periodically with done (number of
 // times f has been called) and total (number of times f is expected
 // to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
@@ -43,125 +45,222 @@ func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func
        if err != nil {
                return err
        }
+       var newestModifiedAt time.Time
 
-       // Note the obvious way to get all collections (sorting by
-       // UUID) would be much easier, but would lose data: If a
-       // client were to move files from collection with uuid="zzz"
-       // to a collection with uuid="aaa" around the time when we
-       // were fetching the "mmm" page, we would never see those
-       // files' block IDs at all -- even if the client is careful to
-       // save "aaa" before saving "zzz".
-       //
-       // Instead, we get pages in modified_at order. Collections
-       // that are modified during the run will be re-fetched in a
-       // subsequent page.
-
-       limit := pageSize
-       if limit <= 0 {
-               // Use the maximum page size the server allows
-               limit = 1<<31 - 1
-       }
-       params := arvados.ResourceListParams{
-               Limit:              &limit,
-               Order:              "modified_at, uuid",
-               Count:              "none",
-               Select:             []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
-               IncludeTrash:       true,
-               IncludeOldVersions: true,
+       rows, err := db.QueryxContext(ctx, `SELECT
+               uuid, manifest_text, modified_at, portable_data_hash,
+               replication_desired, replication_confirmed, replication_confirmed_at,
+               storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+               is_trashed
+               FROM collections`)
+       if err != nil {
+               return err
        }
-       var last arvados.Collection
-       var filterTime time.Time
+       defer rows.Close()
+       progressTicker := time.NewTicker(10 * time.Second)
+       defer progressTicker.Stop()
        callCount := 0
-       gettingExactTimestamp := false
-       for {
-               progress(callCount, expectCount)
-               var page arvados.CollectionList
-               err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+       for rows.Next() {
+               var coll arvados.Collection
+               var classesDesired, classesConfirmed []byte
+               err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+                       &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+                       &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+                       &coll.IsTrashed)
                if err != nil {
                        return err
                }
-               for _, coll := range page.Items {
-                       if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
-                               continue
-                       }
-                       callCount++
-                       err = f(coll)
-                       if err != nil {
-                               return err
-                       }
-                       last = coll
+
+               err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+               if err != nil && len(classesDesired) > 0 {
+                       return err
                }
-               if len(page.Items) == 0 && !gettingExactTimestamp {
-                       break
-               } else if last.ModifiedAt.IsZero() {
-                       return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
-               } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
-                       // If we requested time>=X and never got a
-                       // time>X then we might not have received all
-                       // items with time==X yet. Switch to
-                       // gettingExactTimestamp mode (if we're not
-                       // there already), advancing our UUID
-                       // threshold with each request, until we get
-                       // an empty page.
-                       gettingExactTimestamp = true
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: "=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: ">",
-                               Operand:  last.UUID,
-                       }}
-               } else if gettingExactTimestamp {
-                       // This must be an empty page (in this mode,
-                       // an unequal timestamp is impossible) so we
-                       // can start getting pages of newer
-                       // collections.
-                       gettingExactTimestamp = false
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">",
-                               Operand:  filterTime,
-                       }}
-               } else {
-                       // In the normal case, we know we have seen
-                       // all collections with modtime<filterTime,
-                       // but we might not have seen all that have
-                       // modtime=filterTime. Hence we use >= instead
-                       // of > and skip the obvious overlapping item,
-                       // i.e., the last item on the previous
-                       // page. In some edge cases this can return
-                       // collections we have already seen, but
-                       // avoiding that would add overhead in the
-                       // overwhelmingly common cases, so we don't
-                       // bother.
-                       filterTime = last.ModifiedAt
-                       params.Filters = []arvados.Filter{{
-                               Attr:     "modified_at",
-                               Operator: ">=",
-                               Operand:  filterTime,
-                       }, {
-                               Attr:     "uuid",
-                               Operator: "!=",
-                               Operand:  last.UUID,
-                       }}
+               err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+               if err != nil && len(classesConfirmed) > 0 {
+                       return err
+               }
+               if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+                       newestModifiedAt = coll.ModifiedAt
+               }
+               callCount++
+               err = f(coll)
+               if err != nil {
+                       return err
+               }
+               select {
+               case <-progressTicker.C:
+                       progress(callCount, expectCount)
+               default:
                }
        }
        progress(callCount, expectCount)
-
+       err = rows.Close()
+       if err != nil {
+               return err
+       }
        if checkCount, err := countCollections(c, arvados.ResourceListParams{
                Filters: []arvados.Filter{{
                        Attr:     "modified_at",
                        Operator: "<=",
-                       Operand:  filterTime}},
+                       Operand:  newestModifiedAt}},
                IncludeTrash:       true,
                IncludeOldVersions: true,
        }); err != nil {
                return err
        } else if callCount < checkCount {
-               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+               return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
        }
 
        return nil
 }
+
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+
+       defer bal.time("update_collections", "wall clock time to update collections")()
+       threshold := time.Now()
+       thresholdStr := threshold.Format(time.RFC3339Nano)
+
+       updated := int64(0)
+
+       errs := make(chan error, 1)
+       collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+       go func() {
+               defer close(collQ)
+               err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+                       if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+                               bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+                               cancel()
+                               return context.Canceled
+                       }
+                       collQ <- coll
+                       return nil
+               }, func(done, total int) {
+                       bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
+               })
+               if err != nil && err != context.Canceled {
+                       select {
+                       case errs <- err:
+                       default:
+                       }
+               }
+       }()
+
+       var wg sync.WaitGroup
+
+       // Use about 1 goroutine per 2 CPUs. Based on experiments with
+       // a 2-core host, using more concurrent database
+       // calls/transactions makes this process slower, not faster.
+       for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
+               wg.Add(1)
+               goSendErr(errs, func() error {
+                       defer wg.Done()
+                       tx, err := bal.DB.Beginx()
+                       if err != nil {
+                               return err
+                       }
+                       txPending := 0
+                       flush := func(final bool) error {
+                               err := tx.Commit()
+                               if err != nil && ctx.Err() == nil {
+                                       tx.Rollback()
+                                       return err
+                               }
+                               txPending = 0
+                               if final {
+                                       return nil
+                               }
+                               tx, err = bal.DB.Beginx()
+                               return err
+                       }
+                       txBatch := 100
+                       for coll := range collQ {
+                               if ctx.Err() != nil || len(errs) > 0 {
+                                       continue
+                               }
+                               blkids, err := coll.SizedDigests()
+                               if err != nil {
+                                       bal.logf("%s: %s", coll.UUID, err)
+                                       continue
+                               }
+                               repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+                               desired := bal.DefaultReplication
+                               if coll.ReplicationDesired != nil {
+                                       desired = *coll.ReplicationDesired
+                               }
+                               if repl > desired {
+                                       // If actual>desired, confirm
+                                       // the desired number rather
+                                       // than actual to avoid
+                                       // flapping updates when
+                                       // replication increases
+                                       // temporarily.
+                                       repl = desired
+                               }
+                               classes := emptyJSONArray
+                               if repl > 0 {
+                                       classes, err = json.Marshal(coll.StorageClassesDesired)
+                                       if err != nil {
+                                               bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+                                               continue
+                                       }
+                               }
+                               needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+                               for i := range coll.StorageClassesDesired {
+                                       if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+                                               needUpdate = true
+                                       }
+                               }
+                               if !needUpdate {
+                                       continue
+                               }
+                               _, err = tx.ExecContext(ctx, `update collections set
+                                       replication_confirmed=$1,
+                                       replication_confirmed_at=$2,
+                                       storage_classes_confirmed=$3,
+                                       storage_classes_confirmed_at=$2
+                                       where uuid=$4`,
+                                       repl, thresholdStr, classes, coll.UUID)
+                               if err != nil {
+                                       if ctx.Err() == nil {
+                                               bal.logf("%s: update failed: %s", coll.UUID, err)
+                                       }
+                                       continue
+                               }
+                               atomic.AddInt64(&updated, 1)
+                               if txPending++; txPending >= txBatch {
+                                       err = flush(false)
+                                       if err != nil {
+                                               return err
+                                       }
+                               }
+                       }
+                       return flush(true)
+               })
+       }
+       wg.Wait()
+       bal.logf("updated %d collections", updated)
+       if len(errs) > 0 {
+               return fmt.Errorf("error updating collections: %s", <-errs)
+       }
+       return nil
+}
+
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+       go func() {
+               err := f()
+               if err != nil {
+                       select {
+                       case errs <- err:
+                       default:
+                       }
+               }
+       }()
+}
+
+var emptyJSONArray = []byte("[]")
index 3ab9d07b2e2ed6bcc7220ae17aad4e6e7a665855..f749bad6ad1865a30670d0fe2978dfe8ebd2764c 100644 (file)
@@ -6,57 +6,34 @@ package main
 
 import (
        "context"
-       "sync"
-       "time"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
        check "gopkg.in/check.v1"
 )
 
-//  TestIdenticalTimestamps ensures EachCollection returns the same
-//  set of collections for various page sizes -- even page sizes so
-//  small that we get entire pages full of collections with identical
-//  timestamps and exercise our gettingExactTimestamp cases.
-func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
-       // pageSize==0 uses the default (large) page size.
-       pageSizes := []int{0, 2, 3, 4, 5}
-       got := make([][]string, len(pageSizes))
-       var wg sync.WaitGroup
-       for trial, pageSize := range pageSizes {
-               wg.Add(1)
-               go func(trial, pageSize int) {
-                       defer wg.Done()
-                       streak := 0
-                       longestStreak := 0
-                       var lastMod time.Time
-                       sawUUID := make(map[string]bool)
-                       err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
-                               if c.ModifiedAt.IsZero() {
-                                       return nil
-                               }
-                               if sawUUID[c.UUID] {
-                                       // dup
-                                       return nil
-                               }
-                               got[trial] = append(got[trial], c.UUID)
-                               sawUUID[c.UUID] = true
-                               if lastMod == c.ModifiedAt {
-                                       streak++
-                                       if streak > longestStreak {
-                                               longestStreak = streak
-                                       }
-                               } else {
-                                       streak = 0
-                                       lastMod = c.ModifiedAt
-                               }
-                               return nil
-                       }, nil)
-                       c.Check(err, check.IsNil)
-                       c.Check(longestStreak > 25, check.Equals, true)
-               }(trial, pageSize)
-       }
-       wg.Wait()
-       for trial := 1; trial < len(pageSizes); trial++ {
-               c.Check(got[trial], check.DeepEquals, got[0])
-       }
+// TestMissedCollections exercises EachCollection's sanity check:
+// #collections processed >= #old collections that exist in database
+// after processing.
+func (s *integrationSuite) TestMissedCollections(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+       c.Assert(err, check.IsNil)
+
+       defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`)
+       insertedOld := false
+       err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error {
+               if !insertedOld {
+                       insertedOld = true
+                       _, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`)
+                       return err
+               }
+               return nil
+       }, nil)
+       c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`)
 }
index defabd9a109f27b348b61aeccb2ed822d2fa862e..52e6149158253bdff24705dcb8b6fb7a00f8cb02 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "bytes"
+       "io"
        "os"
        "strings"
        "testing"
@@ -17,6 +18,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
+       "github.com/jmoiron/sqlx"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
@@ -26,6 +28,7 @@ var _ = check.Suite(&integrationSuite{})
 
 type integrationSuite struct {
        config     *arvados.Cluster
+       db         *sqlx.DB
        client     *arvados.Client
        keepClient *keepclient.KeepClient
 }
@@ -67,6 +70,8 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
        c.Assert(err, check.Equals, nil)
        s.config, err = cfg.GetCluster("")
        c.Assert(err, check.Equals, nil)
+       s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+       c.Assert(err, check.IsNil)
        s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
 
        s.client = &arvados.Client{
@@ -81,14 +86,16 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
        for iter := 0; iter < 20; iter++ {
                logBuf.Reset()
                logger := logrus.New()
-               logger.Out = &logBuf
+               logger.Out = io.MultiWriter(&logBuf, os.Stderr)
                opts := RunOptions{
-                       CommitPulls: true,
-                       CommitTrash: true,
-                       Logger:      logger,
+                       CommitPulls:           true,
+                       CommitTrash:           true,
+                       CommitConfirmedFields: true,
+                       Logger:                logger,
                }
 
                bal := &Balancer{
+                       DB:      s.db,
                        Logger:  logger,
                        Metrics: newMetrics(prometheus.NewRegistry()),
                }
@@ -105,4 +112,23 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                time.Sleep(200 * time.Millisecond)
        }
        c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
+
+       for _, trial := range []struct {
+               uuid    string
+               repl    int
+               classes []string
+       }{
+               {arvadostest.EmptyCollectionUUID, 0, []string{}},
+               {arvadostest.FooCollection, 2, []string{"default"}},                                // "foo" blk
+               {arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk
+               {arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{}},          // "bar" blk
+       } {
+               c.Logf("%#v", trial)
+               var coll arvados.Collection
+               s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil)
+               if c.Check(coll.ReplicationConfirmed, check.NotNil) {
+                       c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl)
+               }
+               c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes)
+       }
 }
index 8b4ee84c716e4596987b1371b38035610f9ffa2f..e1573e7f733935028d164d6d5dd69d383fdf338f 100644 (file)
@@ -9,6 +9,8 @@ import (
        "flag"
        "fmt"
        "io"
+       "net/http"
+       _ "net/http/pprof"
        "os"
 
        "git.arvados.org/arvados.git/lib/config"
@@ -16,6 +18,8 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/jmoiron/sqlx"
+       _ "github.com/lib/pq"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -35,8 +39,17 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
        flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+       flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+               "update collection fields (replication_confirmed, storage_classes_confirmed, etc.)")
        flags.Bool("version", false, "Write version information to stdout and exit 0")
        dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+       pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+       if *pprofAddr != "" {
+               go func() {
+                       logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+               }()
+       }
 
        loader := config.NewLoader(os.Stdin, logger)
        loader.SetupFlags(flags)
@@ -55,10 +68,11 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
        // service.Command
        args = nil
        dropFlag := map[string]bool{
-               "once":         true,
-               "commit-pulls": true,
-               "commit-trash": true,
-               "dump":         true,
+               "once":                    true,
+               "commit-pulls":            true,
+               "commit-trash":            true,
+               "commit-confirmed-fields": true,
+               "dump":                    true,
        }
        flags.Visit(func(f *flag.Flag) {
                if !dropFlag[f.Name] {
@@ -78,6 +92,18 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
                        }
 
+                       db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+                       }
+                       if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+                               db.SetMaxOpenConns(p)
+                       }
+                       err = db.Ping()
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+                       }
+
                        if options.Logger == nil {
                                options.Logger = ctxlog.FromContext(ctx)
                        }
@@ -89,6 +115,7 @@ func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.W
                                Metrics:    newMetrics(registry),
                                Logger:     options.Logger,
                                Dumper:     options.Dumper,
+                               DB:         db,
                        }
                        srv.Handler = &health.Handler{
                                Token:  cluster.ManagementToken,
index b154f6e99848a3623b167726412ce5b48a59c715..65a2d5567a86505e7d6e4866aa8ffa75e3bf2deb 100644 (file)
@@ -11,6 +11,11 @@ import (
        "net/http"
        "time"
 
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -26,6 +31,8 @@ func (s *mainSuite) TestVersionFlag(c *check.C) {
 }
 
 func (s *mainSuite) TestHTTPServer(c *check.C) {
+       arvadostest.StartKeep(2, true)
+
        ln, err := net.Listen("tcp", ":0")
        if err != nil {
                c.Fatal(err)
@@ -33,10 +40,17 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
        _, p, err := net.SplitHostPort(ln.Addr().String())
        c.Check(err, check.IsNil)
        ln.Close()
-       config := "Clusters:\n zzzzz:\n  ManagementToken: abcdefg\n  Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n"
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{}
+       cfg.Clusters[cluster.ClusterID] = *cluster
+       config, err := yaml.Marshal(cfg)
+       c.Assert(err, check.IsNil)
 
        var stdout bytes.Buffer
-       go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout)
+       go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout)
        done := make(chan struct{})
        go func() {
                defer close(done)
@@ -47,7 +61,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
                                c.Fatal(err)
                                return
                        }
-                       req.Header.Set("Authorization", "Bearer abcdefg")
+                       req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
                        resp, err := http.DefaultClient.Do(req)
                        if err != nil {
                                c.Logf("error %s", err)
@@ -73,6 +87,7 @@ func (s *mainSuite) TestHTTPServer(c *check.C) {
                c.Log(stdout.String())
                c.Fatal("timeout")
        }
+       c.Log(stdout.String())
 
        // Check non-metrics URL that gets passed through to us from
        // service.Command
index 9801a3fd45d5d13ec40bf661c59b4de5156cfeed..5299b96c1caf2ac3aaa28c639e71d501ddbbd637 100644 (file)
@@ -12,6 +12,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
 )
 
@@ -23,11 +24,12 @@ import (
 //
 // RunOptions fields are controlled by command line flags.
 type RunOptions struct {
-       Once        bool
-       CommitPulls bool
-       CommitTrash bool
-       Logger      logrus.FieldLogger
-       Dumper      logrus.FieldLogger
+       Once                  bool
+       CommitPulls           bool
+       CommitTrash           bool
+       CommitConfirmedFields bool
+       Logger                logrus.FieldLogger
+       Dumper                logrus.FieldLogger
 
        // SafeRendezvousState from the most recent balance operation,
        // or "" if unknown. If this changes from one run to the next,
@@ -46,11 +48,13 @@ type Server struct {
 
        Logger logrus.FieldLogger
        Dumper logrus.FieldLogger
+
+       DB *sqlx.DB
 }
 
 // CheckHealth implements service.Handler.
 func (srv *Server) CheckHealth() error {
-       return nil
+       return srv.DB.Ping()
 }
 
 // Done implements service.Handler.
@@ -75,6 +79,7 @@ func (srv *Server) run() {
 
 func (srv *Server) runOnce() (*Balancer, error) {
        bal := &Balancer{
+               DB:             srv.DB,
                Logger:         srv.Logger,
                Dumper:         srv.Dumper,
                Metrics:        srv.Metrics,