type QueueEnt struct {
// The container to run. Only the UUID, State, Priority, and
// RuntimeConstraints fields are populated.
- Container arvados.Container
- InstanceType arvados.InstanceType
+ Container arvados.Container `json:"container"`
+ InstanceType arvados.InstanceType `json:"instance_type"`
}
// String implements fmt.Stringer by returning the queued container's
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
- if err != nil {
- // FIXME: throttle warnings, cancel after timeout
- cq.logger.Warnf("cannot run %s", &ctr)
+ if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+ // We assume here that any chooseType error is a hard
+ // error: it wouldn't help to try again, or to leave
+ // it for a different dispatcher process to attempt.
+ errorString := err.Error()
+ cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+ go func() {
+ var err error
+ defer func() {
+ if err == nil {
+ return
+ }
+ // On failure, check current container
+ // state, and don't log the error if
+ // the failure came from losing a
+ // race.
+ var latest arvados.Container
+ cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+ if latest.State == arvados.ContainerStateCancelled {
+ return
+ }
+ cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+ }()
+ if ctr.State == arvados.ContainerStateQueued {
+ err = cq.Lock(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }
+ err = cq.setRuntimeError(ctr.UUID, errorString)
+ if err != nil {
+ return
+ }
+ err = cq.Cancel(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }()
return
}
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
return cq.apiUpdate(uuid, "unlock")
}
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+ return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+ "container": {
+ "runtime_status": {
+ "error": errorString,
+ },
+ },
+ })
+}
+
// Cancel cancels the given container.
func (cq *Queue) Cancel(uuid string) error {
err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{