Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / lsf / dispatch.go
index ff95d0db29c0a86aa38432cf3928184537fd8f23..d1408d23cb1a4e3c2274f40d2f02b66bda29e82d 100644 (file)
@@ -5,17 +5,21 @@
 package lsf
 
 import (
-       "bytes"
        "context"
+       "crypto/hmac"
+       "crypto/sha256"
        "errors"
        "fmt"
        "math"
        "net/http"
+       "regexp"
        "strings"
        "sync"
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -56,6 +60,7 @@ type dispatcher struct {
        Registry  *prometheus.Registry
 
        logger        logrus.FieldLogger
+       dbConnector   ctrlctx.DBConnector
        lsfcli        lsfcli
        lsfqueue      lsfqueue
        arvDispatcher *dispatch.Dispatcher
@@ -71,7 +76,9 @@ type dispatcher struct {
 func (disp *dispatcher) Start() {
        disp.initOnce.Do(func() {
                disp.init()
+               dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
                go func() {
+                       defer dblock.Dispatch.Unlock()
                        disp.checkLsfQueueForOrphans()
                        err := disp.arvDispatcher.Run(disp.Context)
                        if err != nil {
@@ -119,10 +126,11 @@ func (disp *dispatcher) init() {
        disp.lsfcli.logger = disp.logger
        disp.lsfqueue = lsfqueue{
                logger: disp.logger,
-               period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               period: disp.Cluster.Containers.CloudVMs.PollInterval.Duration(),
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
@@ -161,61 +169,47 @@ func (disp *dispatcher) init() {
        }
 }
 
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
        ctx, cancel := context.WithCancel(disp.Context)
        defer cancel()
 
        if ctr.State != dispatch.Locked {
                // already started by prior invocation
-       } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+       } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
+               if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
+                       err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
+                               "container": map[string]interface{}{
+                                       "runtime_status": map[string]string{
+                                               "error": err.Error(),
+                                       },
+                               },
+                       }, nil)
+                       if err != nil {
+                               return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
+                       }
+                       return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+               }
                disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
                cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
-               if err := disp.submit(ctr, cmd); err != nil {
-                       var text string
-                       switch err := err.(type) {
-                       case dispatchcloud.ConstraintsNotSatisfiableError:
-                               var logBuf bytes.Buffer
-                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-                               if len(err.AvailableTypes) == 0 {
-                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
-                               } else {
-                                       fmt.Fprint(&logBuf, "Available instance types:\n")
-                                       for _, t := range err.AvailableTypes {
-                                               fmt.Fprintf(&logBuf,
-                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-                                               )
-                                       }
-                               }
-                               text = logBuf.String()
-                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
-                       default:
-                               text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
-                       }
-                       disp.logger.Print(text)
-
-                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                               "object_uuid": ctr.UUID,
-                               "event_type":  "dispatch",
-                               "properties":  map[string]string{"text": text}}}
-                       disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
-                       disp.arvDispatcher.Unlock(ctr.UUID)
-                       return
+               err := disp.submit(ctr, cmd)
+               if err != nil {
+                       return err
                }
        }
 
        disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
        defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
-       // If the container disappears from the lsf queue, there is
-       // no point in waiting for further dispatch updates: just
-       // clean up and return.
        go func(uuid string) {
                for ctx.Err() == nil {
-                       if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+                       _, ok := disp.lsfqueue.Lookup(uuid)
+                       if !ok {
+                               // If the container disappears from
+                               // the lsf queue, there is no point in
+                               // waiting for further dispatch
+                               // updates: just clean up and return.
                                disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
                                cancel()
                                return
@@ -236,9 +230,14 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                        case dispatch.Locked:
                                disp.arvDispatcher.Unlock(ctr.UUID)
                        }
-                       return
+                       return nil
                case updated, ok := <-status:
                        if !ok {
+                               // status channel is closed, which is
+                               // how arvDispatcher tells us to stop
+                               // touching the container record, kill
+                               // off any remaining LSF processes,
+                               // etc.
                                done = true
                                break
                        }
@@ -246,7 +245,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
                        }
                        ctr = updated
-                       if ctr.Priority == 0 {
+                       if ctr.Priority < 1 {
                                disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
                                disp.bkill(ctr)
                        } else {
@@ -258,15 +257,16 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
        // Try "bkill" every few seconds until the LSF job disappears
        // from the queue.
-       ticker := time.NewTicker(5 * time.Second)
+       ticker := time.NewTicker(disp.Cluster.Containers.CloudVMs.PollInterval.Duration() / 2)
        defer ticker.Stop()
-       for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
-               err := disp.lsfcli.Bkill(jobid)
+       for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+               err := disp.lsfcli.Bkill(qent.ID)
                if err != nil {
-                       disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+                       disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
                }
                <-ticker.C
        }
+       return nil
 }
 
 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
@@ -275,7 +275,12 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
        var crArgs []string
        crArgs = append(crArgs, crunchRunCommand...)
        crArgs = append(crArgs, container.UUID)
-       crScript := execScript(crArgs)
+
+       h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+       fmt.Fprint(h, container.UUID)
+       authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+       crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
 
        bsubArgs, err := disp.bsubArgs(container)
        if err != nil {
@@ -285,42 +290,83 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
 }
 
 func (disp *dispatcher) bkill(ctr arvados.Container) {
-       if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+       if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
                disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
-       } else if err := disp.lsfcli.Bkill(jobid); err != nil {
-               disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+       } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+               disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
        }
 }
 
 func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
        args := []string{"bsub"}
-       args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
-       args = append(args, "-J", container.UUID)
-       args = append(args, disp.bsubConstraintArgs(container)...)
-       if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
-               args = append([]string{"sudo", "-E", "-u", u}, args...)
-       }
-       return args, nil
-}
 
