RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
// Amount of time to wait between polling for updates.
- PollInterval time.Duration
+ PollPeriod time.Duration
+
+ // Minimum time between two attempts to run the same container
+ MinRetryPeriod time.Duration
mineMutex sync.Mutex
mineMap map[string]chan arvados.Container
Auth arvados.APIClientAuthorization
+ throttle throttle
+
stop chan struct{}
}
}
func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
- ticker := time.NewTicker(dispatcher.PollInterval)
+ ticker := time.NewTicker(dispatcher.PollPeriod)
defer ticker.Stop()
paramsQ := arvadosclient.Dict{
}
if container.State == Queued && container.Priority > 0 {
+ if !dispatcher.throttle.Check(container.UUID) {
+ return
+ }
// Try to take the lock
if err := dispatcher.Lock(container.UUID); err != nil {
return
dispatcher.mineMap = make(map[string]chan arvados.Container)
dispatcher.stop = make(chan struct{})
+ dispatcher.throttle.hold = dispatcher.MinRetryPeriod
dispatcher.pollContainers(dispatcher.stop)
return nil
}