15397: Remove default_owner_uuid from docs/examples.
[arvados.git] / lib / lsf / dispatch.go
index e2348337e62992eb4463947690e809e1927bb232..897e5803f2334d6d7b3a40671c3a1c04e5cdd34d 100644 (file)
@@ -18,6 +18,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -58,6 +60,7 @@ type dispatcher struct {
        Registry  *prometheus.Registry
 
        logger        logrus.FieldLogger
+       dbConnector   ctrlctx.DBConnector
        lsfcli        lsfcli
        lsfqueue      lsfqueue
        arvDispatcher *dispatch.Dispatcher
@@ -73,7 +76,9 @@ type dispatcher struct {
 func (disp *dispatcher) Start() {
        disp.initOnce.Do(func() {
                disp.init()
+               dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
                go func() {
+                       defer dblock.Dispatch.Unlock()
                        disp.checkLsfQueueForOrphans()
                        err := disp.arvDispatcher.Run(disp.Context)
                        if err != nil {
@@ -125,6 +130,7 @@ func (disp *dispatcher) init() {
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
@@ -170,6 +176,19 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        if ctr.State != dispatch.Locked {
                // already started by prior invocation
        } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
+               if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
+                       err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
+                               "container": map[string]interface{}{
+                                       "runtime_status": map[string]string{
+                                               "error": err.Error(),
+                                       },
+                               },
+                       }, nil)
+                       if err != nil {
+                               return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
+                       }
+                       return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+               }
                disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -184,9 +203,8 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
        go func(uuid string) {
-               cancelled := false
                for ctx.Err() == nil {
-                       qent, ok := disp.lsfqueue.Lookup(uuid)
+                       _, ok := disp.lsfqueue.Lookup(uuid)
                        if !ok {
                                // If the container disappears from
                                // the lsf queue, there is no point in
@@ -196,25 +214,6 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                cancel()
                                return
                        }
-                       if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
-                               disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
-                               err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
-                                       "container": map[string]interface{}{
-                                               "runtime_status": map[string]string{
-                                                       "error": qent.PendReason,
-                                               },
-                                       },
-                               }, nil)
-                               if err != nil {
-                                       disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
-                                       continue // retry
-                               }
-                               err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
-                               if err != nil {
-                                       continue // retry (UpdateState() already logged the error)
-                               }
-                               cancelled = true
-                       }
                }
        }(ctr.UUID)
 
@@ -307,6 +306,15 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error)
                container.RuntimeConstraints.KeepCacheRAM+
                int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
 
+       maxruntime := time.Duration(container.SchedulingParameters.MaxRunTime) * time.Second
+       if maxruntime == 0 {
+               maxruntime = disp.Cluster.Containers.LSF.MaxRunTimeDefault.Duration()
+       }
+       if maxruntime > 0 {
+               maxruntime += disp.Cluster.Containers.LSF.MaxRunTimeOverhead.Duration()
+       }
+       maxrunminutes := int64(math.Ceil(float64(maxruntime.Seconds()) / 60))
+
        repl := map[string]string{
                "%%": "%",
                "%C": fmt.Sprintf("%d", vcpus),
@@ -314,6 +322,7 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error)
                "%T": fmt.Sprintf("%d", tmp),
                "%U": container.UUID,
                "%G": fmt.Sprintf("%d", container.RuntimeConstraints.CUDA.DeviceCount),
+               "%W": fmt.Sprintf("%d", maxrunminutes),
        }
 
        re := regexp.MustCompile(`%.`)
@@ -322,7 +331,16 @@ func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error)
        if container.RuntimeConstraints.CUDA.DeviceCount > 0 {
                argumentTemplate = append(argumentTemplate, disp.Cluster.Containers.LSF.BsubCUDAArguments...)
        }
-       for _, a := range argumentTemplate {
+       for idx, a := range argumentTemplate {
+               if idx > 0 && (argumentTemplate[idx-1] == "-W" || argumentTemplate[idx-1] == "-We") && a == "%W" && maxrunminutes == 0 {
+                       // LSF docs don't specify an argument to "-W"
+                       // or "-We" that indicates "unknown", so
+                       // instead we drop the "-W %W" part of the
+                       // command line entirely when max runtime is
+                       // unknown.
+                       args = args[:len(args)-1]
+                       continue
+               }
                args = append(args, re.ReplaceAllStringFunc(a, func(s string) string {
                        subst := repl[s]
                        if len(subst) == 0 {