X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6d95130da47af9fd0290d3c8f80a0364faf74957..b96d5caa0056472fe67b82bd5305448d85c7d0cd:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 3c35d304cb..86423a2976 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -6,6 +6,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"crypto/md5"
 	"fmt"
 	"io"
@@ -71,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 
 	defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+	defer cancel()
+
 	var lbFile *os.File
 	if bal.LostBlocksFile != "" {
 		tmpfn := bal.LostBlocksFile + ".tmp"
@@ -111,13 +115,21 @@
 	if err = bal.CheckSanityEarly(client); err != nil {
 		return
 	}
+
+	// On a big site, indexing and sending trash/pull lists can
+	// take much longer than the usual 5 minute client
+	// timeout. From here on, we rely on the context deadline
+	// instead, aborting the entire operation if any part takes
+	// too long.
+	client.Timeout = 0
+
 	rs := bal.rendezvousState()
 	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
 		if runOptions.SafeRendezvousState != "" {
 			bal.logf("notice: KeepServices list has changed since last run")
 		}
 		bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-		if err = bal.ClearTrashLists(client); err != nil {
+		if err = bal.ClearTrashLists(ctx, client); err != nil {
 			return
 		}
 		// The current rendezvous state becomes "safe" (i.e.,
@@ -126,7 +138,8 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 		// succeed in clearing existing trash lists.
 		nextRunOptions.SafeRendezvousState = rs
 	}
-	if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+
+	if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
 		return
 	}
 	bal.ComputeChangeSets()
@@ -146,14 +159,14 @@
 		lbFile = nil
 	}
 	if runOptions.CommitPulls {
-		err = bal.CommitPulls(client)
+		err = bal.CommitPulls(ctx, client)
 		if err != nil {
 			// Skip trash if we can't pull. (Too cautious?)
 			return
 		}
 	}
 	if runOptions.CommitTrash {
-		err = bal.CommitTrash(client)
+		err = bal.CommitTrash(ctx, client)
 	}
 	return
 }
@@ -286,11 +299,11 @@ func (bal *Balancer) rendezvousState() string {
 // We avoid this problem if we clear all trash lists before getting
 // indexes. (We also assume there is only one rebalancing process
 // running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
 	for _, srv := range bal.KeepServices {
 		srv.ChangeSet = &ChangeSet{}
 	}
-	return bal.CommitTrash(c)
+	return bal.CommitTrash(ctx, c)
 }
 
 // GetCurrentState determines the current replication state, and the
@@ -304,7 +317,10 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	defer bal.time("get_state", "wall clock time to get current state")()
 	bal.BlockStateMap = NewBlockStateMap()
 
@@ -348,12 +364,13 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		go func(mounts []*KeepMount) {
 			defer wg.Done()
 			bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
-			idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+			idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
 			if err != nil {
 				select {
 				case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
 				default:
 				}
+				cancel()
 				return
 			}
 			if len(errs) > 0 {
@@ -391,6 +408,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 					}
 					for range collQ {
 					}
+					cancel()
 					return
 				}
 				bal.collScanned++
@@ -402,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = EachCollection(c, pageSize,
+		err = EachCollection(ctx, c, pageSize,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
@@ -422,6 +440,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 			case errs <- err:
 			default:
 			}
+			cancel()
 		}
 	}()
 
@@ -1084,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error {
 // CommitPulls sends the computed lists of pull requests to the
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
 	defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
 	return bal.commitAsync(c, "send pull list",
 		func(srv *KeepService) error {
-			return srv.CommitPulls(c)
+			return srv.CommitPulls(ctx, c)
 		})
 }
 
 // CommitTrash sends the computed lists of trash requests to the
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
 	defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
 	return bal.commitAsync(c, "send trash list",
 		func(srv *KeepService) error {
-			return srv.CommitTrash(c)
+			return srv.CommitTrash(ctx, c)
 		})
 }
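
The hunks above wire a single context deadline through the whole balancing run. The sketch below is a rough, self-contained illustration of that pattern, not the keep-balance code itself: it derives one deadline for the sweep, disables the HTTP client's per-request timeout so long index/commit requests are bounded by the sweep deadline rather than a short client timeout, and binds a request to the context. The URL, the 6-hour deadline, and the 5-minute client timeout are made-up placeholders.

// sweepsketch.go: hypothetical illustration of a deadline-bounded sweep.
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	// Stand-in for a client whose default per-request timeout is short.
	client := &http.Client{Timeout: 5 * time.Minute}

	// One deadline bounds the whole sweep.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(6*time.Hour))
	defer cancel() // always release the deadline's timer

	// On a big site, indexing and committing can legitimately take
	// longer than the per-request timeout, so rely on ctx instead.
	client.Timeout = 0

	// A request bound to ctx is aborted if the sweep deadline passes.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://keep.example/index", nil)
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Println("index request:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("index response:", resp.Status)
}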
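
GetCurrentState additionally wraps its context with WithCancel and calls cancel() from whichever goroutine fails first, so index and collection requests still in flight on other goroutines stop promptly instead of running until the overall deadline. Here is a minimal sketch of that cancel-on-first-error fan-out under assumed names; fetchIndex, getCurrentState, and the server list are hypothetical stand-ins, not Arvados APIs.

// fanoutsketch.go: hypothetical illustration of cancel-on-first-error.
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// fetchIndex simulates retrieving one server's block index; real code
// would issue an HTTP request bound to ctx.
func fetchIndex(ctx context.Context, srv string) error {
	if err := ctx.Err(); err != nil {
		return err // another worker already cancelled the scan
	}
	if srv == "keep2" {
		return errors.New("connection refused")
	}
	return nil
}

func getCurrentState(ctx context.Context, servers []string) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errs := make(chan error, 1) // buffered: keep only the first error
	var wg sync.WaitGroup
	for _, srv := range servers {
		wg.Add(1)
		go func(srv string) {
			defer wg.Done()
			if err := fetchIndex(ctx, srv); err != nil {
				select {
				case errs <- fmt.Errorf("%s: retrieve index: %w", srv, err):
				default: // a sibling's error already won the race
				}
				cancel() // tell the remaining workers to stop
			}
		}(srv)
	}
	wg.Wait()

	select {
	case err := <-errs:
		return err
	default:
		return nil
	}
}

func main() {
	err := getCurrentState(context.Background(), []string{"keep0", "keep1", "keep2"})
	fmt.Println("scan result:", err)
}

The buffered errs channel with a non-blocking send mirrors the select/default construct in the diff: the first error is reported, later ones are dropped, and cancel() is what actually stops the rest of the work.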