import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
// succeed in clearing existing trash lists.
nextRunOptions.SafeRendezvousState = rs
}
+
+ // Indexing and sending trash/pull lists can take a long time
+ // on a big site. Prefer a long timeout (causing slow recovery
+ // from undetected network problems) to a short timeout
+ // (causing starvation via perpetual timeout/restart cycle).
+ client.Timeout = 24 * time.Hour
+
if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
//
// It encodes the resulting information in BlockStateMap.
func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
defer bal.time("get_state", "wall clock time to get current state")()
bal.BlockStateMap = NewBlockStateMap()
go func(mounts []*KeepMount) {
defer wg.Done()
bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
- idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ idx, err := mounts[0].KeepService.IndexMount(ctx, c, mounts[0].UUID, "")
if err != nil {
select {
case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err):
default:
}
+ cancel()
return
}
if len(errs) > 0 {
}
for range collQ {
}
+ cancel()
return
}
bal.collScanned++
case errs <- err:
default:
}
+ cancel()
}
}()