# long-running balancing operation.
BalanceTimeout: 6h
+ # Maximum number of replication_confirmed /
+ # storage_classes_confirmed updates to write to the database
+ # after a rebalancing run. When many updates are needed, this
+ # spreads them over a few runs rather than applying them all at
+ # once.
+ BalanceUpdateLimit: 100000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
"Collections.BalanceCollectionBuffers": false,
"Collections.BalancePeriod": false,
"Collections.BalanceTimeout": false,
+ "Collections.BalanceUpdateLimit": false,
"Collections.BlobDeleteConcurrency": false,
"Collections.BlobMissingReport": false,
"Collections.BlobReplicateConcurrency": false,
# long-running balancing operation.
BalanceTimeout: 6h
+ # Maximum number of replication_confirmed /
+ # storage_classes_confirmed updates to write to the database
+ # after a rebalancing run. When many updates are needed, this
+ # spreads them over a few runs rather than applying them all at
+ # once.
+ BalanceUpdateLimit: 100000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
BalanceCollectionBatch int
BalanceCollectionBuffers int
BalanceTimeout Duration
+ BalanceUpdateLimit int
WebDAVCache WebDAVCacheConfig
}
func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
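+ // Wipe the collections table so runOnce sees zero collections;
+ // the deferred database/reset restores the standard test
+ // fixtures when this test finishes.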
+ defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+ _, err := s.db.Exec(`delete from collections`)
+ c.Assert(err, check.IsNil)
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err = srv.runOnce()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
}
-func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
- opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: ctxlog.TestLogger(c),
- }
- s.stub.serveCurrentUserAdmin()
- s.stub.serveCollectionsButSkipOne()
- s.stub.serveKeepServices(stubServices)
- s.stub.serveKeepstoreMounts()
- s.stub.serveKeepstoreIndexFoo4Bar1()
- trashReqs := s.stub.serveKeepstoreTrash()
- pullReqs := s.stub.serveKeepstorePull()
- srv := s.newServer(&opts)
- _, err := srv.runOnce()
- c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
- c.Check(trashReqs.Count(), check.Equals, 4)
- c.Check(pullReqs.Count(), check.Equals, 0)
-}
-
func (s *runSuite) TestWriteLostBlocks(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
- c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
+ c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
}
func (s *runSuite) TestDryRun(c *check.C) {
}
func (s *runSuite) TestCommit(c *check.C) {
- lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
- c.Assert(err, check.IsNil)
- s.config.Collections.BlobMissingReport = lostf.Name()
- defer os.Remove(lostf.Name())
-
+ s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
// in a poor rendezvous position
c.Check(bal.stats.pulls, check.Equals, 2)
- lost, err := ioutil.ReadFile(lostf.Name())
+ lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
c.Assert(err, check.IsNil)
- c.Check(string(lost), check.Equals, "")
+ c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
buf, err := s.getMetrics(c, srv)
c.Check(err, check.IsNil)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+ bufstr := buf.String()
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
}
func (s *runSuite) TestRunForever(c *check.C) {
"context"
"encoding/json"
"fmt"
- "io"
"runtime"
"sync"
"sync/atomic"
if err != nil {
return err
}
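+ // Rate-limit progress reporting: log at most once every 10
+ // seconds instead of once per collection row.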
+ progressTicker := time.NewTicker(10 * time.Second)
+ defer progressTicker.Stop()
callCount := 0
for rows.Next() {
var coll arvados.Collection
if err != nil {
return err
}
- progress(callCount, expectCount)
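+ // Non-blocking check: only call progress() if the ticker has
+ // fired since the last report.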
+ select {
+ case <-progressTicker.C:
+ progress(callCount, expectCount)
+ default:
+ }
}
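+ // Report the final tally once the loop is done.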
+ progress(callCount, expectCount)
rows.Close()
if err := rows.Err(); err != nil {
return err
threshold := time.Now()
thresholdStr := threshold.Format(time.RFC3339Nano)
+ updated := int64(0)
+
var err error
collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
go func() {
defer close(collQ)
- err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
- if coll.ModifiedAt.After(threshold) {
- return io.EOF
- }
- if coll.IsTrashed {
- return nil
+ err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
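+ // Once the workers have written BalanceUpdateLimit updates,
+ // stop producing work and cancel the context; remaining
+ // collections will be updated on a future rebalancing run.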
+ if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+ bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+ cancel()
+ return context.Canceled
}
collQ <- coll
return nil
}, func(done, total int) {
bal.logf("update collections: %d/%d", done, total)
})
- if err == io.EOF {
- err = nil
- } else if err != nil {
+ if err != nil && err != context.Canceled {
bal.logf("error updating collections: %s", err)
}
}()
- var updated int64
var wg sync.WaitGroup
for i := 0; i < runtime.NumCPU(); i++ {
wg.Add(1)
go func() {
defer wg.Done()
for coll := range collQ {
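+ // After cancellation, keep draining collQ without doing any
+ // more work so the producer goroutine can finish.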
+ if ctx.Err() != nil {
+ continue
+ }
blkids, err := coll.SizedDigests()
if err != nil {
bal.logf("%s: %s", coll.UUID, err)
where uuid=$4`,
repl, thresholdStr, classes, coll.UUID)
if err != nil {
- bal.logf("%s: update failed: %s", coll.UUID, err)
+ if err != context.Canceled {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ }
continue
}
atomic.AddInt64(&updated, 1)
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
"github.com/jmoiron/sqlx"
+ _ "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
"net/http"
"time"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
}
func (s *mainSuite) TestHTTPServer(c *check.C) {
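+ // Ensure the test cluster's keepstore services are up: the
+ // server started below runs against the real test-cluster
+ // config and will try to contact them.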
+ arvadostest.StartKeep(2, true)
+
ln, err := net.Listen("tcp", ":0")
if err != nil {
c.Fatal(err)
_, p, err := net.SplitHostPort(ln.Addr().String())
c.Check(err, check.IsNil)
ln.Close()
- config := "Clusters:\n zzzzz:\n ManagementToken: abcdefg\n Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n"
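+ // Load the full test-cluster config instead of a hand-written
+ // snippet, overriding only the Keepbalance listen address
+ // chosen above.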
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{}
+ cfg.Clusters[cluster.ClusterID] = *cluster
+ config, err := yaml.Marshal(cfg)
+ c.Assert(err, check.IsNil)
var stdout bytes.Buffer
- go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout)
+ go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout)
done := make(chan struct{})
go func() {
defer close(done)
c.Fatal(err)
return
}
- req.Header.Set("Authorization", "Bearer abcdefg")
+ req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
c.Logf("error %s", err)
c.Log(stdout.String())
c.Fatal("timeout")
}
+ c.Log(stdout.String())
// Check non-metrics URL that gets passed through to us from
// service.Command