Merge branch '14383-java-sdk-double-slash'. Fixes #14383.
[arvados.git] / lib / dispatchcloud / container / queue.go
index 965407e518a1aaf16444d2b3ce6f84af90be3b30..bbe47625a893d6874d2c3c415952948f290de74f 100644 (file)
@@ -27,8 +27,8 @@ type APIClient interface {
 type QueueEnt struct {
        // The container to run. Only the UUID, State, Priority, and
        // RuntimeConstraints fields are populated.
-       Container    arvados.Container
-       InstanceType arvados.InstanceType
+       Container    arvados.Container    `json:"container"`
+       InstanceType arvados.InstanceType `json:"instance_type"`
 }
 
 // String implements fmt.Stringer by returning the queued container's
@@ -131,7 +131,7 @@ func (cq *Queue) Forget(uuid string) {
        defer cq.mtx.Unlock()
        ctr := cq.current[uuid].Container
        if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
-               delete(cq.current, uuid)
+               cq.delEnt(uuid, ctr.State)
        }
 }
 
@@ -184,7 +184,9 @@ func (cq *Queue) Update() error {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        for uuid, ctr := range next {
-               if _, keep := cq.dontupdate[uuid]; keep {
+               if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+                       // Don't clobber a local update that happened
+                       // after we started polling.
                        continue
                }
                if cur, ok := cq.current[uuid]; !ok {
@@ -194,13 +196,18 @@ func (cq *Queue) Update() error {
                        cq.current[uuid] = cur
                }
        }
-       for uuid := range cq.current {
-               if _, keep := cq.dontupdate[uuid]; keep {
-                       continue
-               } else if _, keep = next[uuid]; keep {
+       for uuid, ent := range cq.current {
+               if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+                       // Don't expunge an entry that was
+                       // added/updated locally after we started
+                       // polling.
                        continue
-               } else {
-                       delete(cq.current, uuid)
+               } else if _, stillpresent := next[uuid]; !stillpresent {
+                       // Expunge an entry that no longer appears in
+                       // the poll response (evidently it's
+                       // cancelled, completed, deleted, or taken by
+                       // a different dispatcher).
+                       cq.delEnt(uuid, ent.Container.State)
                }
        }
        cq.dontupdate = nil
@@ -209,13 +216,74 @@ func (cq *Queue) Update() error {
        return nil
 }
 
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "State":         state,
+       }).Info("dropping container from queue")
+       delete(cq.current, uuid)
+}
+
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
-       if err != nil {
-               // FIXME: throttle warnings, cancel after timeout
-               cq.logger.Warnf("cannot run %s", &ctr)
+       if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+               // We assume here that any chooseType error is a hard
+               // error: it wouldn't help to try again, or to leave
+               // it for a different dispatcher process to attempt.
+               errorString := err.Error()
+               logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+               logger.WithError(err).Warn("cancel container with no suitable instance type")
+               go func() {
+                       if ctr.State == arvados.ContainerStateQueued {
+                               // Can't set runtime error without
+                               // locking first. If Lock() is
+                               // successful, it will call addEnt()
+                               // again itself, and we'll fall
+                               // through to the
+                               // setRuntimeError/Cancel code below.
+                               err := cq.Lock(ctr.UUID)
+                               if err != nil {
+                                       logger.WithError(err).Warn("lock failed")
+                                       // ...and try again on the
+                                       // next Update, if the problem
+                                       // still exists.
+                               }
+                               return
+                       }
+                       var err error
+                       defer func() {
+                               if err == nil {
+                                       return
+                               }
+                               // On failure, check current container
+                               // state, and don't log the error if
+                               // the failure came from losing a
+                               // race.
+                               var latest arvados.Container
+                               cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+                               if latest.State == arvados.ContainerStateCancelled {
+                                       return
+                               }
+                               logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
+                       }()
+                       err = cq.setRuntimeError(ctr.UUID, errorString)
+                       if err != nil {
+                               return
+                       }
+                       err = cq.Cancel(ctr.UUID)
+                       if err != nil {
+                               return
+                       }
+               }()
                return
        }
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "State":         ctr.State,
+               "Priority":      ctr.Priority,
+               "InstanceType":  it.Name,
+       }).Info("adding container to queue")
        cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
 }
 
@@ -229,6 +297,18 @@ func (cq *Queue) Unlock(uuid string) error {
        return cq.apiUpdate(uuid, "unlock")
 }
 
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+       return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+               "container": {
+                       "runtime_status": {
+                               "error": errorString,
+                       },
+               },
+       })
+}
+
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
        err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{