X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7337b18bf7b6996a7fe4df0aba5356a03bda452d..8d660b2b276475f1bd5b56614a28a33a358dd038:/lib/dispatchcloud/container/queue.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index 965407e518..4e807a12ab 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -5,6 +5,7 @@ package container import ( + "errors" "io" "sync" "time" @@ -27,8 +28,8 @@ type APIClient interface { type QueueEnt struct { // The container to run. Only the UUID, State, Priority, and // RuntimeConstraints fields are populated. - Container arvados.Container - InstanceType arvados.InstanceType + Container arvados.Container `json:"container"` + InstanceType arvados.InstanceType `json:"instance_type"` } // String implements fmt.Stringer by returning the queued container's @@ -131,7 +132,7 @@ func (cq *Queue) Forget(uuid string) { defer cq.mtx.Unlock() ctr := cq.current[uuid].Container if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled { - delete(cq.current, uuid) + cq.delEnt(uuid, ctr.State) } } @@ -184,7 +185,9 @@ func (cq *Queue) Update() error { cq.mtx.Lock() defer cq.mtx.Unlock() for uuid, ctr := range next { - if _, keep := cq.dontupdate[uuid]; keep { + if _, dontupdate := cq.dontupdate[uuid]; dontupdate { + // Don't clobber a local update that happened + // after we started polling. continue } if cur, ok := cq.current[uuid]; !ok { @@ -194,13 +197,18 @@ func (cq *Queue) Update() error { cq.current[uuid] = cur } } - for uuid := range cq.current { - if _, keep := cq.dontupdate[uuid]; keep { - continue - } else if _, keep = next[uuid]; keep { + for uuid, ent := range cq.current { + if _, dontupdate := cq.dontupdate[uuid]; dontupdate { + // Don't expunge an entry that was + // added/updated locally after we started + // polling. continue - } else { - delete(cq.current, uuid) + } else if _, stillpresent := next[uuid]; !stillpresent { + // Expunge an entry that no longer appears in + // the poll response (evidently it's + // cancelled, completed, deleted, or taken by + // a different dispatcher). + cq.delEnt(uuid, ent.Container.State) } } cq.dontupdate = nil @@ -209,13 +217,74 @@ func (cq *Queue) Update() error { return nil } +// Caller must have lock. +func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) { + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "State": state, + }).Info("dropping container from queue") + delete(cq.current, uuid) +} + func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { it, err := cq.chooseType(&ctr) - if err != nil { - // FIXME: throttle warnings, cancel after timeout - cq.logger.Warnf("cannot run %s", &ctr) + if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) { + // We assume here that any chooseType error is a hard + // error: it wouldn't help to try again, or to leave + // it for a different dispatcher process to attempt. + errorString := err.Error() + logger := cq.logger.WithField("ContainerUUID", ctr.UUID) + logger.WithError(err).Warn("cancel container with no suitable instance type") + go func() { + if ctr.State == arvados.ContainerStateQueued { + // Can't set runtime error without + // locking first. If Lock() is + // successful, it will call addEnt() + // again itself, and we'll fall + // through to the + // setRuntimeError/Cancel code below. + err := cq.Lock(ctr.UUID) + if err != nil { + logger.WithError(err).Warn("lock failed") + // ...and try again on the + // next Update, if the problem + // still exists. + } + return + } + var err error + defer func() { + if err == nil { + return + } + // On failure, check current container + // state, and don't log the error if + // the failure came from losing a + // race. + var latest arvados.Container + cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}}) + if latest.State == arvados.ContainerStateCancelled { + return + } + logger.WithError(err).Warn("error while trying to cancel unsatisfiable container") + }() + err = cq.setRuntimeError(ctr.UUID, errorString) + if err != nil { + return + } + err = cq.Cancel(ctr.UUID) + if err != nil { + return + } + }() return } + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "State": ctr.State, + "Priority": ctr.Priority, + "InstanceType": it.Name, + }).Info("adding container to queue") cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it} } @@ -229,6 +298,18 @@ func (cq *Queue) Unlock(uuid string) error { return cq.apiUpdate(uuid, "unlock") } +// setRuntimeError sets runtime_status["error"] to the given value. +// Container should already have state==Locked or Running. +func (cq *Queue) setRuntimeError(uuid, errorString string) error { + return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{ + "container": { + "runtime_status": { + "error": errorString, + }, + }, + }) +} + // Cancel cancels the given container. func (cq *Queue) Cancel(uuid string) error { err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ @@ -318,32 +399,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { } apply(avail) - var missing []string + missing := map[string]bool{} cq.mtx.Lock() for uuid, ent := range cq.current { if next[uuid] == nil && ent.Container.State != arvados.ContainerStateCancelled && ent.Container.State != arvados.ContainerStateComplete { - missing = append(missing, uuid) + missing[uuid] = true } } cq.mtx.Unlock() - for i, page := 0, 20; i < len(missing); i += page { - batch := missing[i:] - if len(batch) > page { - batch = batch[:page] + for len(missing) > 0 { + var batch []string + for uuid := range missing { + batch = append(batch, uuid) + if len(batch) == 20 { + break + } } + filters := []arvados.Filter{{"uuid", "in", batch}} ended, err := cq.fetchAll(arvados.ResourceListParams{ Select: selectParam, Order: "uuid", Count: "none", - Filters: []arvados.Filter{{"uuid", "in", batch}}, + Filters: filters, }) if err != nil { return nil, err } apply(ended) + if len(ended) == 0 { + // This is the only case where we can conclude + // a container has been deleted from the + // database. A short (but non-zero) page, on + // the other hand, can be caused by a response + // size limit. + for _, uuid := range batch { + cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)") + delete(missing, uuid) + cq.mtx.Lock() + cq.delEnt(uuid, cq.current[uuid].Container.State) + cq.mtx.Unlock() + } + continue + } + for _, ctr := range ended { + if _, ok := missing[ctr.UUID]; !ok { + msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock" + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "Filters": filters, + }).Error(msg) + return nil, errors.New(msg) + } + delete(missing, ctr.UUID) + } } return next, nil }