X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2bc1a7a89597ab02aaeef84b82fdc51f8e375b79..d8e3a67d508e9a5f5c01884259c0e75a140f64e9:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 2617146056..bb590e13b3 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -18,11 +18,13 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
+	"github.com/jmoiron/sqlx"
 	"github.com/sirupsen/logrus"
 )
 
@@ -36,6 +38,7 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+	DB      *sqlx.DB
 	Logger  logrus.FieldLogger
 	Dumper  logrus.FieldLogger
 	Metrics *metrics
@@ -50,7 +53,7 @@ type Balancer struct {
 	classes       []string
 	mounts        int
 	mountsByClass map[string]map[*KeepMount]bool
-	collScanned   int
+	collScanned   int64
 	serviceRoots  map[string]string
 	errors        []error
 	stats         balancerStats
@@ -72,6 +75,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+	defer cancel()
+
 	var lbFile *os.File
 	if bal.LostBlocksFile != "" {
 		tmpfn := bal.LostBlocksFile + ".tmp"
@@ -112,13 +118,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 	if err = bal.CheckSanityEarly(client); err != nil {
 		return
 	}
+
+	// On a big site, indexing and sending trash/pull lists can
+	// take much longer than the usual 5 minute client
+	// timeout. From here on, we rely on the context deadline
+	// instead, aborting the entire operation if any part takes
+	// too long.
+	client.Timeout = 0
+
 	rs := bal.rendezvousState()
 	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
 		if runOptions.SafeRendezvousState != "" {
 			bal.logf("notice: KeepServices list has changed since last run")
 		}
 		bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-		if err = bal.ClearTrashLists(client); err != nil {
+		if err = bal.ClearTrashLists(ctx, client); err != nil {
 			return
 		}
 		// The current rendezvous state becomes "safe" (i.e.,
@@ -128,13 +142,7 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		nextRunOptions.SafeRendezvousState = rs
 	}
 
-	// Indexing and sending trash/pull lists can take a long time
-	// on a big site. Prefer a long timeout (causing slow recovery
-	// from undetected network problems) to a short timeout
-	// (causing starvation via perpetual timeout/restart cycle).
-	client.Timeout = 24 * time.Hour
-
-	if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+	if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
 	bal.ComputeChangeSets()
@@ -154,14 +162,23 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		lbFile = nil
 	}
 	if runOptions.CommitPulls {
-		err = bal.CommitPulls(client)
+		err = bal.CommitPulls(ctx, client)
 		if err != nil {
 			// Skip trash if we can't pull. (Too cautious?)
 			return
 		}
 	}
 	if runOptions.CommitTrash {
-		err = bal.CommitTrash(client)
+		err = bal.CommitTrash(ctx, client)
+		if err != nil {
+			return
+		}
+	}
+	if runOptions.CommitConfirmedFields {
+		err = bal.updateCollections(ctx, client, cluster)
+		if err != nil {
+			return
+		}
 	}
 	return
 }
@@ -294,11 +311,11 @@ func (bal *Balancer) rendezvousState() string {
 // We avoid this problem if we clear all trash lists before getting
 // indexes. (We also assume there is only one rebalancing process
 // running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
 	for _, srv := range bal.KeepServices {
 		srv.ChangeSet = &ChangeSet{}
 	}
-	return bal.CommitTrash(c)
+	return bal.CommitTrash(ctx, c)
 }
 
 // GetCurrentState determines the current replication state, and the
@@ -312,8 +329,8 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
-	ctx, cancel := context.WithCancel(context.Background())
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	defer bal.time("get_state", "wall clock time to get current state")()
@@ -383,39 +400,14 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		}(mounts)
 	}
 
-	// collQ buffers incoming collections so we can start fetching
-	// the next page without waiting for the current page to
-	// finish processing.
 	collQ := make(chan arvados.Collection, bufs)
 
-	// Start a goroutine to process collections. (We could use a
-	// worker pool here, but even with a single worker we already
-	// process collections much faster than we can retrieve them.)
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for coll := range collQ {
-			err := bal.addCollection(coll)
-			if err != nil || len(errs) > 0 {
-				select {
-				case errs <- err:
-				default:
-				}
-				for range collQ {
-				}
-				cancel()
-				return
-			}
-			bal.collScanned++
-		}
-	}()
-
-	// Start a goroutine to retrieve all collections from the
-	// Arvados database and send them to collQ for processing.
+	// Retrieve all collections from the database and send them to
+	// collQ.
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = EachCollection(c, pageSize,
+		err = EachCollection(ctx, bal.DB, c,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
@@ -439,6 +431,27 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		}
 	}()
 
+	// Parse manifests from collQ and pass the block hashes to
+	// BlockStateMap to track desired replication.
+	for i := 0; i < runtime.NumCPU(); i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for coll := range collQ {
+				err := bal.addCollection(coll)
+				if err != nil || len(errs) > 0 {
+					select {
+					case errs <- err:
+					default:
+					}
+					cancel()
+					continue
+				}
+				atomic.AddInt64(&bal.collScanned, 1)
+			}
+		}()
+	}
+
 	wg.Wait()
 	if len(errs) > 0 {
 		return <-errs
@@ -455,7 +468,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 	if coll.ReplicationDesired != nil {
 		repl = *coll.ReplicationDesired
 	}
-	bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+	bal.Logger.Debugf("%v: %d blocks x%d", coll.UUID, len(blkids), repl)
 	// Pass pdh to IncreaseDesired only if LostBlocksFile is being
 	// written -- otherwise it's just a waste of memory.
 	pdh := ""
@@ -525,10 +538,6 @@ func (bal *Balancer) setupLookupTables() {
 			// effectively read-only.
 			mnt.ReadOnly = mnt.ReadOnly || srv.ReadOnly
 
-			if len(mnt.StorageClasses) == 0 {
-				bal.mountsByClass["default"][mnt] = true
-				continue
-			}
 			for class := range mnt.StorageClasses {
 				if mbc := bal.mountsByClass[class]; mbc == nil {
 					bal.classes = append(bal.classes, class)
@@ -1098,22 +1107,22 @@ func (bal *Balancer) CheckSanityLate() error {
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
 	defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
 	return bal.commitAsync(c, "send pull list",
 		func(srv *KeepService) error {
-			return srv.CommitPulls(c)
+			return srv.CommitPulls(ctx, c)
 		})
 }
 
 // CommitTrash sends the computed lists of trash requests to the
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
 	defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
 	return bal.commitAsync(c, "send trash list",
 		func(srv *KeepService) error {
-			return srv.CommitTrash(c)
+			return srv.CommitTrash(ctx, c)
 		})
 }
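
The timeout changes in Run swap the old fixed 24-hour client timeout for a single
deadline derived from cluster.Collections.BalanceTimeout that bounds the whole sweep,
while client.Timeout = 0 disables the per-request limit so long index/trash/pull
requests are only cut off by that deadline. The following is a minimal standalone
sketch of that pattern only; the plain net/http client, the URL, and the 6-hour
figure are placeholder assumptions, not the Arvados SDK or its configuration.

package main

import (
	"context"
	"io"
	"net/http"
	"time"
)

func sweep() error {
	// One deadline bounds the entire operation (placeholder duration).
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(6*time.Hour))
	defer cancel()

	// No per-request timeout; rely on the context deadline instead.
	client := &http.Client{Timeout: 0}

	// Placeholder URL standing in for a long-running index request.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://keepstore.example/index", nil)
	if err != nil {
		return err
	}
	resp, err := client.Do(req) // returns an error once the deadline passes
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	_, err = io.Copy(io.Discard, resp.Body)
	return err
}

func main() {
	_ = sweep()
}

The trade-off noted in the removed comment still applies: a single generous deadline
avoids the starvation of a short per-request timeout, while canceling the context
aborts every in-flight request at once instead of waiting out a 24-hour limit.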
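
GetCurrentState now fans collections out to one worker goroutine per CPU instead of a
single processing goroutine, which is why collScanned changes from int to int64 and is
incremented with sync/atomic: several workers update it concurrently. Below is a
simplified, self-contained sketch of that fan-out shape; the integer queue and the
squaring step are placeholders standing in for collQ and bal.addCollection.

package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

func main() {
	queue := make(chan int, 64) // cf. collQ, buffered with "bufs"
	var scanned int64
	var wg sync.WaitGroup

	// Producer: send work items, then close the channel so workers exit.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(queue)
		for i := 0; i < 1000; i++ {
			queue <- i
		}
	}()

	// Workers: one per CPU, as in the balancer's manifest-parsing loop.
	for i := 0; i < runtime.NumCPU(); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range queue {
				_ = item * item // placeholder for parsing a manifest
				atomic.AddInt64(&scanned, 1)
			}
		}()
	}

	wg.Wait()
	fmt.Println("scanned:", atomic.LoadInt64(&scanned))
}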
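
When a worker hits an error in the new loop, it reports it with a non-blocking send on
the errs channel, cancels the shared context, and keeps draining collQ so the producer
is never stuck on a full channel. A small sketch of that idiom follows; the names
handle, queue, and the 1-slot errs buffer are hypothetical stand-ins for the balancer's
own types.

package main

import (
	"context"
	"errors"
	"fmt"
)

// handle is a hypothetical stand-in for per-item work such as parsing a manifest.
func handle(item int) error {
	if item == 3 {
		return errors.New("boom")
	}
	return nil
}

// worker keeps draining "in" even after an error, so the producer never blocks;
// only the first error is retained.
func worker(cancel context.CancelFunc, in <-chan int, errs chan<- error) {
	for item := range in {
		if err := handle(item); err != nil {
			select {
			case errs <- err: // report the first error
			default: // an error is already pending; drop this one
			}
			cancel() // tell the producer (and any other workers) to stop
			continue // keep draining so the sender is not blocked
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errs := make(chan error, 1) // buffered: the first error wins the slot
	in := make(chan int, 8)

	// Producer: stops early once the context is canceled.
	go func() {
		defer close(in)
		for i := 0; i < 8; i++ {
			select {
			case in <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	worker(cancel, in, errs)

	select {
	case err := <-errs:
		fmt.Println("first error:", err)
	default:
		fmt.Println("no errors")
	}
}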