X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cb4efac6793d18892dde09c631895cb98c3df470..3c848757efb9b1697962cb51068c87658565908b:/lib/dispatchcloud/container/queue.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index af17aaf392..a4a270dd10 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -10,7 +10,7 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -133,7 +133,7 @@ func (cq *Queue) Forget(uuid string) { cq.mtx.Lock() defer cq.mtx.Unlock() ctr := cq.current[uuid].Container - if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled { + if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) { cq.delEnt(uuid, ctr.State) } } @@ -240,19 +240,15 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { go func() { if ctr.State == arvados.ContainerStateQueued { // Can't set runtime error without - // locking first. If Lock() is - // successful, it will call addEnt() - // again itself, and we'll fall - // through to the - // setRuntimeError/Cancel code below. + // locking first. err := cq.Lock(ctr.UUID) if err != nil { logger.WithError(err).Warn("lock failed") + return // ...and try again on the // next Update, if the problem // still exists. } - return } var err error defer func() { @@ -314,15 +310,14 @@ func (cq *Queue) setRuntimeError(uuid, errorString string) error { // Cancel cancels the given container. func (cq *Queue) Cancel(uuid string) error { - err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ + var resp arvados.Container + err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ "container": {"state": arvados.ContainerStateCancelled}, }) if err != nil { return err } - cq.mtx.Lock() - defer cq.mtx.Unlock() - cq.notify() + cq.updateWithResp(uuid, resp) return nil } @@ -332,20 +327,32 @@ func (cq *Queue) apiUpdate(uuid, action string) error { if err != nil { return err } + cq.updateWithResp(uuid, resp) + return nil +} +// Update the local queue with the response received from a +// state-changing API request (lock/unlock/cancel). +func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) { cq.mtx.Lock() defer cq.mtx.Unlock() if cq.dontupdate != nil { cq.dontupdate[uuid] = struct{}{} } - if ent, ok := cq.current[uuid]; !ok { - cq.addEnt(uuid, resp) - } else { - ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID - cq.current[uuid] = ent + ent, ok := cq.current[uuid] + if !ok { + // Container is not in queue (e.g., it was not added + // because there is no suitable instance type, and + // we're just locking/updating it in order to set an + // error message). No need to add it, and we don't + // necessarily have enough information to add it here + // anyway because lock/unlock responses don't include + // runtime_constraints. + return } + ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID + cq.current[uuid] = ent cq.notify() - return nil } func (cq *Queue) poll() (map[string]*arvados.Container, error) {