X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ffa1fd1fdf584c71e248e9bb7d523f788a517510..5a36ea60cadf2f041f460eeeea970ff9dac66e71:/lib/lsf/dispatch.go diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index ff95d0db29..7461597c45 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -5,12 +5,12 @@ package lsf import ( - "bytes" "context" "errors" "fmt" "math" "net/http" + "regexp" "strings" "sync" "time" @@ -161,7 +161,7 @@ func (disp *dispatcher) init() { } } -func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { +func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error { ctx, cancel := context.WithCancel(disp.Context) defer cancel() @@ -172,38 +172,9 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain cmd := []string{disp.Cluster.Containers.CrunchRunCommand} cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...) - if err := disp.submit(ctr, cmd); err != nil { - var text string - switch err := err.(type) { - case dispatchcloud.ConstraintsNotSatisfiableError: - var logBuf bytes.Buffer - fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err) - if len(err.AvailableTypes) == 0 { - fmt.Fprint(&logBuf, "No instance types are configured.\n") - } else { - fmt.Fprint(&logBuf, "Available instance types:\n") - for _, t := range err.AvailableTypes { - fmt.Fprintf(&logBuf, - "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", - t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price, - ) - } - } - text = logBuf.String() - disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) - default: - text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err) - } - disp.logger.Print(text) - - lr := arvadosclient.Dict{"log": arvadosclient.Dict{ - "object_uuid": ctr.UUID, - "event_type": "dispatch", - "properties": map[string]string{"text": text}}} - disp.arvDispatcher.Arv.Create("logs", lr, nil) - - disp.arvDispatcher.Unlock(ctr.UUID) - return + err := disp.submit(ctr, cmd) + if err != nil { + return err } } @@ -236,9 +207,14 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain case dispatch.Locked: disp.arvDispatcher.Unlock(ctr.UUID) } - return + return nil case updated, ok := <-status: if !ok { + // status channel is closed, which is + // how arvDispatcher tells us to stop + // touching the container record, kill + // off any remaining LSF processes, + // etc. done = true break } @@ -246,7 +222,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State) } ctr = updated - if ctr.Priority == 0 { + if ctr.Priority < 1 { disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority) disp.bkill(ctr) } else { @@ -267,6 +243,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain } <-ticker.C } + return nil } func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error { @@ -315,8 +292,23 @@ func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string } } +// Check the next bjobs report, and invoke TrackContainer for all the +// containers in the report. This gives us a chance to cancel existing +// Arvados LSF jobs (started by a previous dispatch process) that +// never released their LSF job allocations even though their +// container states are Cancelled or Complete. See +// https://dev.arvados.org/issues/10979 func (disp *dispatcher) checkLsfQueueForOrphans() { - disp.logger.Warn("FIXME: checkLsfQueueForOrphans") + containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`) + for _, uuid := range disp.lsfqueue.All() { + if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) { + continue + } + err := disp.arvDispatcher.TrackContainer(uuid) + if err != nil { + disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err) + } + } } func execScript(args []string) []byte {