X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2efdb0e205abe63c5ca777fc4dacca65192e5dd3..1bba7f8fb361186ad040b521d168a73abd8fdd65:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index fb7b5fb799..ce960c0772 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -4,52 +4,25 @@ package dispatch import ( + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "log" - "os" - "os/signal" "sync" - "syscall" "time" ) -// Constants for container states const ( - Queued = "Queued" - Locked = "Locked" - Running = "Running" - Complete = "Complete" - Cancelled = "Cancelled" + Queued = arvados.ContainerStateQueued + Locked = arvados.ContainerStateLocked + Running = arvados.ContainerStateRunning + Complete = arvados.ContainerStateComplete + Cancelled = arvados.ContainerStateCancelled ) -type apiClientAuthorization struct { - UUID string `json:"uuid"` - APIToken string `json:"api_token"` -} - -type apiClientAuthorizationList struct { - Items []apiClientAuthorization `json:"items"` -} - -// Represents an Arvados container record -type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` - RuntimeConstraints map[string]int64 `json:"runtime_constraints"` - LockedByUUID string `json:"locked_by_uuid"` -} - -// ContainerList is a list of the containers from api -type ContainerList struct { - Items []Container `json:"items"` - ItemsAvailable int `json:"items_available"` -} - // Dispatcher holds the state of the dispatcher type Dispatcher struct { // The Arvados client - Arv arvadosclient.ArvadosClient + Arv *arvadosclient.ArvadosClient // When a new queued container appears and is either already owned by // this dispatcher or is successfully locked, the dispatcher will call @@ -63,31 +36,34 @@ type Dispatcher struct { // handled by this dispatcher and the goroutine should terminate. The // goroutine is responsible for draining the 'status' channel, failure // to do so may deadlock the dispatcher. - RunContainer func(*Dispatcher, Container, chan Container) + RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container) // Amount of time to wait between polling for updates. - PollInterval time.Duration + PollPeriod time.Duration + + // Minimum time between two attempts to run the same container + MinRetryPeriod time.Duration - // Channel used to signal that RunDispatcher loop should exit. - DoneProcessing chan struct{} + mineMutex sync.Mutex + mineMap map[string]chan arvados.Container + Auth arvados.APIClientAuthorization - mineMutex sync.Mutex - mineMap map[string]chan Container - Auth apiClientAuthorization - containers chan Container + throttle throttle + + stop chan struct{} } // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones // for which this process is actively starting/monitoring. Returns channel to // be used to send container status updates. -func (dispatcher *Dispatcher) setMine(uuid string) chan Container { +func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() if ch, ok := dispatcher.mineMap[uuid]; ok { return ch } - ch := make(chan Container) + ch := make(chan arvados.Container) dispatcher.mineMap[uuid] = ch return ch } @@ -102,22 +78,24 @@ func (dispatcher *Dispatcher) notMine(uuid string) { } } -// Check if there is a channel for updates associated with this container. If -// so send the container record on the channel and return true, if not return -// false. -func (dispatcher *Dispatcher) updateMine(c Container) bool { +// checkMine returns true if there is a channel for updates associated +// with container c. If update is true, also send the container record on +// the channel. +func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool { dispatcher.mineMutex.Lock() defer dispatcher.mineMutex.Unlock() ch, ok := dispatcher.mineMap[c.UUID] if ok { - ch <- c + if update { + ch <- c + } return true } return false } func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) { - var containers ContainerList + var containers arvados.ContainerList err := dispatcher.Arv.List("containers", params, &containers) if err != nil { log.Printf("Error getting list of containers: %q", err) @@ -132,12 +110,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m } for _, container := range containers.Items { touched[container.UUID] = true - dispatcher.containers <- container + dispatcher.handleUpdate(container) } } -func (dispatcher *Dispatcher) pollContainers() { - ticker := time.NewTicker(dispatcher.PollInterval) +func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) { + ticker := time.NewTicker(dispatcher.PollPeriod) + defer ticker.Stop() paramsQ := arvadosclient.Dict{ "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}}, @@ -148,32 +127,38 @@ func (dispatcher *Dispatcher) pollContainers() { "limit": "1000"} for { + touched := make(map[string]bool) + dispatcher.getContainers(paramsQ, touched) + dispatcher.getContainers(paramsP, touched) + dispatcher.mineMutex.Lock() + var monitored []string + for k := range dispatcher.mineMap { + if _, ok := touched[k]; !ok { + monitored = append(monitored, k) + } + } + dispatcher.mineMutex.Unlock() + if monitored != nil { + dispatcher.getContainers(arvadosclient.Dict{ + "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) + } select { case <-ticker.C: - touched := make(map[string]bool) - dispatcher.getContainers(paramsQ, touched) - dispatcher.getContainers(paramsP, touched) - dispatcher.mineMutex.Lock() - var monitored []string - for k := range dispatcher.mineMap { - if _, ok := touched[k]; !ok { - monitored = append(monitored, k) - } - } - dispatcher.mineMutex.Unlock() - if monitored != nil { - dispatcher.getContainers(arvadosclient.Dict{ - "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched) - } - case <-dispatcher.DoneProcessing: - close(dispatcher.containers) - ticker.Stop() + case <-stop: return } } } -func (dispatcher *Dispatcher) handleUpdate(container Container) { +func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) { + if container.State == Queued && dispatcher.checkMine(container, false) { + // If we previously started the job, something failed, and it + // was re-queued, this dispatcher might still be monitoring it. + // Stop the existing monitor, then try to lock and run it + // again. + dispatcher.notMine(container.UUID) + } + if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued { // If container is Complete, Cancelled, or Queued, LockedByUUID // will be nil. If the container was formerly Locked, moved @@ -185,14 +170,17 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { return } - if dispatcher.updateMine(container) { + if dispatcher.checkMine(container, true) { // Already monitored, sent status update return } - if container.State == Queued { + if container.State == Queued && container.Priority > 0 { + if !dispatcher.throttle.Check(container.UUID) { + return + } // Try to take the lock - if err := dispatcher.UpdateState(container.UUID, Locked); err != nil { + if err := dispatcher.Lock(container.UUID); err != nil { return } container.State = Locked @@ -206,7 +194,7 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) { } // UpdateState makes an API call to change the state of a container. -func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { +func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error { err := dispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": newState}}, @@ -217,37 +205,45 @@ func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error { return err } -// RunDispatcher runs the main loop of the dispatcher until receiving a message -// on the dispatcher.DoneProcessing channel. It also installs a signal handler -// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT. -func (dispatcher *Dispatcher) RunDispatcher() (err error) { - err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) +// Lock makes the lock API call which updates the state of a container to Locked. +func (dispatcher *Dispatcher) Lock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil) if err != nil { - log.Printf("Error getting my token UUID: %v", err) - return + log.Printf("Error locking container %s: %q", uuid, err) } + return err +} - dispatcher.mineMap = make(map[string]chan Container) - dispatcher.containers = make(chan Container) - - // Graceful shutdown on signal - sigChan := make(chan os.Signal) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - - go func(sig <-chan os.Signal) { - for sig := range sig { - log.Printf("Caught signal: %v", sig) - dispatcher.DoneProcessing <- struct{}{} - } - }(sigChan) +// Unlock makes the unlock API call which updates the state of a container to Queued. +func (dispatcher *Dispatcher) Unlock(uuid string) error { + err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) + if err != nil { + log.Printf("Error unlocking container %s: %q", uuid, err) + } + return err +} - defer close(sigChan) - defer signal.Stop(sigChan) +// Stop causes Run to return after the current polling cycle. +func (dispatcher *Dispatcher) Stop() { + if dispatcher.stop == nil { + // already stopped + return + } + close(dispatcher.stop) + dispatcher.stop = nil +} - go dispatcher.pollContainers() - for container := range dispatcher.containers { - dispatcher.handleUpdate(container) +// Run runs the main loop of the dispatcher. +func (dispatcher *Dispatcher) Run() (err error) { + err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth) + if err != nil { + log.Printf("Error getting my token UUID: %v", err) + return } + dispatcher.mineMap = make(map[string]chan arvados.Container) + dispatcher.stop = make(chan struct{}) + dispatcher.throttle.hold = dispatcher.MinRetryPeriod + dispatcher.pollContainers(dispatcher.stop) return nil }