package lsf
import (
- "bytes"
"context"
"errors"
"fmt"
}
}
// runContainer handles a single container on behalf of the dispatcher.
//
// If ctr is in state Locked and has no entry in the LSF queue, it is
// submitted with the configured crunch-run command; with this change
// a submission error is returned to the caller rather than being
// logged and handled here. A background goroutine then watches the
// container's LSF queue entry until ctx is cancelled, and the final
// loop keeps issuing bkill every 5 seconds until the entry is gone.
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
ctx, cancel := context.WithCancel(disp.Context)
defer cancel()
if ctr.State != dispatch.Locked {
// already started by prior invocation
- } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
- if err := disp.submit(ctr, cmd); err != nil {
- var text string
- switch err := err.(type) {
- case dispatchcloud.ConstraintsNotSatisfiableError:
- var logBuf bytes.Buffer
- fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
- if len(err.AvailableTypes) == 0 {
- fmt.Fprint(&logBuf, "No instance types are configured.\n")
- } else {
- fmt.Fprint(&logBuf, "Available instance types:\n")
- for _, t := range err.AvailableTypes {
- fmt.Fprintf(&logBuf,
- "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
- t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
- )
- }
- }
- text = logBuf.String()
- disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
- default:
- text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
- }
- disp.logger.Print(text)
-
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": ctr.UUID,
- "event_type": "dispatch",
- "properties": map[string]string{"text": text}}}
- disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
- disp.arvDispatcher.Unlock(ctr.UUID)
- return
// The replacement below hands submit's error straight back to the
// caller; the log record, Cancelled update and Unlock removed above
// are no longer performed in this function.
+ err := disp.submit(ctr, cmd)
+ if err != nil {
+ return err
}
}
disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
- // If the container disappears from the lsf queue, there is
- // no point in waiting for further dispatch updates: just
- // clean up and return.
go func(uuid string) {
// cancelled records that runtime_status and the Cancelled state
// were both written successfully, so that step is not repeated.
+ cancelled := false
for ctx.Err() == nil {
- if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+ qent, ok := disp.lsfqueue.Lookup(uuid)
+ if !ok {
+ // If the container disappears from
+ // the lsf queue, there is no point in
+ // waiting for further dispatch
+ // updates: just clean up and return.
disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
cancel()
return
}
// A pending job whose reason says no host is suitable is reported
// through runtime_status.error and then moved to Cancelled.
+ if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+ disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+ err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+ "container": map[string]interface{}{
+ "runtime_status": map[string]string{
+ "error": qent.PendReason,
+ },
+ },
+ }, nil)
+ if err != nil {
+ disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
// NOTE(review): both continue statements in this block go straight
// back to Lookup with no delay in this loop — presumably Lookup
// blocks until the next queue update; confirm, otherwise a
// persistent API error makes this spin.
+ continue // retry
+ }
+ err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+ if err != nil {
+ continue // retry (UpdateState() already logged the error)
+ }
+ cancelled = true
+ }
}
}(ctr.UUID)
case dispatch.Locked:
disp.arvDispatcher.Unlock(ctr.UUID)
}
- return
+ return nil
case updated, ok := <-status:
if !ok {
+ // status channel is closed, which is
+ // how arvDispatcher tells us to stop
+ // touching the container record, kill
+ // off any remaining LSF processes,
+ // etc.
done = true
break
}
disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
}
ctr = updated
// Any priority below 1 (not only exactly 0) now cancels the LSF job.
- if ctr.Priority == 0 {
+ if ctr.Priority < 1 {
disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
disp.bkill(ctr)
} else {
// from the queue.
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
// qent is assigned only by the loop's init statement; the post
// statement re-checks presence but discards the entry, so the same
// qent.ID is passed to Bkill on every iteration.
- for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
- err := disp.lsfcli.Bkill(jobid)
+ for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+ err := disp.lsfcli.Bkill(qent.ID)
if err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
<-ticker.C
}
+ return nil
}
func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
}
// bkill asks LSF to kill the job belonging to ctr. If the container
// has no entry in the LSF queue the call is logged as redundant and
// skipped; a failing Bkill is logged as a warning. Nothing is
// returned to the caller.
func (disp *dispatcher) bkill(ctr arvados.Container) {
- if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
- } else if err := disp.lsfcli.Bkill(jobid); err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
}
func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
args := []string{"bsub"}
- args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
- args = append(args, "-J", container.UUID)
- args = append(args, disp.bsubConstraintArgs(container)...)
- if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
- args = append([]string{"sudo", "-E", "-u", u}, args...)
- }
- return args, nil
-}
-func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
- // TODO: propagate container.SchedulingParameters.Partitions
tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
vcpus := container.RuntimeConstraints.VCPUs
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
container.RuntimeConstraints.KeepCacheRAM+
int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
- return []string{
- "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+
+ repl := map[string]string{
+ "%%": "%",
+ "%C": fmt.Sprintf("%d", vcpus),
+ "%M": fmt.Sprintf("%d", mem),
+ "%T": fmt.Sprintf("%d", tmp),
+ "%U": container.UUID,
+ "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
+ }
+
+ re := regexp.MustCompile(`%.`)
+ var substitutionErrors string
+ argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
+ if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+ argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
+ }
+ for _, a := range argumentTemplate {
+ args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
+ subst := repl[s]
+ if len(subst) == 0 {
+ substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
+ }
+ return subst
+ }))
}
+ if len(substitutionErrors) != 0 {
+ return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
+ }
+
+ if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
+ args = append([]string{"sudo", "-E", "-u", u}, args...)
+ }
+ return args, nil
}
// Check the next bjobs report, and invoke TrackContainer for all the