X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/224f384d411bb1b4cccc7165c55bb64fd5c695ad..1bba7f8fb361186ad040b521d168a73abd8fdd65:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 4987c01055..ce960c0772 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -7,10 +7,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" - "os" - "os/signal" "sync" - "syscall" "time" ) @@ -42,15 +39,18 @@ type Dispatcher struct { RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container) // Amount of time to wait between polling for updates. - PollInterval time.Duration + PollPeriod time.Duration - // Channel used to signal that RunDispatcher loop should exit. - DoneProcessing chan struct{} + // Minimum time between two attempts to run the same container + MinRetryPeriod time.Duration - mineMutex sync.Mutex - mineMap map[string]chan arvados.Container - Auth arvados.APIClientAuthorization - containers chan arvados.Container + mineMutex sync.Mutex + mineMap map[string]chan arvados.Container + Auth arvados.APIClientAuthorization + + throttle throttle + + stop chan struct{} } // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones @@ -110,12 +110,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m } for _, container := range containers.Items { touched[container.UUID] = true - dispatcher.containers <- container + dispatcher.handleUpdate(container) } } -func (dispatcher *Dispatcher) pollContainers() { - ticker := time.NewTicker(dispatcher.PollInterval) +func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) { + ticker := time.NewTicker(dispatcher.PollPeriod) + defer ticker.Stop() paramsQ := arvadosclient.Dict{ "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}}, @@ -126,26 +127,24 @@ func (dispatcher *Dispatcher) pollContainers() { "limit": "1000"} for { + touched := make(map[string]bool) + dispatcher.getContainers(paramsQ, touched) + dispatcher.getContainers(paramsP, touched) + dispatcher.mineMutex.Lock() + var monitored []string + for k := range dispatcher.mineMap { + if _, ok := touched[k]; !ok { + monitored = append(monitored, k) + } + } + dispatcher.mineMutex.Unlock() + if monitored != nil { + dispatcher.getContainers(arvadosclient.Dict{ + "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) + } select { case <-ticker.C: - touched := make(map[string]bool) - dispatcher.getContainers(paramsQ, touched) - dispatcher.getContainers(paramsP, touched) - dispatcher.mineMutex.Lock() - var monitored []string - for k := range dispatcher.mineMap { - if _, ok := touched[k]; !ok { - monitored = append(monitored, k) - } - } - dispatcher.mineMutex.Unlock() - if monitored != nil { - dispatcher.getContainers(arvadosclient.Dict{ - "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) - } - case <-dispatcher.DoneProcessing: - close(dispatcher.containers) - ticker.Stop() + case <-stop: return } } @@ -177,6 +176,9 @@ func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { } if container.State == Queued && container.Priority > 0 { + if !dispatcher.throttle.Check(container.UUID) { + return + } // Try to take the lock if err := dispatcher.Lock(container.UUID); err != nil { return @@ -221,10 +223,18 @@ func (dispatcher *Dispatcher) Unlock(uuid string) error { return err } -// RunDispatcher runs the main loop of the dispatcher until receiving a message -// on the dispatcher.DoneProcessing channel. It also installs a signal handler -// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. -func (dispatcher *Dispatcher) RunDispatcher() (err error) { +// Stop causes Run to return after the current polling cycle. +func (dispatcher *Dispatcher) Stop() { + if dispatcher.stop == nil { + // already stopped + return + } + close(dispatcher.stop) + dispatcher.stop = nil +} + +// Run runs the main loop of the dispatcher. +func (dispatcher *Dispatcher) Run() (err error) { err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) if err != nil { log.Printf("Error getting my token UUID: %v", err) @@ -232,26 +242,8 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) { } dispatcher.mineMap = make(map[string]chan arvados.Container) - dispatcher.containers = make(chan arvados.Container) - - // Graceful shutdown on signal - sigChan := make(chan os.Signal) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - - go func(sig <-chan os.Signal) { - for sig := range sig { - log.Printf("Caught signal: %v", sig) - dispatcher.DoneProcessing <- struct{}{} - } - }(sigChan) - - defer close(sigChan) - defer signal.Stop(sigChan) - - go dispatcher.pollContainers() - for container := range dispatcher.containers { - dispatcher.handleUpdate(container) - } - + dispatcher.stop = make(chan struct{}) + dispatcher.throttle.hold = dispatcher.MinRetryPeriod + dispatcher.pollContainers(dispatcher.stop) return nil }