Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / lsf / dispatch.go
index 0d9324784d503e1fb30789e45e2f65ae7b84fdd1..d1408d23cb1a4e3c2274f40d2f02b66bda29e82d 100644 (file)
@@ -6,6 +6,8 @@ package lsf
 
 import (
        "context"
+       "crypto/hmac"
+       "crypto/sha256"
        "errors"
        "fmt"
        "math"
@@ -16,6 +18,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -56,6 +60,7 @@ type dispatcher struct {
        Registry  *prometheus.Registry
 
        logger        logrus.FieldLogger
+       dbConnector   ctrlctx.DBConnector
        lsfcli        lsfcli
        lsfqueue      lsfqueue
        arvDispatcher *dispatch.Dispatcher
@@ -71,7 +76,9 @@ type dispatcher struct {
 func (disp *dispatcher) Start() {
        disp.initOnce.Do(func() {
                disp.init()
+               dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
                go func() {
+                       defer dblock.Dispatch.Unlock()
                        disp.checkLsfQueueForOrphans()
                        err := disp.arvDispatcher.Run(disp.Context)
                        if err != nil {
@@ -123,6 +130,7 @@ func (disp *dispatcher) init() {
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
@@ -168,6 +176,19 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        if ctr.State != dispatch.Locked {
                // already started by prior invocation
        } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
+               if _, err := dispatchcloud.ChooseInstanceType(disp.Cluster, &ctr); errors.As(err, &dispatchcloud.ConstraintsNotSatisfiableError{}) {
+                       err := disp.arvDispatcher.Arv.Update("containers", ctr.UUID, arvadosclient.Dict{
+                               "container": map[string]interface{}{
+                                       "runtime_status": map[string]string{
+                                               "error": err.Error(),
+                                       },
+                               },
+                       }, nil)
+                       if err != nil {
+                               return fmt.Errorf("error setting runtime_status on %s: %s", ctr.UUID, err)
+                       }
+                       return disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+               }
                disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -182,9 +203,8 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
        go func(uuid string) {
-               cancelled := false
                for ctx.Err() == nil {
-                       qent, ok := disp.lsfqueue.Lookup(uuid)
+                       _, ok := disp.lsfqueue.Lookup(uuid)
                        if !ok {
                                // If the container disappears from
                                // the lsf queue, there is no point in
@@ -194,25 +214,6 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                cancel()
                                return
                        }
-                       if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
-                               disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
-                               err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
-                                       "container": map[string]interface{}{
-                                               "runtime_status": map[string]string{
-                                                       "error": qent.PendReason,
-                                               },
-                                       },
-                               }, nil)
-                               if err != nil {
-                                       disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
-                                       continue // retry
-                               }
-                               err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
-                               if err != nil {
-                                       continue // retry (UpdateState() already logged the error)
-                               }
-                               cancelled = true
-                       }
                }
        }(ctr.UUID)
 
@@ -274,7 +275,12 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
        var crArgs []string
        crArgs = append(crArgs, crunchRunCommand...)
        crArgs = append(crArgs, container.UUID)
-       crScript := execScript(crArgs)
+
+       h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+       fmt.Fprint(h, container.UUID)
+       authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+       crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
 
        bsubArgs, err := disp.bsubArgs(container)
        if err != nil {
@@ -353,8 +359,14 @@ func (disp *dispatcher) checkLsfQueueForOrphans() {
        }
 }
 
-func execScript(args []string) []byte {
-       s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+       s := "#!/bin/sh\n"
+       for k, v := range env {
+               s += k + `='`
+               s += strings.Replace(v, `'`, `'\''`, -1)
+               s += `' `
+       }
+       s += `exec`
        for _, w := range args {
                s += ` '`
                s += strings.Replace(w, `'`, `'\''`, -1)