defer bal.time("sweep", "wall clock time to run one full sweep")()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+ defer cancel()
+
var lbFile *os.File
if bal.LostBlocksFile != "" {
tmpfn := bal.LostBlocksFile + ".tmp"
if err = bal.CheckSanityEarly(client); err != nil {
return
}
+
+ // On a big site, indexing and sending trash/pull lists can
+ // take much longer than the usual 5 minute client
+ // timeout. From here on, we rely on the context deadline
+ // instead, aborting the entire operation if any part takes
+ // too long.
+ client.Timeout = 0
+
rs := bal.rendezvousState()
if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
if runOptions.SafeRendezvousState != "" {
bal.logf("notice: KeepServices list has changed since last run")
}
bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
- if err = bal.ClearTrashLists(client); err != nil {
+ if err = bal.ClearTrashLists(ctx, client); err != nil {
return
}
// The current rendezvous state becomes "safe" (i.e.,
nextRunOptions.SafeRendezvousState = rs
}
- // Indexing and sending trash/pull lists can take a long time
- // on a big site. Prefer a long timeout (causing slow recovery
- // from undetected network problems) to a short timeout
- // (causing starvation via perpetual timeout/restart cycle).
- client.Timeout = 24 * time.Hour
-
- if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+ if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
lbFile = nil
}
if runOptions.CommitPulls {
- err = bal.CommitPulls(client)
+ err = bal.CommitPulls(ctx, client)
if err != nil {
// Skip trash if we can't pull. (Too cautious?)
return
}
}
if runOptions.CommitTrash {
- err = bal.CommitTrash(client)
+ err = bal.CommitTrash(ctx, client)
}
return
}
// We avoid this problem if we clear all trash lists before getting
// indexes. (We also assume there is only one rebalancing process
// running at a time.)
//
// ctx is handed unchanged to CommitTrash, and whatever error
// CommitTrash returns is returned to the caller.
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
// Give every keep service a fresh, empty ChangeSet before
// committing. NOTE(review): presumably this makes the CommitTrash
// call below send empty trash lists -- srv.CommitTrash is not
// visible here, confirm.
for _, srv := range bal.KeepServices {
srv.ChangeSet = &ChangeSet{}
}
- return bal.CommitTrash(c)
+ return bal.CommitTrash(ctx, c)
}
// GetCurrentState determines the current replication state, and the
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
- ctx, cancel := context.WithCancel(context.Background())
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer bal.time("get_state", "wall clock time to get current state")()
wg.Add(1)
go func() {
defer wg.Done()
- err = EachCollection(c, pageSize,
+ err = EachCollection(ctx, c, pageSize,
func(coll arvados.Collection) error {
collQ <- coll
if len(errs) > 0 {
// keepstore servers. This has the effect of increasing replication of
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
//
// ctx is forwarded to each KeepService's CommitPulls call made from
// the callback below; commitAsync itself receives only the client.
// The error returned by commitAsync is returned to the caller.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
// bal.time is called immediately; the function it returns is
// deferred and runs when CommitPulls returns.
defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
// NOTE(review): commitAsync is defined elsewhere; presumably it
// invokes the callback once per keep service -- confirm.
return bal.commitAsync(c, "send pull list",
func(srv *KeepService) error {
- return srv.CommitPulls(c)
+ return srv.CommitPulls(ctx, c)
})
}
// CommitTrash sends the computed lists of trash requests to the
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
//
// ctx is forwarded to each KeepService's CommitTrash call made from
// the callback below; commitAsync itself receives only the client.
// The error returned by commitAsync is returned to the caller.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
// bal.time is called immediately; the function it returns is
// deferred and runs when CommitTrash returns.
defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
// NOTE(review): commitAsync is defined elsewhere; presumably it
// invokes the callback once per keep service -- confirm.
return bal.commitAsync(c, "send trash list",
func(srv *KeepService) error {
- return srv.CommitTrash(c)
+ return srv.CommitTrash(ctx, c)
})
}