Logger Logger
+ // Batch size for container queries
+ BatchSize int64
+
// Queue polling frequency
PollPeriod time.Duration
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
+ if d.BatchSize == 0 {
+ d.BatchSize = 100
+ }
+
for {
select {
case <-poll.C:
}
func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+ var countList arvados.ContainerList
params := arvadosclient.Dict{
"filters": filters,
+ "count": "exact",
+ "limit": 0,
+ "order": []string{"priority desc"}}
+ err := d.Arv.List("containers", params, &countList)
+ if err != nil {
+ d.Logger.Warnf("error getting count of containers: %q", err)
+ return false
+ }
+ itemsAvailable := countList.ItemsAvailable
+ params = arvadosclient.Dict{
+ "filters": filters,
+ "count": "none",
+ "limit": d.BatchSize,
"order": []string{"priority desc"}}
offset := 0
for {
}
d.checkListForUpdates(list.Items, todo)
offset += len(list.Items)
- if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+ if len(list.Items) == 0 || itemsAvailable <= offset {
return true
}
}
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
+ testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
}
func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
+
+ // Batch size for container queries
+ BatchSize int64
}
func main() {
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
Logger: disp.logger,
+ BatchSize: disp.BatchSize,
RunContainer: disp.runContainer,
PollPeriod: time.Duration(disp.PollPeriod),
MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- s.testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
+ s.testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
}
func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
"PollPeriod": "10s",
"SbatchArguments": ["--partition=foo", "--exclude=node13"],
"ReserveExtraRAM": 268435456,
+ "BatchSize": 10000
}`)
func usage(fs *flag.FlagSet) {