19129: add missing exit_code from select
[arvados.git] / lib / lsf / dispatch.go
index d17c458e8005b8837492caf2e6c2ba8ea158d502..0d9324784d503e1fb30789e45e2f65ae7b84fdd1 100644 (file)
@@ -119,7 +119,7 @@ func (disp *dispatcher) init() {
        disp.lsfcli.logger = disp.logger
        disp.lsfqueue = lsfqueue{
                logger: disp.logger,
-               period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
@@ -167,7 +167,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
        if ctr.State != dispatch.Locked {
                // already started by prior invocation
-       } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+       } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
                disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -181,16 +181,38 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
        defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
-       // If the container disappears from the lsf queue, there is
-       // no point in waiting for further dispatch updates: just
-       // clean up and return.
        go func(uuid string) {
+               cancelled := false
                for ctx.Err() == nil {
-                       if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+                       qent, ok := disp.lsfqueue.Lookup(uuid)
+                       if !ok {
+                               // If the container disappears from
+                               // the lsf queue, there is no point in
+                               // waiting for further dispatch
+                               // updates: just clean up and return.
                                disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
                                cancel()
                                return
                        }
+                       if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+                               disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+                               err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+                                       "container": map[string]interface{}{
+                                               "runtime_status": map[string]string{
+                                                       "error": qent.PendReason,
+                                               },
+                                       },
+                               }, nil)
+                               if err != nil {
+                                       disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
+                                       continue // retry
+                               }
+                               err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+                               if err != nil {
+                                       continue // retry (UpdateState() already logged the error)
+                               }
+                               cancelled = true
+                       }
                }
        }(ctr.UUID)
 
@@ -234,12 +256,12 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
        // Try "bkill" every few seconds until the LSF job disappears
        // from the queue.
-       ticker := time.NewTicker(5 * time.Second)
+       ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
        defer ticker.Stop()
-       for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
-               err := disp.lsfcli.Bkill(jobid)
+       for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+               err := disp.lsfcli.Bkill(qent.ID)
                if err != nil {
-                       disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+                       disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
                }
                <-ticker.C
        }
@@ -262,17 +284,15 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
 }
 
 func (disp *dispatcher) bkill(ctr arvados.Container) {
-       if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+       if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
                disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
-       } else if err := disp.lsfcli.Bkill(jobid); err != nil {
-               disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+       } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+               disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
        }
 }
 
 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
-       tmpArgs := []string{}
        args := []string{"bsub"}
-       tmpArgs = append(tmpArgs, disp.Cluster.Containers.LSF.BsubArgumentsList...)
 
        tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
        vcpus := container.RuntimeConstraints.VCPUs
@@ -280,16 +300,32 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error)
                container.RuntimeConstraints.KeepCacheRAM+
                int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
 
-       r := regexp.MustCompile(`([^%]|^)%([^%])`)
-       undoubleRE := regexp.MustCompile(`%%`)
-       for _, a := range tmpArgs {
-               tmp := r.ReplaceAllStringFunc(a, func(m string) string {
-                       parts := r.FindStringSubmatch(m)
-                       return parts[1] + disp.substitute(parts[2], container.UUID, vcpus, mem, tmp)
-               })
-               // handle escaped literal % symbols
-               tmp = undoubleRE.ReplaceAllString(tmp, "%")
-               args = append(args, tmp)
+       repl := map[string]string{
+               "%%": "%",
+               "%C": fmt.Sprintf("%d", vcpus),
+               "%M": fmt.Sprintf("%d", mem),
+               "%T": fmt.Sprintf("%d", tmp),
+               "%U": container.UUID,
+               "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
+       }
+
+       re := regexp.MustCompile(`%.`)
+       var substitutionErrors string
+       argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
+       if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+               argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
+       }
+       for _, a := range argumentTemplate {
+               args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
+                       subst := repl[s]
+                       if len(subst) == 0 {
+                               substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
+                       }
+                       return subst
+               }))
+       }
+       if len(substitutionErrors) != 0 {
+               return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
        }
 
        if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
@@ -298,23 +334,6 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error)
        return args, nil
 }
 
-func (disp *dispatcher) substitute(l string, uuid string, vcpus int, mem, tmp int64) string {
-       var arg string
-       switch l {
-       case "C":
-               arg = fmt.Sprintf("%d", vcpus)
-       case "T":
-               arg = fmt.Sprintf("%d", tmp)
-       case "M":
-               arg = fmt.Sprintf("%d", mem)
-       case "U":
-               arg = uuid
-       default:
-               arg = "%" + l
-       }
-       return arg
-}
-
 // Check the next bjobs report, and invoke TrackContainer for all the
 // containers in the report. This gives us a chance to cancel existing
 // Arvados LSF jobs (started by a previous dispatch process) that