17756: Add lsf dispatcher.
author Tom Clegg <tom@curii.com>
Fri, 2 Jul 2021 17:27:46 +0000 (13:27 -0400)
committer Tom Clegg <tom@curii.com>
Sun, 4 Jul 2021 17:31:56 +0000 (13:31 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

cmd/arvados-server/arvados-dispatch-lsf.service [new file with mode: 0644]
cmd/arvados-server/cmd.go
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/lsf/dispatch.go [new file with mode: 0644]
lib/lsf/dispatch_test.go [new file with mode: 0644]
lib/lsf/lsfcli.go [new file with mode: 0644]
lib/lsf/lsfqueue.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/arvadostest/fixtures.go

diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service
new file mode 100644 (file)
index 0000000..f9e73a2
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=arvados-dispatch-lsf
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-lsf
+# Set a reasonable default for the open file limit
+LimitNOFILE=65536
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index d0aa9da94df537bf80a3ed232c2a0ae2c3a0e1d6..4b94a7813869915c38c14ec7927a8a2662e30475 100644 (file)
@@ -15,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/install"
+       "git.arvados.org/arvados.git/lib/lsf"
        "git.arvados.org/arvados.git/lib/recovercollection"
        "git.arvados.org/arvados.git/services/ws"
 )
@@ -33,6 +34,7 @@ var (
                "controller":         controller.Command,
                "crunch-run":         crunchrun.Command,
                "dispatch-cloud":     dispatchcloud.Command,
+               "dispatch-lsf":       lsf.DispatchCommand,
                "install":            install.Command,
                "init":               install.InitCommand,
                "recover-collection": recovercollection.Command,
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index e28d5cbb7f0cd09b3ad559f6cab0c9a9967c10d5..975b4cd0f498d4c2cbec564923dd7d3c08364bcc 100644 (file)
@@ -52,6 +52,9 @@ Clusters:
       DispatchCloud:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: "-"
+      DispatchLSF:
+        InternalURLs: {SAMPLE: {}}
+        ExternalURL: "-"
       Keepproxy:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: ""
@@ -1012,6 +1015,15 @@ Clusters:
           # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
           AssignNodeHostname: "compute%<slot_number>d"
 
+      LSF:
+        # Additional arguments to bsub when submitting Arvados
+        # containers as LSF jobs.
+        BsubArgumentsList: []
+
+        # Use sudo to switch to this user account when submitting LSF
+        # jobs.
+        BsubSudoUser: "crunch"
+
       JobsAPI:
         # Enable the legacy 'jobs' API (crunch v1).  This value must be a string.
         #
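
How the two new LSF settings shape the submit command is easiest to see in a standalone sketch. The assembly below mirrors bsubArgs() in the new lib/lsf/dispatch.go (resource-constraint arguments omitted); the extra arguments and container UUID are made up for illustration:

    package main

    import "fmt"

    func main() {
        bsubArgumentsList := []string{"-C", "0"}      // hypothetical BsubArgumentsList entries
        bsubSudoUser := "crunch"                      // the default BsubSudoUser
        containerUUID := "zzzzz-dz642-example0000000" // made-up container UUID

        args := []string{"bsub"}
        args = append(args, bsubArgumentsList...)
        args = append(args, "-J", containerUUID)
        if bsubSudoUser != "" {
            // sudo -E preserves the environment so the ARVADOS_API_*
            // variables set by the dispatcher reach bsub.
            args = append([]string{"sudo", "-E", "-u", bsubSudoUser}, args...)
        }
        fmt.Println(args)
        // [sudo -E -u crunch bsub -C 0 -J zzzzz-dz642-example0000000]
    }

With BsubSudoUser empty, the sudo prefix is omitted and bsub runs as the dispatcher's own user.
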
diff --git a/lib/config/export.go b/lib/config/export.go
index 8753b52f27e17dee80958fd32ab25e46cda2f9fb..7adb50ec374006063f23a1cd620ea46d8446728e 100644 (file)
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
        "Containers.JobsAPI.GitInternalDir":                   false,
        "Containers.Logging":                                  false,
        "Containers.LogReuseDecisions":                        false,
+       "Containers.LSF":                                      false,
        "Containers.MaxComputeVMs":                            false,
        "Containers.MaxDispatchAttempts":                      false,
        "Containers.MaxRetryAttempts":                         true,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index b15bf7eebc29facda6f1a0e2670c5482f83055cf..dec25055892dd7cf84d3896f76031d0fff0c5113 100644 (file)
@@ -58,6 +58,9 @@ Clusters:
       DispatchCloud:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: "-"
+      DispatchLSF:
+        InternalURLs: {SAMPLE: {}}
+        ExternalURL: "-"
       Keepproxy:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: ""
@@ -1018,6 +1021,15 @@ Clusters:
           # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
           AssignNodeHostname: "compute%<slot_number>d"
 
+      LSF:
+        # Additional arguments to bsub when submitting Arvados
+        # containers as LSF jobs.
+        BsubArgumentsList: []
+
+        # Use sudo to switch to this user account when submitting LSF
+        # jobs.
+        BsubSudoUser: "crunch"
+
       JobsAPI:
         # Enable the legacy 'jobs' API (crunch v1).  This value must be a string.
         #
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
new file mode 100644 (file)
index 0000000..ff95d0d
--- /dev/null
@@ -0,0 +1,330 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "fmt"
+       "math"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/dispatch"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/julienschmidt/httprouter"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "github.com/sirupsen/logrus"
+)
+
+var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+       ac, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+       }
+       d := &dispatcher{
+               Cluster:   cluster,
+               Context:   ctx,
+               ArvClient: ac,
+               AuthToken: token,
+               Registry:  reg,
+       }
+       go d.Start()
+       return d
+}
+
+type dispatcher struct {
+       Cluster   *arvados.Cluster
+       Context   context.Context
+       ArvClient *arvados.Client
+       AuthToken string
+       Registry  *prometheus.Registry
+
+       logger        logrus.FieldLogger
+       lsfcli        lsfcli
+       lsfqueue      lsfqueue
+       arvDispatcher *dispatch.Dispatcher
+       httpHandler   http.Handler
+
+       initOnce sync.Once
+       stop     chan struct{}
+       stopped  chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+       disp.initOnce.Do(func() {
+               disp.init()
+               go func() {
+                       disp.checkLsfQueueForOrphans()
+                       err := disp.arvDispatcher.Run(disp.Context)
+                       if err != nil {
+                               disp.logger.Error(err)
+                               disp.Close()
+                       }
+               }()
+       })
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       disp.Start()
+       disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+       disp.Start()
+       select {
+       case <-disp.stopped:
+               return errors.New("stopped")
+       default:
+               return nil
+       }
+}
+
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+       return disp.stopped
+}
+
+// Stop dispatching containers and release resources. Used by tests.
+func (disp *dispatcher) Close() {
+       disp.Start()
+       select {
+       case disp.stop <- struct{}{}:
+       default:
+       }
+       <-disp.stopped
+}
+
+func (disp *dispatcher) init() {
+       disp.logger = ctxlog.FromContext(disp.Context)
+       disp.lsfcli.logger = disp.logger
+       disp.lsfqueue = lsfqueue{
+               logger: disp.logger,
+               period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               lsfcli: &disp.lsfcli,
+       }
+       disp.ArvClient.AuthToken = disp.AuthToken
+       disp.stop = make(chan struct{}, 1)
+       disp.stopped = make(chan struct{})
+
+       arv, err := arvadosclient.New(disp.ArvClient)
+       if err != nil {
+               disp.logger.Fatalf("Error making Arvados client: %v", err)
+       }
+       arv.Retries = 25
+       arv.ApiToken = disp.AuthToken
+       disp.arvDispatcher = &dispatch.Dispatcher{
+               Arv:            arv,
+               Logger:         disp.logger,
+               BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
+               RunContainer:   disp.runContainer,
+               PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
+       }
+
+       if disp.Cluster.ManagementToken == "" {
+               disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+                       http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+               })
+       } else {
+               mux := httprouter.New()
+               metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
+                       ErrorLog: disp.logger,
+               })
+               mux.Handler("GET", "/metrics", metricsH)
+               mux.Handler("GET", "/metrics.json", metricsH)
+               mux.Handler("GET", "/_health/:check", &health.Handler{
+                       Token:  disp.Cluster.ManagementToken,
+                       Prefix: "/_health/",
+                       Routes: health.Routes{"ping": disp.CheckHealth},
+               })
+               disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+       }
+}
+
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+       ctx, cancel := context.WithCancel(disp.Context)
+       defer cancel()
+
+       if ctr.State != dispatch.Locked {
+               // already started by prior invocation
+       } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+               disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
+               cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
+               cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
+               cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
+               if err := disp.submit(ctr, cmd); err != nil {
+                       var text string
+                       switch err := err.(type) {
+                       case dispatchcloud.ConstraintsNotSatisfiableError:
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+                                               )
+                                       }
+                               }
+                               text = logBuf.String()
+                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       default:
+                               text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
+                       }
+                       disp.logger.Print(text)
+
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": ctr.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       disp.arvDispatcher.Arv.Create("logs", lr, nil)
+
+                       disp.arvDispatcher.Unlock(ctr.UUID)
+                       return
+               }
+       }
+
+       disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
+       defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
+
+       // If the container disappears from the lsf queue, there is
+       // no point in waiting for further dispatch updates: just
+       // clean up and return.
+       go func(uuid string) {
+               for ctx.Err() == nil {
+                       if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+                               disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
+                               cancel()
+                               return
+                       }
+               }
+       }(ctr.UUID)
+
+       for done := false; !done; {
+               select {
+               case <-ctx.Done():
+                       // Disappeared from lsf queue
+                       if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+                               disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
+                       }
+                       switch ctr.State {
+                       case dispatch.Running:
+                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       case dispatch.Locked:
+                               disp.arvDispatcher.Unlock(ctr.UUID)
+                       }
+                       return
+               case updated, ok := <-status:
+                       if !ok {
+                               done = true
+                               break
+                       }
+                       if updated.State != ctr.State {
+                               disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
+                       }
+                       ctr = updated
+                       if ctr.Priority == 0 {
+                               disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
+                               disp.bkill(ctr)
+                       } else {
+                               disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
+                       }
+               }
+       }
+       disp.logger.Printf("container %s is done", ctr.UUID)
+
+       // Try "bkill" every few seconds until the LSF job disappears
+       // from the queue.
+       ticker := time.NewTicker(5 * time.Second)
+       defer ticker.Stop()
+       for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
+               err := disp.lsfcli.Bkill(jobid)
+               if err != nil {
+                       disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+               }
+               <-ticker.C
+       }
+}
+
+func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
+       // Start with an empty slice here to ensure append() doesn't
+       // modify crunchRunCommand's underlying array
+       var crArgs []string
+       crArgs = append(crArgs, crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := execScript(crArgs)
+
+       bsubArgs, err := disp.bsubArgs(container)
+       if err != nil {
+               return err
+       }
+       return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
+}
+
+func (disp *dispatcher) bkill(ctr arvados.Container) {
+       if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+               disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
+       } else if err := disp.lsfcli.Bkill(jobid); err != nil {
+               disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+       }
+}
+
+func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
+       args := []string{"bsub"}
+       args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
+       args = append(args, "-J", container.UUID)
+       args = append(args, disp.bsubConstraintArgs(container)...)
+       if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
+               args = append([]string{"sudo", "-E", "-u", u}, args...)
+       }
+       return args, nil
+}
+
+func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
+       // TODO: propagate container.SchedulingParameters.Partitions
+       tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
+       vcpus := container.RuntimeConstraints.VCPUs
+       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+               container.RuntimeConstraints.KeepCacheRAM+
+               int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
+       return []string{
+               "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+       }
+}
+
+func (disp *dispatcher) checkLsfQueueForOrphans() {
+       disp.logger.Warn("FIXME: checkLsfQueueForOrphans")
+}
+
+func execScript(args []string) []byte {
+       s := "#!/bin/sh\nexec"
+       for _, w := range args {
+               s += ` '`
+               s += strings.Replace(w, `'`, `'\''`, -1)
+               s += `'`
+       }
+       return []byte(s + "\n")
+}
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
new file mode 100644 (file)
index 0000000..25db95e
--- /dev/null
@@ -0,0 +1,145 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "context"
+       "fmt"
+       "math/rand"
+       "os/exec"
+       "strconv"
+       "sync"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+       "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+       disp *dispatcher
+}
+
+func (s *suite) TearDownTest(c *check.C) {
+       arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+}
+
+func (s *suite) SetUpTest(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+       s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
+       s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
+               return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
+       }
+}
+
+type lsfstub struct {
+       errorRate float64
+}
+
+func (stub lsfstub) stubCommand(c *check.C) func(prog string, args ...string) *exec.Cmd {
+       mtx := sync.Mutex{}
+       nextjobid := 100
+       fakejobq := map[int]string{}
+       return func(prog string, args ...string) *exec.Cmd {
+               c.Logf("stubCommand: %q %q", prog, args)
+               if rand.Float64() < stub.errorRate {
+                       return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
+               }
+               switch prog {
+               case "bsub":
+                       c.Assert(args, check.HasLen, 4)
+                       c.Check(args[0], check.Equals, "-J")
+                       switch args[1] {
+                       case arvadostest.LockedContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       case arvadostest.QueuedContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       default:
+                               c.Errorf("unexpected uuid passed to bsub: args %q", args)
+                               return exec.Command("false")
+                       }
+                       return exec.Command("echo", "submitted job")
+               case "bjobs":
+                       c.Check(args, check.DeepEquals, []string{"-noheader", "-o", "jobid stat job_name:30"})
+                       out := ""
+                       for jobid, uuid := range fakejobq {
+                               out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+                       }
+                       c.Logf("bjobs out: %q", out)
+                       return exec.Command("printf", out)
+               case "bkill":
+                       killid, _ := strconv.Atoi(args[0])
+                       if uuid, ok := fakejobq[killid]; !ok {
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
+                       } else if uuid == "" {
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
+                       } else {
+                               go func() {
+                                       time.Sleep(time.Millisecond)
+                                       mtx.Lock()
+                                       delete(fakejobq, killid)
+                                       mtx.Unlock()
+                               }()
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
+                       }
+               default:
+                       return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
+               }
+       }
+}
+
+func (s *suite) TestSubmit(c *check.C) {
+       s.disp.lsfcli.stubCommand = lsfstub{errorRate: 0.1}.stubCommand(c)
+       s.disp.Start()
+       deadline := time.Now().Add(20 * time.Second)
+       for range time.NewTicker(time.Second).C {
+               if time.Now().After(deadline) {
+                       c.Error("timed out")
+                       break
+               }
+               // "queuedcontainer" should be running
+               if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+                       continue
+               }
+               // "lockedcontainer" should be cancelled because it
+               // has priority 0 (no matching container requests)
+               if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+                       continue
+               }
+               var ctr arvados.Container
+               if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
+                       c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
+                       continue
+               }
+               if ctr.State != arvados.ContainerStateQueued {
+                       c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
+                       continue
+               }
+               c.Log("reached desired state")
+               break
+       }
+}
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
new file mode 100644 (file)
index 0000000..9d712ee
--- /dev/null
@@ -0,0 +1,92 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "bytes"
+       "fmt"
+       "os"
+       "os/exec"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+type bjobsEntry struct {
+       id   int
+       name string
+       stat string
+}
+
+type lsfcli struct {
+       logger logrus.FieldLogger
+       // (for testing) if non-nil, call stubCommand() instead of
+       // exec.Command() when running lsf command line programs.
+       stubCommand func(string, ...string) *exec.Cmd
+}
+
+func (cli lsfcli) command(prog string, args ...string) *exec.Cmd {
+       if f := cli.stubCommand; f != nil {
+               return f(prog, args...)
+       } else {
+               return exec.Command(prog, args...)
+       }
+}
+
+func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error {
+       cli.logger.Infof("bsub command %q script %q", args, script)
+       cmd := cli.command(args[0], args[1:]...)
+       cmd.Env = append([]string(nil), os.Environ()...)
+       cmd.Env = append(cmd.Env, "ARVADOS_API_HOST="+arv.APIHost)
+       cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arv.AuthToken)
+       if arv.Insecure {
+               cmd.Env = append(cmd.Env, "ARVADOS_API_HOST_INSECURE=1")
+       }
+       cmd.Stdin = bytes.NewReader(script)
+       out, err := cmd.Output()
+       cli.logger.WithField("stdout", string(out)).Infof("bsub finished")
+       return errWithStderr(err)
+}
+
+func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
+       cli.logger.Debugf("Bjobs()")
+       cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+       buf, err := cmd.Output()
+       if err != nil {
+               return nil, errWithStderr(err)
+       }
+       var bjobs []bjobsEntry
+       for _, line := range strings.Split(string(buf), "\n") {
+               if line == "" {
+                       continue
+               }
+               var ent bjobsEntry
+               if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
+                       cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
+                       continue
+               }
+               bjobs = append(bjobs, ent)
+       }
+       return bjobs, nil
+}
+
+func (cli lsfcli) Bkill(id int) error {
+       cli.logger.Infof("Bkill(%d)", id)
+       cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+       buf, err := cmd.CombinedOutput()
+       if err == nil || strings.Index(string(buf), "already finished") >= 0 {
+               return nil
+       } else {
+               return fmt.Errorf("%s (%q)", err, buf)
+       }
+}
+
+func errWithStderr(err error) error {
+       if err, ok := err.(*exec.ExitError); ok {
+               return fmt.Errorf("%s (%q)", err, err.Stderr)
+       }
+       return err
+}
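
Bjobs() relies on fmt.Sscan, which splits on whitespace; that works because the requested job_name field holds a container UUID, which cannot contain spaces. A minimal sketch of the per-line parse, using a fabricated bjobs output line:

    package main

    import "fmt"

    type bjobsEntry struct {
        id   int
        name string
        stat string
    }

    func main() {
        // One fabricated line of `bjobs -noheader -o "jobid stat job_name:30"` output.
        line := "100 RUN zzzzz-dz642-queuedcontainer"
        var ent bjobsEntry
        if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
            fmt.Println("ignoring unparsed line:", err)
            return
        }
        fmt.Printf("%+v\n", ent) // {id:100 name:zzzzz-dz642-queuedcontainer stat:RUN}
    }
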
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
new file mode 100644 (file)
index 0000000..65f3869
--- /dev/null
@@ -0,0 +1,81 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "sync"
+       "time"
+
+       "github.com/sirupsen/logrus"
+)
+
+type lsfqueue struct {
+       logger logrus.FieldLogger
+       period time.Duration
+       lsfcli *lsfcli
+
+       initOnce   sync.Once
+       mutex      sync.Mutex
+       needUpdate chan bool
+       updated    *sync.Cond
+       latest     map[string]bjobsEntry
+}
+
+// JobID waits for the next queue update (so even a job that was only
+// submitted a nanosecond ago will show up) and then returns the LSF
+// job ID corresponding to the given container UUID.
+func (q *lsfqueue) JobID(uuid string) (int, bool) {
+       q.initOnce.Do(q.init)
+       q.mutex.Lock()
+       defer q.mutex.Unlock()
+       select {
+       case q.needUpdate <- true:
+       default:
+               // an update is already pending
+       }
+       q.updated.Wait()
+       ent, ok := q.latest[uuid]
+       q.logger.Debugf("JobID(%q) == %d", uuid, ent.id)
+       return ent.id, ok
+}
+
+func (q *lsfqueue) SetPriority(uuid string, priority int64) {
+       q.initOnce.Do(q.init)
+       q.logger.Debug("SetPriority is not implemented")
+}
+
+func (q *lsfqueue) init() {
+       q.updated = sync.NewCond(&q.mutex)
+       q.needUpdate = make(chan bool, 1)
+       ticker := time.NewTicker(time.Second)
+       go func() {
+               for range q.needUpdate {
+                       q.logger.Debug("running bjobs")
+                       ents, err := q.lsfcli.Bjobs()
+                       if err != nil {
+                               q.logger.Warnf("bjobs: %s", err)
+                               // Retry on the next tick, don't wait
+                               // for another new call to JobID().
+                               select {
+                               case q.needUpdate <- true:
+                               default:
+                               }
+                               <-ticker.C
+                               continue
+                       }
+                       next := make(map[string]bjobsEntry, len(ents))
+                       for _, ent := range ents {
+                               next[ent.name] = ent
+                       }
+                       q.mutex.Lock()
+                       q.latest = next
+                       q.updated.Broadcast()
+                       q.logger.Debugf("waking up waiters with latest %v", q.latest)
+                       q.mutex.Unlock()
+                       // Limit "bjobs" invocations to 1 per second
+                       <-ticker.C
+               }
+       }()
+}
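
The needUpdate channel has capacity 1, so any number of concurrent JobID() callers collapse into at most one pending bjobs run: a non-blocking send either schedules a refresh or merges with the one already queued. A stripped-down sketch of that coalescing pattern (names and timings are illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        needUpdate := make(chan bool, 1)

        // request asks for a refresh; if one is already pending, the
        // default branch makes this a no-op and the caller piggybacks.
        request := func() {
            select {
            case needUpdate <- true:
            default:
            }
        }

        go func() {
            for range needUpdate {
                fmt.Println("running bjobs (one refresh covers the whole burst)")
                time.Sleep(100 * time.Millisecond) // stands in for the 1/sec ticker
            }
        }()

        for i := 0; i < 10; i++ {
            request()
        }
        time.Sleep(300 * time.Millisecond)
    }
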
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 6e59828a3cbf5656fef1e6c7fc790ca9d3b6268f..844991f41e03b219c5b1d1c0c537d5125d9b2413 100644 (file)
@@ -329,6 +329,7 @@ type Services struct {
        Composer       Service
        Controller     Service
        DispatchCloud  Service
+       DispatchLSF    Service
        GitHTTP        Service
        GitSSH         Service
        Health         Service
@@ -461,6 +462,10 @@ type ContainersConfig struct {
                        AssignNodeHostname     string
                }
        }
+       LSF struct {
+               BsubSudoUser      string
+               BsubArgumentsList []string
+       }
 }
 
 type CloudVMsConfig struct {
@@ -597,6 +602,7 @@ const (
        ServiceNameRailsAPI      ServiceName = "arvados-api-server"
        ServiceNameController    ServiceName = "arvados-controller"
        ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+       ServiceNameDispatchLSF   ServiceName = "arvados-dispatch-lsf"
        ServiceNameHealth        ServiceName = "arvados-health"
        ServiceNameWorkbench1    ServiceName = "arvados-workbench1"
        ServiceNameWorkbench2    ServiceName = "arvados-workbench2"
@@ -614,6 +620,7 @@ func (svcs Services) Map() map[ServiceName]Service {
                ServiceNameRailsAPI:      svcs.RailsAPI,
                ServiceNameController:    svcs.Controller,
                ServiceNameDispatchCloud: svcs.DispatchCloud,
+               ServiceNameDispatchLSF:   svcs.DispatchLSF,
                ServiceNameHealth:        svcs.Health,
                ServiceNameWorkbench1:    svcs.Workbench1,
                ServiceNameWorkbench2:    svcs.Workbench2,
diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go
index 4b7ad6dd59fa426e8b1e71c546ee43a851d99c54..3de4225d568a95324e5f81ecc34e8b2486433f8e 100644 (file)
@@ -45,6 +45,8 @@ const (
        QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
        QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
+       LockedContainerUUID = "zzzzz-dz642-lockedcontainer"
+
        RunningContainerUUID = "zzzzz-dz642-runningcontainr"
 
        CompletedContainerUUID         = "zzzzz-dz642-compltcontainer"