X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/39464fc833e3ee2fb771f83dce9f94e3856c1075..ced6d55c36132aee7da3a5fe65f608c9dbf33362:/lib/lsf/dispatch.go diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index d17c458e80..0d9324784d 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -119,7 +119,7 @@ func (disp *dispatcher) init() { disp.lsfcli.logger = disp.logger disp.lsfqueue = lsfqueue{ logger: disp.logger, - period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval), + period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(), lsfcli: &disp.lsfcli, } disp.ArvClient.AuthToken = disp.AuthToken @@ -167,7 +167,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain if ctr.State != dispatch.Locked { // already started by prior invocation - } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { disp.logger.Printf("Submitting container %s to LSF", ctr.UUID) cmd := []string{disp.Cluster.Containers.CrunchRunCommand} cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) @@ -181,16 +181,38 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State) defer disp.logger.Printf("Done monitoring container %s", ctr.UUID) - // If the container disappears from the lsf queue, there is - // no point in waiting for further dispatch updates: just - // clean up and return. go func(uuid string) { + cancelled := false for ctx.Err() == nil { - if _, ok := disp.lsfqueue.JobID(uuid); !ok { + qent, ok := disp.lsfqueue.Lookup(uuid) + if !ok { + // If the container disappears from + // the lsf queue, there is no point in + // waiting for further dispatch + // updates: just clean up and return. disp.logger.Printf("container %s job disappeared from LSF queue", uuid) cancel() return } + if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") { + disp.logger.Printf("container %s: %s", uuid, qent.PendReason) + err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ + "container": map[string]interface{}{ + "runtime_status": map[string]string{ + "error": qent.PendReason, + }, + }, + }, nil) + if err != nil { + disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err) + continue // retry + } + err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled) + if err != nil { + continue // retry (UpdateState() already logged the error) + } + cancelled = true + } } }(ctr.UUID) @@ -234,12 +256,12 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain // Try "bkill" every few seconds until the LSF job disappears // from the queue. - ticker := time.NewTicker(5 * time.Second) + ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2) defer ticker.Stop() - for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) { - err := disp.lsfcli.Bkill(jobid) + for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) { + err := disp.lsfcli.Bkill(qent.ID) if err != nil { - disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err) } <-ticker.C } @@ -262,17 +284,15 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s } func (disp *dispatcher) bkill(ctr arvados.Container) { - if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID) - } else if err := disp.lsfcli.Bkill(jobid); err != nil { - disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + } else if err := disp.lsfcli.Bkill(qent.ID); err != nil { + disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err) } } func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) { - tmpArgs := []string{} args := []string{"bsub"} - tmpArgs = append(tmpArgs, disp.Cluster.Containers.LSF.BsubArgumentsList...) tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576)) vcpus := container.RuntimeConstraints.VCPUs @@ -280,16 +300,32 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) container.RuntimeConstraints.KeepCacheRAM+ int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576)) - r := regexp.MustCompile(`([^%]|^)%([^%])`) - undoubleRE := regexp.MustCompile(`%%`) - for _, a := range tmpArgs { - tmp := r.ReplaceAllStringFunc(a, func(m string) string { - parts := r.FindStringSubmatch(m) - return parts[1] + disp.substitute(parts[2], container.UUID, vcpus, mem, tmp) - }) - // handle escaped literal % symbols - tmp = undoubleRE.ReplaceAllString(tmp, "%") - args = append(args, tmp) + repl := map[string]string{ + "%%": "%", + "%C": fmt.Sprintf("%d", vcpus), + "%M": fmt.Sprintf("%d", mem), + "%T": fmt.Sprintf("%d", tmp), + "%U": container.UUID, + "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount), + } + + re := regexp.MustCompile(`%.`) + var substitutionErrors string + argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList + if container.RuntimeConstraints.CUDA.DeviceCount > 0 { + argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...) + } + for _, a := range argumentTemplate { + args = append(args, re.ReplaceAllStringFunc(a, func(s string) string { + subst := repl[s] + if len(subst) == 0 { + substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s) + } + return subst + })) + } + if len(substitutionErrors) != 0 { + return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2]) } if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" { @@ -298,23 +334,6 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) return args, nil } -func (disp *dispatcher) substitute(l string, uuid string, vcpus int, mem, tmp int64) string { - var arg string - switch l { - case "C": - arg = fmt.Sprintf("%d", vcpus) - case "T": - arg = fmt.Sprintf("%d", tmp) - case "M": - arg = fmt.Sprintf("%d", mem) - case "U": - arg = uuid - default: - arg = "%" + l - } - return arg -} - // Check the next bjobs report, and invoke TrackContainer for all the // containers in the report. This gives us a chance to cancel existing // Arvados LSF jobs (started by a previous dispatch process) that