package lsf
import (
- "bytes"
"context"
"errors"
"fmt"
"math"
"net/http"
+ "regexp"
"strings"
"sync"
"time"
}
}
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+// runContainer appears to be the per-container callback handed to a
+// dispatch.Dispatcher (its first argument is unused). It builds a
+// crunch-run command line from disp.Cluster.Containers
+// (CrunchRunCommand, --runtime-engine=RuntimeEngine,
+// CrunchRunArgumentsList) and submits ctr with disp.submit. Some of
+// the conditions around submission are not visible in this excerpt.
+// It then follows container updates arriving on status, calling
+// disp.bkill whenever the priority is below 1.
+//
+// A submit failure is returned to the caller unchanged; the other
+// visible return paths return nil.
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
ctx, cancel := context.WithCancel(disp.Context)
defer cancel()
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
- if err := disp.submit(ctr, cmd); err != nil {
- var text string
- switch err := err.(type) {
- case dispatchcloud.ConstraintsNotSatisfiableError:
- var logBuf bytes.Buffer
- fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
- if len(err.AvailableTypes) == 0 {
- fmt.Fprint(&logBuf, "No instance types are configured.\n")
- } else {
- fmt.Fprint(&logBuf, "Available instance types:\n")
- for _, t := range err.AvailableTypes {
- fmt.Fprintf(&logBuf,
- "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
- t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
- )
- }
- }
- text = logBuf.String()
- disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
- default:
- text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
- }
- disp.logger.Print(text)
-
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": ctr.UUID,
- "event_type": "dispatch",
- "properties": map[string]string{"text": text}}}
- disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
- disp.arvDispatcher.Unlock(ctr.UUID)
- return
+ // NOTE(review): the removed code handled submit errors here.
+ // For a dispatchcloud.ConstraintsNotSatisfiableError it logged the
+ // available instance types and set the container Cancelled.
+ // For any error it wrote a "dispatch" log record via
+ // Arv.Create("logs", ...) and unlocked the container. Now the error
+ // is only returned. Confirm that the caller performs the equivalent
+ // cancel/log/unlock handling.
+ err := disp.submit(ctr, cmd)
+ if err != nil {
+ return err
}
}
case dispatch.Locked:
disp.arvDispatcher.Unlock(ctr.UUID)
}
- return
+ return nil
case updated, ok := <-status:
if !ok {
+ // status channel is closed, which is
+ // how arvDispatcher tells us to stop
+ // touching the container record, kill
+ // off any remaining LSF processes,
+ // etc.
done = true
+ // break only leaves the enclosing select. Setting done is
+ // presumably what ends the surrounding loop (its header is
+ // not visible in this excerpt).
break
}
disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
}
ctr = updated
- if ctr.Priority == 0 {
+ // Any priority below 1 (not only exactly 0) triggers bkill of the
+ // LSF job.
+ if ctr.Priority < 1 {
disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
disp.bkill(ctr)
} else {
}
<-ticker.C
}
+ return nil
}
func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
}
}
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
func (disp *dispatcher) checkLsfQueueForOrphans() {
- disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+ containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+ for _, uuid := range disp.lsfqueue.All() {
+ if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+ continue
+ }
+ err := disp.arvDispatcher.TrackContainer(uuid)
+ if err != nil {
+ disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+ }
+ }
}
func execScript(args []string) []byte {