-func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
-       // TODO: propagate container.SchedulingParameters.Partitions
        tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
        vcpus := container.RuntimeConstraints.VCPUs
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
                container.RuntimeConstraints.KeepCacheRAM+
                int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
-       return []string{
-               "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+
+       repl := map[string]string{
+               "%%": "%",
+               "%C": fmt.Sprintf("%d", vcpus),
+               "%M": fmt.Sprintf("%d", mem),
+               "%T": fmt.Sprintf("%d", tmp),
+               "%U": container.UUID,
+               "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
+       }
+
+       re := regexp.MustCompile(`%.`)
+       var substitutionErrors string
+       argumentTemplate := disp.Cluster.Containers.LSF.BsubArgumentsList
+       if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+               argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
+       }
+       for _, a := range argumentTemplate {
+               args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
+                       subst := repl[s]
+                       if len(subst) == 0 {
+                               substitutionErrors += fmt.Sprintf("Unknown substitution parameter %s in BsubArgumentsList, ", s)
+                       }
+                       return subst
+               }))
+       }
+       if len(substitutionErrors) != 0 {
+               return nil, fmt.Errorf("%s", substitutionErrors[:len(substitutionErrors)-2])
+       }
+
+       if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
+               args = append([]string{"sudo", "-E", "-u", u}, args...)
        }
+       return args, nil
 }
 
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
 func (disp *dispatcher) checkLsfQueueForOrphans() {
-       disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+       containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+       for _, uuid := range disp.lsfqueue.All() {
+               if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+                       continue
+               }
+               err := disp.arvDispatcher.TrackContainer(uuid)
+               if err != nil {
+                       disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+               }
+       }
 }
 
-func execScript(args []string) []byte {
-       s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+       s := "#!/bin/sh\n"
+       for k, v := range env {
+               s += k + `='`
+               s += strings.Replace(v, `'`, `'\''`, -1)
+               s += `' `
+       }
+       s += `exec`
        for _, w := range args {
                s += ` '`
                s += strings.Replace(w, `'`, `'\''`, -1)