return
}
}
- if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize); err != nil {
+ if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize int) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
// collQ buffers incoming collections so we can start fetching
// the next page without waiting for the current page to
// finish processing.
- collQ := make(chan arvados.Collection, 1000)
+ collQ := make(chan arvados.Collection, bufs)
// Start a goroutine to process collections. (We could use a
// worker pool here, but even with a single worker we already
// Number of collections to request in each API call
CollectionBatchSize int
+
+ // Max collections to buffer in memory (bigger values consume
+ // more memory, but can reduce store-and-forward latency when
+ // fetching pages)
+ CollectionBuffers int
}
// RunOptions controls runtime behavior. The flags/options that belong