return exec.Command("echo")
}
- container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ container := s.integrationTest(c,
+ func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
}(squeueCmd)
squeueCmd = newSqueueCmd
- // There should be no queued containers now
+ // There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
}
theConfig.CrunchRunCommand = []string{"echo"}
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
- Arv: arv,
- PollInterval: time.Duration(1) * time.Second,
+ Arv: arv,
+ PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
container arvados.Container,
status chan arvados.Container) {
go runContainer(dispatcher, container)
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
- squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
+ sqCheck = SqueueChecker{Period: 500 * time.Millisecond}
- err = dispatcher.RunDispatcher()
+ err = dispatcher.Run()
c.Assert(err, IsNil)
- squeueUpdater.Done()
+ sqCheck.Stop()
c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
theConfig.CrunchRunCommand = []string{crunchCmd}
- doneProcessing := make(chan struct{})
dispatcher := dispatch.Dispatcher{
- Arv: arv,
- PollInterval: time.Duration(1) * time.Second,
+ Arv: arv,
+ PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(dispatcher *dispatch.Dispatcher,
container arvados.Container,
status chan arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}()
run(dispatcher, container, status)
- doneProcessing <- struct{}{}
+ dispatcher.Stop()
},
- DoneProcessing: doneProcessing}
+ }
go func() {
for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
time.Sleep(100 * time.Millisecond)
}
- dispatcher.DoneProcessing <- struct{}{}
+ dispatcher.Stop()
}()
- err := dispatcher.RunDispatcher()
+ err := dispatcher.Run()
c.Assert(err, IsNil)
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)