"Upgrading from 2.2.0":#v2_2_0
+h3. keep-balance requires access to PostgreSQL
+
+Make sure the keep-balance process can connect to your PostgreSQL server using the settings in your config file. (In previous versions, keep-balance accessed the database through controller instead of connecting to the database server directly.)
+
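keep-balance uses the same @PostgreSQL@ section of the cluster config that the other server components use. A minimal sketch (the host, user, password, and dbname values here are placeholders):

<pre><code class="yaml">Clusters:
  zzzzz:
    PostgreSQL:
      Connection:
        host: localhost
        port: "5432"
        user: arvados
        password: xyzzy
        dbname: arvados_production
</code></pre>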
h3. crunch-dispatch-local now requires config.yml
The @crunch-dispatch-local@ dispatcher now reads the API host and token from the system-wide @/etc/arvados/config.yml@. It will fail to start if that file is not found or not readable.
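The dispatcher takes the API host from the controller's URL and the token from @SystemRootToken@. A minimal sketch of the relevant settings (cluster ID, hostname, and token are placeholders):

<pre><code class="yaml">Clusters:
  zzzzz:
    SystemRootToken: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
    Services:
      Controller:
        ExternalURL: "https://zzzzz.example.com"
</code></pre>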
Keep-balance deletes unreferenced and overreplicated blocks from Keep servers, makes additional copies of underreplicated blocks, and moves blocks into optimal locations as needed (e.g., after adding new servers). See "Balancing Keep servers":{{site.baseurl}}/admin/keep-balance.html for usage details.
-Keep-balance can be installed anywhere with network access to Keep services. Typically it runs on the same host as keepproxy.
+Keep-balance can be installed anywhere with network access to Keep services, arvados-controller, and PostgreSQL. Typically it runs on the same host as keepproxy.
*A cluster should have only one instance of keep-balance running at a time.*
{% include 'notebox_begin' %}
-If you are installing keep-balance on an existing system with valuable data, you can run keep-balance in "dry run" mode first and review its logs as a precaution. To do this, edit your keep-balance startup script to use the flags @-commit-pulls=false -commit-trash=false@.
+If you are installing keep-balance on an existing system with valuable data, you can run keep-balance in "dry run" mode first and review its logs as a precaution. To do this, edit your keep-balance startup script to use the flags @-commit-pulls=false -commit-trash=false -commit-confirmed-fields=false@.
{% include 'notebox_end' %}
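For example, a one-off dry run from the command line might look like this (a sketch; how you pass flags depends on how the service is managed on your system):

<pre><code>keep-balance -once -commit-pulls=false -commit-trash=false -commit-confirmed-fields=false
</code></pre>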
# long-running balancing operation.
BalanceTimeout: 6h
+ # Maximum number of replication_confirmed /
+ # storage_classes_confirmed updates to write to the database
+ # after a rebalancing run. When many updates are needed, this
+ # spreads them over a few runs rather than applying them all at
+ # once.
+ BalanceUpdateLimit: 100000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
"Collections.BalanceCollectionBuffers": false,
"Collections.BalancePeriod": false,
"Collections.BalanceTimeout": false,
+ "Collections.BalanceUpdateLimit": false,
"Collections.BlobDeleteConcurrency": false,
"Collections.BlobMissingReport": false,
"Collections.BlobReplicateConcurrency": false,
# long-running balancing operation.
BalanceTimeout: 6h
+ # Maximum number of replication_confirmed /
+ # storage_classes_confirmed updates to write to the database
+ # after a rebalancing run. When many updates are needed, this
+ # spreads them over a few runs rather than applying them all at
+ # once.
+ BalanceUpdateLimit: 100000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
"io"
"net"
"net/http"
+ _ "net/http/pprof"
"net/url"
"os"
"strings"
loader := config.NewLoader(stdin, log)
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
+ pprofAddr := flags.String("pprof", "", "Serve Go profile data at `[addr]:port`")
err = flags.Parse(args)
if err == flag.ErrHelp {
	err = nil
	return 0
} else if err != nil {
	return 2
} else if *versionFlag {
	return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
}
+ if *pprofAddr != "" {
+ go func() {
+ log.Println(http.ListenAndServe(*pprofAddr, nil))
+ }()
+ }
+
if strings.HasSuffix(prog, "controller") {
// Some config-loader checks try to make API calls via
// controller. Those can't be expected to work if this
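The new @-pprof@ flag enables Go's standard @net/http/pprof@ handlers on the given address. Once the service is running (assuming it was started with @-pprof localhost:6060@), the usual profiling workflow applies:

<pre><code>go tool pprof http://localhost:6060/debug/pprof/heap
go tool pprof http://localhost:6060/debug/pprof/profile?seconds=30
</code></pre>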
package arvados
import (
- "bufio"
+ "bytes"
"crypto/md5"
"fmt"
"regexp"
- "strings"
"time"
"git.arvados.org/arvados.git/sdk/go/blockdigest"
//
// Zero-length blocks are not included.
func (c *Collection) SizedDigests() ([]SizedDigest, error) {
- manifestText := c.ManifestText
- if manifestText == "" {
- manifestText = c.UnsignedManifestText
+ manifestText := []byte(c.ManifestText)
+ if len(manifestText) == 0 {
+ manifestText = []byte(c.UnsignedManifestText)
}
- if manifestText == "" && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
+ if len(manifestText) == 0 && c.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
// TODO: Check more subtle forms of corruption, too
return nil, fmt.Errorf("manifest is missing")
}
- var sds []SizedDigest
- scanner := bufio.NewScanner(strings.NewReader(manifestText))
- scanner.Buffer(make([]byte, 1048576), len(manifestText))
- for scanner.Scan() {
- line := scanner.Text()
- tokens := strings.Split(line, " ")
+ sds := make([]SizedDigest, 0, len(manifestText)/40)
+ for _, line := range bytes.Split(manifestText, []byte{'\n'}) {
+ if len(line) == 0 {
+ continue
+ }
+ tokens := bytes.Split(line, []byte{' '})
if len(tokens) < 3 {
return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
}
for _, token := range tokens[1:] {
- if !blockdigest.LocatorPattern.MatchString(token) {
+ if !blockdigest.LocatorPattern.Match(token) {
// FIXME: ensure it's a file token
break
}
- if strings.HasPrefix(token, "d41d8cd98f00b204e9800998ecf8427e+0") {
+ if bytes.HasPrefix(token, []byte("d41d8cd98f00b204e9800998ecf8427e+0")) {
// Exclude "empty block" placeholder
continue
}
// FIXME: shouldn't assume 32 char hash
- if i := strings.IndexRune(token[33:], '+'); i >= 0 {
+ if i := bytes.IndexRune(token[33:], '+'); i >= 0 {
token = token[:33+i]
}
- sds = append(sds, SizedDigest(token))
+ sds = append(sds, SizedDigest(string(token)))
}
}
- return sds, scanner.Err()
+ return sds, nil
}
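A usage sketch for the rewritten @SizedDigests@ (the single-file manifest below is hypothetical; its block digest is the MD5 of "foo"):

<pre><code class="go">package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	coll := arvados.Collection{
		ManifestText: ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n",
	}
	// Each locator is trimmed to hash+size; trailing hints are dropped.
	sds, err := coll.SizedDigests()
	if err != nil {
		panic(err)
	}
	fmt.Println(sds) // [acbd18db4cc2f85cedef654fccc4a4d8+3]
}
</code></pre>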
type CollectionList struct {
BalanceCollectionBatch int
BalanceCollectionBuffers int
BalanceTimeout Duration
+ BalanceUpdateLimit int
WebDAVCache WebDAVCacheConfig
UserAgreementPDH = "b519d9cb706a29fc7ea24dbea2f05851+93"
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
- MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+ MultilevelCollection1 = "zzzzz-4zz18-pyw8yp9g3pr7irn"
+ StorageClassesDesiredDefaultConfirmedDefault = "zzzzz-4zz18-3t236wr12769tga"
+ StorageClassesDesiredArchiveConfirmedDefault = "zzzzz-4zz18-3t236wr12769qqa"
+ EmptyCollectionUUID = "zzzzz-4zz18-gs9ooj1h9sd5mde"
AProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
ASubprojectUUID = "zzzzz-j7d0g-axqo7eu9pwvna1x"
"sort"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
// BlobSignatureTTL; and all N existing replicas of a given data block
// are in the N best positions in rendezvous probe order.
type Balancer struct {
+ DB *sqlx.DB
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
Metrics *metrics
classes []string
mounts int
mountsByClass map[string]map[*KeepMount]bool
- collScanned int
+ collScanned int64
serviceRoots map[string]string
errors []error
stats balancerStats
}
if runOptions.CommitTrash {
err = bal.CommitTrash(ctx, client)
+ if err != nil {
+ return
+ }
+ }
+ if runOptions.CommitConfirmedFields {
+ err = bal.updateCollections(ctx, client, cluster)
+ if err != nil {
+ return
+ }
}
return
}
}(mounts)
}
- // collQ buffers incoming collections so we can start fetching
- // the next page without waiting for the current page to
- // finish processing.
collQ := make(chan arvados.Collection, bufs)
- // Start a goroutine to process collections. (We could use a
- // worker pool here, but even with a single worker we already
- // process collections much faster than we can retrieve them.)
- wg.Add(1)
- go func() {
- defer wg.Done()
- for coll := range collQ {
- err := bal.addCollection(coll)
- if err != nil || len(errs) > 0 {
- select {
- case errs <- err:
- default:
- }
- for range collQ {
- }
- cancel()
- return
- }
- bal.collScanned++
- }
- }()
-
- // Start a goroutine to retrieve all collections from the
- // Arvados database and send them to collQ for processing.
+ // Retrieve all collections from the database and send them to
+ // collQ.
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(ctx, c, pageSize,
+ err = EachCollection(ctx, bal.DB, c,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
}
}()
+ // Parse manifests from collQ and pass the block hashes to
+ // BlockStateMap to track desired replication.
+ for i := 0; i < runtime.NumCPU(); i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for coll := range collQ {
+ err := bal.addCollection(coll)
+ if err != nil || len(errs) > 0 {
+ select {
+ case errs <- err:
+ default:
+ }
+ cancel()
+ continue
+ }
+ atomic.AddInt64(&bal.collScanned, 1)
+ }
+ }()
+ }
+
wg.Wait()
if len(errs) > 0 {
return <-errs
if coll.ReplicationDesired != nil {
repl = *coll.ReplicationDesired
}
- bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
// Pass pdh to IncreaseDesired only if LostBlocksFile is being
// written -- otherwise it's just a waste of memory.
pdh := ""
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
check "gopkg.in/check.v1"
type runSuite struct {
stub stubServer
config *arvados.Cluster
+ db *sqlx.DB
client *arvados.Client
}
Metrics: newMetrics(prometheus.NewRegistry()),
Logger: options.Logger,
Dumper: options.Dumper,
+ DB: s.db,
}
return srv
}
c.Assert(err, check.Equals, nil)
s.config, err = cfg.GetCluster("")
c.Assert(err, check.Equals, nil)
+ s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+ c.Assert(err, check.IsNil)
s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
}
func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
+ defer arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+ _, err := s.db.Exec(`delete from collections`)
+ c.Assert(err, check.IsNil)
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err = srv.runOnce()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
}
-func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
- opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: ctxlog.TestLogger(c),
- }
- s.stub.serveCurrentUserAdmin()
- s.stub.serveCollectionsButSkipOne()
- s.stub.serveKeepServices(stubServices)
- s.stub.serveKeepstoreMounts()
- s.stub.serveKeepstoreIndexFoo4Bar1()
- trashReqs := s.stub.serveKeepstoreTrash()
- pullReqs := s.stub.serveKeepstorePull()
- srv := s.newServer(&opts)
- _, err := srv.runOnce()
- c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
- c.Check(trashReqs.Count(), check.Equals, 4)
- c.Check(pullReqs.Count(), check.Equals, 0)
-}
-
func (s *runSuite) TestWriteLostBlocks(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
- c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2 fa7aeb5140e2848d39b416daeef4ffc5+45\n")
+ c.Check(string(lost), check.Matches, `(?ms).*37b51d194a7513e45b56f6524f2d51f2.* fa7aeb5140e2848d39b416daeef4ffc5\+45.*`)
}
func (s *runSuite) TestDryRun(c *check.C) {
}
func (s *runSuite) TestCommit(c *check.C) {
- lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
- c.Assert(err, check.IsNil)
- s.config.Collections.BlobMissingReport = lostf.Name()
- defer os.Remove(lostf.Name())
-
+ s.config.Collections.BlobMissingReport = c.MkDir() + "/keep-balance-lost-blocks-test-"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
// in a poor rendezvous position
c.Check(bal.stats.pulls, check.Equals, 2)
- lost, err := ioutil.ReadFile(lostf.Name())
+ lost, err := ioutil.ReadFile(s.config.Collections.BlobMissingReport)
c.Assert(err, check.IsNil)
- c.Check(string(lost), check.Equals, "")
+ c.Check(string(lost), check.Not(check.Matches), `(?ms).*acbd18db4cc2f85cedef654fccc4a4d8.*`)
buf, err := s.getMetrics(c, srv)
c.Check(err, check.IsNil)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
- c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+ bufstr := buf.String()
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio [1-9].*`)
+ c.Check(bufstr, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio [1-9].*`)
}
func (s *runSuite) TestRunForever(c *check.C) {
bsm.get(blkid).increaseDesired(pdh, classes, n)
}
}
+
+// GetConfirmedReplication returns the replication level of the given
+// blocks, considering only the specified storage classes.
+//
+// If len(classes)==0, returns the replication level without regard to
+// storage classes.
+//
+// Safe to call concurrently with other calls to GetCurrent, but not
+// with different BlockStateMap methods.
+func (bsm *BlockStateMap) GetConfirmedReplication(blkids []arvados.SizedDigest, classes []string) int {
+ defaultClasses := map[string]bool{"default": true}
+ min := 0
+ for _, blkid := range blkids {
+ total := 0
+ perclass := make(map[string]int, len(classes))
+ for _, c := range classes {
+ perclass[c] = 0
+ }
+ for _, r := range bsm.get(blkid).Replicas {
+ total += r.KeepMount.Replication
+ mntclasses := r.KeepMount.StorageClasses
+ if len(mntclasses) == 0 {
+ mntclasses = defaultClasses
+ }
+ for c := range mntclasses {
+ n, ok := perclass[c]
+ if !ok {
+ // Don't care about this storage class
+ continue
+ }
+ perclass[c] = n + r.KeepMount.Replication
+ }
+ }
+ if total == 0 {
+ return 0
+ }
+ for _, n := range perclass {
+ if n == 0 {
+ return 0
+ }
+ if n < min || min == 0 {
+ min = n
+ }
+ }
+ if len(perclass) == 0 && (min == 0 || min > total) {
+ min = total
+ }
+ }
+ return min
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&confirmedReplicationSuite{})
+
+type confirmedReplicationSuite struct {
+ blockStateMap *BlockStateMap
+ mtime int64
+}
+
+func (s *confirmedReplicationSuite) SetUpTest(c *check.C) {
+ s.mtime = time.Now().UnixNano()
+ s.blockStateMap = NewBlockStateMap()
+ s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+ Replication: 1,
+ StorageClasses: map[string]bool{"default": true},
+ }}, []arvados.KeepServiceIndexEntry{
+ {SizedDigest: knownBlkid(10), Mtime: s.mtime},
+ })
+ s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+ Replication: 2,
+ StorageClasses: map[string]bool{"default": true},
+ }}, []arvados.KeepServiceIndexEntry{
+ {SizedDigest: knownBlkid(20), Mtime: s.mtime},
+ })
+}
+
+func (s *confirmedReplicationSuite) TestZeroReplication(c *check.C) {
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(404), knownBlkid(409)}, []string{"default"})
+ c.Check(n, check.Equals, 0)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, []string{"default"})
+ c.Check(n, check.Equals, 0)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(404)}, nil)
+ c.Check(n, check.Equals, 0)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksWithDifferentReplication(c *check.C) {
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10), knownBlkid(20)}, []string{"default"})
+ c.Check(n, check.Equals, 1)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksInDifferentClasses(c *check.C) {
+ s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+ Replication: 3,
+ StorageClasses: map[string]bool{"three": true},
+ }}, []arvados.KeepServiceIndexEntry{
+ {SizedDigest: knownBlkid(30), Mtime: s.mtime},
+ })
+
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(30)}, []string{"three"})
+ c.Check(n, check.Equals, 3)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"default"})
+ c.Check(n, check.Equals, 0) // block 30 has repl 0 @ "default"
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, []string{"three"})
+ c.Check(n, check.Equals, 0) // block 20 has repl 0 @ "three"
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20), knownBlkid(30)}, nil)
+ c.Check(n, check.Equals, 2)
+}
+
+func (s *confirmedReplicationSuite) TestBlocksOnMultipleMounts(c *check.C) {
+ s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+ Replication: 2,
+ StorageClasses: map[string]bool{"default": true, "four": true},
+ }}, []arvados.KeepServiceIndexEntry{
+ {SizedDigest: knownBlkid(40), Mtime: s.mtime},
+ {SizedDigest: knownBlkid(41), Mtime: s.mtime},
+ })
+ s.blockStateMap.AddReplicas(&KeepMount{KeepMount: arvados.KeepMount{
+ Replication: 2,
+ StorageClasses: map[string]bool{"four": true},
+ }}, []arvados.KeepServiceIndexEntry{
+ {SizedDigest: knownBlkid(40), Mtime: s.mtime},
+ {SizedDigest: knownBlkid(41), Mtime: s.mtime},
+ })
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default"})
+ c.Check(n, check.Equals, 2)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"four"})
+ c.Check(n, check.Equals, 4)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, []string{"default", "four"})
+ c.Check(n, check.Equals, 2)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
+ c.Check(n, check.Equals, 4)
+}
import (
"context"
+ "encoding/json"
"fmt"
+ "runtime"
+ "sync"
+ "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
)
func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
// The progress function is called periodically with done (number of
// times f has been called) and total (number of times f is expected
// to be called).
-//
-// If pageSize > 0 it is used as the maximum page size in each API
-// call; otherwise the maximum allowed page size is requested.
-func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
if progress == nil {
progress = func(_, _ int) {}
}
if err != nil {
return err
}
+ var newestModifiedAt time.Time
- // Note the obvious way to get all collections (sorting by
- // UUID) would be much easier, but would lose data: If a
- // client were to move files from collection with uuid="zzz"
- // to a collection with uuid="aaa" around the time when we
- // were fetching the "mmm" page, we would never see those
- // files' block IDs at all -- even if the client is careful to
- // save "aaa" before saving "zzz".
- //
- // Instead, we get pages in modified_at order. Collections
- // that are modified during the run will be re-fetched in a
- // subsequent page.
-
- limit := pageSize
- if limit <= 0 {
- // Use the maximum page size the server allows
- limit = 1<<31 - 1
- }
- params := arvados.ResourceListParams{
- Limit: &limit,
- Order: "modified_at, uuid",
- Count: "none",
- Select: []string{"uuid", "unsigned_manifest_text", "modified_at", "portable_data_hash", "replication_desired"},
- IncludeTrash: true,
- IncludeOldVersions: true,
+ rows, err := db.QueryxContext(ctx, `SELECT
+ uuid, manifest_text, modified_at, portable_data_hash,
+ replication_desired, replication_confirmed, replication_confirmed_at,
+ storage_classes_desired, storage_classes_confirmed, storage_classes_confirmed_at,
+ is_trashed
+ FROM collections`)
+ if err != nil {
+ return err
}
- var last arvados.Collection
- var filterTime time.Time
+ defer rows.Close()
+ progressTicker := time.NewTicker(10 * time.Second)
+ defer progressTicker.Stop()
callCount := 0
- gettingExactTimestamp := false
- for {
- progress(callCount, expectCount)
- var page arvados.CollectionList
- err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
+ for rows.Next() {
+ var coll arvados.Collection
+ var classesDesired, classesConfirmed []byte
+ err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash,
+ &coll.ReplicationDesired, &coll.ReplicationConfirmed, &coll.ReplicationConfirmedAt,
+ &classesDesired, &classesConfirmed, &coll.StorageClassesConfirmedAt,
+ &coll.IsTrashed)
if err != nil {
return err
}
- for _, coll := range page.Items {
- if last.ModifiedAt == coll.ModifiedAt && last.UUID >= coll.UUID {
- continue
- }
- callCount++
- err = f(coll)
- if err != nil {
- return err
- }
- last = coll
+
+ err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
+ if err != nil && len(classesDesired) > 0 {
+ return err
}
- if len(page.Items) == 0 && !gettingExactTimestamp {
- break
- } else if last.ModifiedAt.IsZero() {
- return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
- } else if len(page.Items) > 0 && last.ModifiedAt == filterTime {
- // If we requested time>=X and never got a
- // time>X then we might not have received all
- // items with time==X yet. Switch to
- // gettingExactTimestamp mode (if we're not
- // there already), advancing our UUID
- // threshold with each request, until we get
- // an empty page.
- gettingExactTimestamp = true
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: "=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: ">",
- Operand: last.UUID,
- }}
- } else if gettingExactTimestamp {
- // This must be an empty page (in this mode,
- // an unequal timestamp is impossible) so we
- // can start getting pages of newer
- // collections.
- gettingExactTimestamp = false
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">",
- Operand: filterTime,
- }}
- } else {
- // In the normal case, we know we have seen
- // all collections with modtime<filterTime,
- // but we might not have seen all that have
- // modtime=filterTime. Hence we use >= instead
- // of > and skip the obvious overlapping item,
- // i.e., the last item on the previous
- // page. In some edge cases this can return
- // collections we have already seen, but
- // avoiding that would add overhead in the
- // overwhelmingly common cases, so we don't
- // bother.
- filterTime = last.ModifiedAt
- params.Filters = []arvados.Filter{{
- Attr: "modified_at",
- Operator: ">=",
- Operand: filterTime,
- }, {
- Attr: "uuid",
- Operator: "!=",
- Operand: last.UUID,
- }}
+ err = json.Unmarshal(classesConfirmed, &coll.StorageClassesConfirmed)
+ if err != nil && len(classesConfirmed) > 0 {
+ return err
+ }
+ if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
+ newestModifiedAt = coll.ModifiedAt
+ }
+ callCount++
+ err = f(coll)
+ if err != nil {
+ return err
+ }
+ select {
+ case <-progressTicker.C:
+ progress(callCount, expectCount)
+ default:
}
}
progress(callCount, expectCount)
-
+ err = rows.Close()
+ if err != nil {
+ return err
+ }
if checkCount, err := countCollections(c, arvados.ResourceListParams{
Filters: []arvados.Filter{{
Attr: "modified_at",
Operator: "<=",
- Operand: filterTime}},
+ Operand: newestModifiedAt}},
IncludeTrash: true,
IncludeOldVersions: true,
}); err != nil {
return err
} else if callCount < checkCount {
- return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, filterTime, checkCount)
+ return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
}
return nil
}
+
+func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ defer bal.time("update_collections", "wall clock time to update collections")()
+ threshold := time.Now()
+ thresholdStr := threshold.Format(time.RFC3339Nano)
+
+ updated := int64(0)
+
+ errs := make(chan error, 1)
+ collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
+ go func() {
+ defer close(collQ)
+ err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
+ if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
+ bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
+ cancel()
+ return context.Canceled
+ }
+ collQ <- coll
+ return nil
+ }, func(done, total int) {
+ bal.logf("update collections: %d/%d (%d updated @ %.01f updates/s)", done, total, atomic.LoadInt64(&updated), float64(atomic.LoadInt64(&updated))/time.Since(threshold).Seconds())
+ })
+ if err != nil && err != context.Canceled {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()
+
+ var wg sync.WaitGroup
+
+ // Use about 1 goroutine per 2 CPUs. Based on experiments with
+ // a 2-core host, using more concurrent database
+ // calls/transactions makes this process slower, not faster.
+ for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
+ wg.Add(1)
+ goSendErr(errs, func() error {
+ defer wg.Done()
+ tx, err := bal.DB.Beginx()
+ if err != nil {
+ return err
+ }
+ txPending := 0
+ flush := func(final bool) error {
+ err := tx.Commit()
+ if err != nil && ctx.Err() == nil {
+ tx.Rollback()
+ return err
+ }
+ txPending = 0
+ if final {
+ return nil
+ }
+ tx, err = bal.DB.Beginx()
+ return err
+ }
+ txBatch := 100
+ for coll := range collQ {
+ if ctx.Err() != nil || len(errs) > 0 {
+ continue
+ }
+ blkids, err := coll.SizedDigests()
+ if err != nil {
+ bal.logf("%s: %s", coll.UUID, err)
+ continue
+ }
+ repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
+
+ desired := bal.DefaultReplication
+ if coll.ReplicationDesired != nil {
+ desired = *coll.ReplicationDesired
+ }
+ if repl > desired {
+ // If actual>desired, confirm
+ // the desired number rather
+ // than actual to avoid
+ // flapping updates when
+ // replication increases
+ // temporarily.
+ repl = desired
+ }
+ classes := emptyJSONArray
+ if repl > 0 {
+ classes, err = json.Marshal(coll.StorageClassesDesired)
+ if err != nil {
+ bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
+ continue
+ }
+ }
+ needUpdate := coll.ReplicationConfirmed == nil || *coll.ReplicationConfirmed != repl || len(coll.StorageClassesConfirmed) != len(coll.StorageClassesDesired)
+ for i := range coll.StorageClassesDesired {
+ if !needUpdate && coll.StorageClassesDesired[i] != coll.StorageClassesConfirmed[i] {
+ needUpdate = true
+ }
+ }
+ if !needUpdate {
+ continue
+ }
+ _, err = tx.ExecContext(ctx, `update collections set
+ replication_confirmed=$1,
+ replication_confirmed_at=$2,
+ storage_classes_confirmed=$3,
+ storage_classes_confirmed_at=$2
+ where uuid=$4`,
+ repl, thresholdStr, classes, coll.UUID)
+ if err != nil {
+ if ctx.Err() == nil {
+ bal.logf("%s: update failed: %s", coll.UUID, err)
+ }
+ continue
+ }
+ atomic.AddInt64(&updated, 1)
+ if txPending++; txPending >= txBatch {
+ err = flush(false)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return flush(true)
+ })
+ }
+ wg.Wait()
+ bal.logf("updated %d collections", updated)
+ if len(errs) > 0 {
+ return fmt.Errorf("error updating collections: %s", <-errs)
+ }
+ return nil
+}
+
+// Call f in a new goroutine. If it returns a non-nil error, send the
+// error to the errs channel (unless the channel is already full with
+// another error).
+func goSendErr(errs chan<- error, f func() error) {
+ go func() {
+ err := f()
+ if err != nil {
+ select {
+ case errs <- err:
+ default:
+ }
+ }
+ }()
+}
+
+var emptyJSONArray = []byte("[]")
import (
"context"
- "sync"
- "time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
check "gopkg.in/check.v1"
)
-// TestIdenticalTimestamps ensures EachCollection returns the same
-// set of collections for various page sizes -- even page sizes so
-// small that we get entire pages full of collections with identical
-// timestamps and exercise our gettingExactTimestamp cases.
-func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
- // pageSize==0 uses the default (large) page size.
- pageSizes := []int{0, 2, 3, 4, 5}
- got := make([][]string, len(pageSizes))
- var wg sync.WaitGroup
- for trial, pageSize := range pageSizes {
- wg.Add(1)
- go func(trial, pageSize int) {
- defer wg.Done()
- streak := 0
- longestStreak := 0
- var lastMod time.Time
- sawUUID := make(map[string]bool)
- err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
- if c.ModifiedAt.IsZero() {
- return nil
- }
- if sawUUID[c.UUID] {
- // dup
- return nil
- }
- got[trial] = append(got[trial], c.UUID)
- sawUUID[c.UUID] = true
- if lastMod == c.ModifiedAt {
- streak++
- if streak > longestStreak {
- longestStreak = streak
- }
- } else {
- streak = 0
- lastMod = c.ModifiedAt
- }
- return nil
- }, nil)
- c.Check(err, check.IsNil)
- c.Check(longestStreak > 25, check.Equals, true)
- }(trial, pageSize)
- }
- wg.Wait()
- for trial := 1; trial < len(pageSizes); trial++ {
- c.Check(got[trial], check.DeepEquals, got[0])
- }
+// TestMissedCollections exercises EachCollection's sanity check:
+// #collections processed >= #old collections that exist in database
+// after processing.
+func (s *integrationSuite) TestMissedCollections(c *check.C) {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+ c.Assert(err, check.IsNil)
+
+ defer db.Exec(`delete from collections where uuid = 'zzzzz-4zz18-404040404040404'`)
+ insertedOld := false
+ err = EachCollection(context.Background(), db, s.client, func(coll arvados.Collection) error {
+ if !insertedOld {
+ insertedOld = true
+ _, err := db.Exec(`insert into collections (uuid, created_at, updated_at, modified_at) values ('zzzzz-4zz18-404040404040404', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z', '2002-02-02T02:02:02Z')`)
+ return err
+ }
+ return nil
+ }, nil)
+ c.Check(err, check.ErrorMatches, `Retrieved .* collections .* but server now reports .* collections.*`)
}
import (
"bytes"
+ "io"
"os"
"strings"
"testing"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
+ "github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
type integrationSuite struct {
config *arvados.Cluster
+ db *sqlx.DB
client *arvados.Client
keepClient *keepclient.KeepClient
}
c.Assert(err, check.Equals, nil)
s.config, err = cfg.GetCluster("")
c.Assert(err, check.Equals, nil)
+ s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
+ c.Assert(err, check.IsNil)
s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
s.client = &arvados.Client{
for iter := 0; iter < 20; iter++ {
logBuf.Reset()
logger := logrus.New()
- logger.Out = &logBuf
+ logger.Out = io.MultiWriter(&logBuf, os.Stderr)
opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: logger,
+ CommitPulls: true,
+ CommitTrash: true,
+ CommitConfirmedFields: true,
+ Logger: logger,
}
bal := &Balancer{
+ DB: s.db,
Logger: logger,
Metrics: newMetrics(prometheus.NewRegistry()),
}
time.Sleep(200 * time.Millisecond)
}
c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas \(0 blocks, 0 bytes\) underreplicated.*`)
+
+ for _, trial := range []struct {
+ uuid string
+ repl int
+ classes []string
+ }{
+ {arvadostest.EmptyCollectionUUID, 0, []string{}},
+ {arvadostest.FooCollection, 2, []string{"default"}}, // "foo" blk
+ {arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk
+ {arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{}}, // "bar" blk
+ } {
+ c.Logf("%#v", trial)
+ var coll arvados.Collection
+ s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil)
+ if c.Check(coll.ReplicationConfirmed, check.NotNil) {
+ c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl)
+ }
+ c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes)
+ }
}
"flag"
"fmt"
"io"
+ "net/http"
+ _ "net/http/pprof"
"os"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/jmoiron/sqlx"
+ _ "github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
flags.BoolVar(&options.CommitTrash, "commit-trash", false,
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
+ flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
+ "update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
flags.Bool("version", false, "Write version information to stdout and exit 0")
dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
+ pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+
+ if *pprofAddr != "" {
+ go func() {
+ logrus.Println(http.ListenAndServe(*pprofAddr, nil))
+ }()
+ }
loader := config.NewLoader(os.Stdin, logger)
loader.SetupFlags(flags)
// service.Command
args = nil
dropFlag := map[string]bool{
- "once": true,
- "commit-pulls": true,
- "commit-trash": true,
- "dump": true,
+ "once": true,
+ "commit-pulls": true,
+ "commit-trash": true,
+ "commit-confirmed-fields": true,
+ "dump": true,
}
flags.Visit(func(f *flag.Flag) {
if !dropFlag[f.Name] {
return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
}
+ db, err := sqlx.Open("postgres", cluster.PostgreSQL.Connection.String())
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection failed: %s", err))
+ }
+ if p := cluster.PostgreSQL.ConnectionPool; p > 0 {
+ db.SetMaxOpenConns(p)
+ }
+ err = db.Ping()
+ if err != nil {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("postgresql connection succeeded but ping failed: %s", err))
+ }
+
if options.Logger == nil {
options.Logger = ctxlog.FromContext(ctx)
}
Metrics: newMetrics(registry),
Logger: options.Logger,
Dumper: options.Dumper,
+ DB: db,
}
srv.Handler = &health.Handler{
Token: cluster.ManagementToken,
"net/http"
"time"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/ghodss/yaml"
check "gopkg.in/check.v1"
)
}
func (s *mainSuite) TestHTTPServer(c *check.C) {
+ arvadostest.StartKeep(2, true)
+
ln, err := net.Listen("tcp", ":0")
if err != nil {
c.Fatal(err)
_, p, err := net.SplitHostPort(ln.Addr().String())
c.Check(err, check.IsNil)
ln.Close()
- config := "Clusters:\n zzzzz:\n ManagementToken: abcdefg\n Services: {Keepbalance: {InternalURLs: {'http://localhost:" + p + "/': {}}}}\n"
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: "localhost:" + p, Path: "/"}] = arvados.ServiceInstance{}
+ cfg.Clusters[cluster.ClusterID] = *cluster
+ config, err := yaml.Marshal(cfg)
+ c.Assert(err, check.IsNil)
var stdout bytes.Buffer
- go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBufferString(config), &stdout, &stdout)
+ go runCommand("keep-balance", []string{"-config", "-"}, bytes.NewBuffer(config), &stdout, &stdout)
done := make(chan struct{})
go func() {
defer close(done)
c.Fatal(err)
return
}
- req.Header.Set("Authorization", "Bearer abcdefg")
+ req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
resp, err := http.DefaultClient.Do(req)
if err != nil {
c.Logf("error %s", err)
c.Log(stdout.String())
c.Fatal("timeout")
}
+ c.Log(stdout.String())
// Check non-metrics URL that gets passed through to us from
// service.Command
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
//
// RunOptions fields are controlled by command line flags.
type RunOptions struct {
- Once bool
- CommitPulls bool
- CommitTrash bool
- Logger logrus.FieldLogger
- Dumper logrus.FieldLogger
+ Once bool
+ CommitPulls bool
+ CommitTrash bool
+ CommitConfirmedFields bool
+ Logger logrus.FieldLogger
+ Dumper logrus.FieldLogger
// SafeRendezvousState from the most recent balance operation,
// or "" if unknown. If this changes from one run to the next,
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
+
+ DB *sqlx.DB
}
// CheckHealth implements service.Handler.
func (srv *Server) CheckHealth() error {
- return nil
+ return srv.DB.Ping()
}
// Done implements service.Handler.
func (srv *Server) runOnce() (*Balancer, error) {
bal := &Balancer{
+ DB: srv.DB,
Logger: srv.Logger,
Dumper: srv.Dumper,
Metrics: srv.Metrics,