10990: Add Seek tests.
[arvados.git] / sdk / go / dispatch / dispatch.go
index 4129b24f94f45e3360f922adc3d694ce55706088..ce960c0772eebe9f28fb314a9248b1f1490d8496 100644 (file)
@@ -39,12 +39,17 @@ type Dispatcher struct {
        RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
 
        // Amount of time to wait between polling for updates.
-       PollInterval time.Duration
+       PollPeriod time.Duration
+
+       // Minimum time between two attempts to run the same container
+       MinRetryPeriod time.Duration
 
        mineMutex sync.Mutex
        mineMap   map[string]chan arvados.Container
        Auth      arvados.APIClientAuthorization
 
+       throttle throttle
+
        stop chan struct{}
 }
 
@@ -110,7 +115,7 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
 }
 
 func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
-       ticker := time.NewTicker(dispatcher.PollInterval)
+       ticker := time.NewTicker(dispatcher.PollPeriod)
        defer ticker.Stop()
 
        paramsQ := arvadosclient.Dict{
@@ -171,6 +176,9 @@ func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
        }
 
        if container.State == Queued && container.Priority > 0 {
+               if !dispatcher.throttle.Check(container.UUID) {
+                       return
+               }
                // Try to take the lock
                if err := dispatcher.Lock(container.UUID); err != nil {
                        return
@@ -235,6 +243,7 @@ func (dispatcher *Dispatcher) Run() (err error) {
 
        dispatcher.mineMap = make(map[string]chan arvados.Container)
        dispatcher.stop = make(chan struct{})
+       dispatcher.throttle.hold = dispatcher.MinRetryPeriod
        dispatcher.pollContainers(dispatcher.stop)
        return nil
 }