17756: Merge branch 'main'
authorTom Clegg <tom@curii.com>
Tue, 6 Jul 2021 13:28:47 +0000 (09:28 -0400)
committerTom Clegg <tom@curii.com>
Tue, 6 Jul 2021 13:28:47 +0000 (09:28 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

17 files changed:
build/run-build-packages-one-target.sh
build/run-build-packages.sh
cmd/arvados-server/arvados-dispatch-lsf.service [new file with mode: 0644]
cmd/arvados-server/cmd.go
doc/_config.yml
doc/install/crunch2-lsf/install-dispatch.html.textile.liquid [new file with mode: 0644]
doc/install/crunch2-slurm/install-dispatch.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/lsf/dispatch.go [new file with mode: 0644]
lib/lsf/dispatch_test.go [new file with mode: 0644]
lib/lsf/lsfcli.go [new file with mode: 0644]
lib/lsf/lsfqueue.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/arvadostest/fixtures.go
sdk/go/health/aggregator_test.go

index 8365fecadbd29705924623374ba21763f934185e..056a1fbb1216a2742a5d90e2165db0798a1d2d1e 100755 (executable)
@@ -194,6 +194,7 @@ if test -z "$packages" ; then
         arvados-client
         arvados-controller
         arvados-dispatch-cloud
+        arvados-dispatch-lsf
         arvados-docker-cleaner
         arvados-git-httpd
         arvados-health
index e231a83df8d22e291a7d048bc8fa3d00c136a109..ba68119865b6a3dbcf37b9b44323b0d19f644bcf 100755 (executable)
@@ -277,6 +277,8 @@ package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
 package_go_binary cmd/arvados-server arvados-dispatch-cloud \
     "Arvados cluster cloud dispatch"
+package_go_binary cmd/arvados-server arvados-dispatch-lsf \
+    "Dispatch Arvados containers to an LSF cluster"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
     "Provide authenticated http access to Arvados-hosted git repositories"
 package_go_binary services/crunch-dispatch-local crunch-dispatch-local \
diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service
new file mode 100644 (file)
index 0000000..f9e73a2
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=arvados-dispatch-lsf
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-lsf
+# Set a reasonable default for the open file limit
+LimitNOFILE=65536
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
index d0aa9da94df537bf80a3ed232c2a0ae2c3a0e1d6..4b94a7813869915c38c14ec7927a8a2662e30475 100644 (file)
@@ -15,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/install"
+       "git.arvados.org/arvados.git/lib/lsf"
        "git.arvados.org/arvados.git/lib/recovercollection"
        "git.arvados.org/arvados.git/services/ws"
 )
@@ -33,6 +34,7 @@ var (
                "controller":         controller.Command,
                "crunch-run":         crunchrun.Command,
                "dispatch-cloud":     dispatchcloud.Command,
+               "dispatch-lsf":       lsf.DispatchCommand,
                "install":            install.Command,
                "init":               install.InitCommand,
                "recover-collection": recovercollection.Command,
index 06d15952131f118d1d7b79c2e80a97032ee5d3d5..0b0fbc9300749a968d5bb1570aada605155f4eb1 100644 (file)
@@ -252,6 +252,8 @@ navbar:
       - install/crunch2-slurm/configure-slurm.html.textile.liquid
       - install/crunch2-slurm/install-compute-node.html.textile.liquid
       - install/crunch2-slurm/install-test.html.textile.liquid
+    - Containers API (lsf):
+      - install/crunch2-lsf/install-dispatch.html.textile.liquid
     - Additional configuration:
       - install/container-shell-access.html.textile.liquid
     - External dependencies:
diff --git a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
new file mode 100644 (file)
index 0000000..0dd6a8b
--- /dev/null
@@ -0,0 +1,114 @@
+---
+layout: default
+navsection: installguide
+title: Install the LSF dispatcher
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+{% include 'notebox_begin_warning' %}
+arvados-dispatch-lsf is only relevant for on-premises clusters that will spool jobs to LSF. Skip this section if you are installing a cloud cluster.
+{% include 'notebox_end' %}
+
+Containers can be dispatched to an LSF cluster.  The dispatcher sends work to the cluster using LSF's @bsub@ command, so it works in a variety of LSF configurations.
+
+*LSF support is currently considered experimental.*
+
+Limitations include:
+* Arvados container priority is not propagated to LSF job priority. This can cause inefficient use of compute resources, and even deadlock if there are fewer compute nodes than concurrent Arvados workflows.
+* Combining LSF with Docker may not work, depending on LSF configuration and user/group IDs (if LSF only sets up the configured user's primary group ID when executing the crunch-run process on a compute node, it may not have permission to connect to the Docker daemon).
+
+In order to run containers, you must choose a user that has permission to set up FUSE mounts and run Singularity/Docker containers on each compute node.  This install guide refers to this user as the @crunch@ user.  We recommend you create this user on each compute node with the same UID and GID, and add it to the @fuse@ and @docker@ system groups to grant it the necessary permissions.  However, you can run the dispatcher under any account with sufficient permissions across the cluster.
+
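+A minimal sketch of that setup on each compute node is shown below (the UID/GID value @1001@ is only an example; pick a value that is consistent and unused across your cluster, and omit the @docker@ group if you only use Singularity):
+
+<notextile>
+<pre><code># <span class="userinput">groupadd --gid 1001 crunch</span>
+# <span class="userinput">useradd --uid 1001 --gid 1001 --groups fuse,docker --create-home crunch</span>
+</code></pre>
+</notextile>
+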
+Set up all of your compute nodes "as you would for a SLURM cluster":../crunch2-slurm/install-compute-node.html.
+
+
+h2(#update-config). Update config.yml
+
+arvados-dispatch-lsf reads the common configuration file at @/etc/arvados/config.yml@.
+
+Review the following configuration parameters and adjust as needed.
+
+
+h3(#BsubSudoUser). Containers.LSF.BsubSudoUser
+
+arvados-dispatch-lsf uses @sudo@ to execute @bsub@, for example @sudo -E -u crunch bsub [...]@. To use a user account other than @crunch@, configure @BsubSudoUser@:
+
+<notextile>
+<pre>    Containers:
+      LSF:
+        <code class="userinput">BsubSudoUser: <b>lsfuser</b>
+</code></pre>
+</notextile>
+
+Alternatively, you can arrange for the arvados-dispatch-lsf process to run as an unprivileged user that has a corresponding account on all compute nodes, and disable the use of @sudo@ by specifying an empty string:
+
+<notextile>
+<pre>    Containers:
+      LSF:
+        # Don't use sudo
+        <code class="userinput">BsubSudoUser: <b>""</b>
+</code></pre>
+</notextile>
+
+
+h3(#BsubArgumentsList). Containers.LSF.BsubArgumentsList
+
+When arvados-dispatch-lsf invokes @bsub@, you can add arguments to the command by specifying @BsubArgumentsList@.  You can use this to send the jobs to specific cluster partitions or add resource requests.  Set @BsubArgumentsList@ to an array of strings.  For example:
+
+<notextile>
+<pre>    Containers:
+      LSF:
+        <code class="userinput">BsubArgumentsList:
+          - <b>"-C"</b>
+          - <b>"0"</b></code>
+</pre>
+</notextile>
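+
+With the example above, the submission command built by the dispatcher takes roughly the following shape. The container UUID and the numbers in the @-R@ option are illustrative only; the real values are derived from each container's @runtime_constraints@, and the generated @crunch-run@ wrapper script is passed to @bsub@ on stdin:
+
+<notextile>
+<pre><code>sudo -E -u crunch bsub -C 0 -J zzzzz-dz642-xxxxxxxxxxxxxxx -R "rusage[mem=1280MB:tmp=5120MB] affinity[core(2)]"
+</code></pre>
+</notextile>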
+
+
+h3(#PollPeriod). Containers.PollInterval
+
+arvados-dispatch-lsf polls the API server periodically for new containers to run.  The @PollInterval@ option controls how often this poll happens.  Set this to a number suffixed with one of the time units @s@, @m@, or @h@.  For example:
+
+<notextile>
+<pre>    Containers:
+      <code class="userinput">PollInterval: <b>10s</b>
+</code></pre>
+</notextile>
+
+
+h3(#ReserveExtraRAM). Containers.ReserveExtraRAM: Extra RAM for jobs
+
+Extra RAM to reserve (in bytes) on each LSF job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@.  If not provided, the default value is zero.
+
+Supports suffixes @KB@, @KiB@, @MB@, @MiB@, @GB@, @GiB@, @TB@, @TiB@, @PB@, @PiB@, @EB@, @EiB@ (where @KB@ is 10[^3^], @KiB@ is 2[^10^], @MB@ is 10[^6^], @MiB@ is 2[^20^] and so forth).
+
+<notextile>
+<pre>    Containers:
+      <code class="userinput">ReserveExtraRAM: <b>256MiB</b></code>
+</pre>
+</notextile>
+
+
+h3(#CrunchRunCommand-network). Containers.CrunchRunArgumentsList: Using host networking for containers
+
+Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups.  This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net".  If you are experiencing this problem, as a workaround you can disable use of network namespaces by Docker across the cluster.  Be aware this reduces container isolation, which may be a security risk.
+
+<notextile>
+<pre>    Containers:
+      <code class="userinput">CrunchRunArgumentsList:
+        - <b>"-container-enable-networking=always"</b>
+        - <b>"-container-network-mode=host"</b></code>
+</pre>
+</notextile>
+
+{% assign arvados_component = 'arvados-dispatch-lsf' %}
+
+{% include 'install_packages' %}
+
+{% include 'start_service' %}
+
+{% include 'restart_api' %}
index 3996cc7930a70a44ace17a8bd55cade99876bd7c..5b5b868e57611fe0262b0e16e708289f1a001f95 100644 (file)
@@ -44,7 +44,7 @@ crunch-dispatch-slurm polls the API server periodically for new containers to ru
 
 h3(#ReserveExtraRAM). Containers.ReserveExtraRAM: Extra RAM for jobs
 
-Extra RAM to reserve (in bytes) on each Slurm job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@.  If not provided, the default value is zero.  Helpful when using @-cgroup-parent-subsystem@, where @crunch-run@ and @arv-mount@ share the control group memory limit with the user process.  In this situation, at least 256MiB is recommended to accomodate each container's @crunch-run@ and @arv-mount@ processes.
+Extra RAM to reserve (in bytes) on each Slurm job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@.  If not provided, the default value is zero.  Helpful when using @-cgroup-parent-subsystem@, where @crunch-run@ and @arv-mount@ share the control group memory limit with the user process.  In this situation, at least 256MiB is recommended to accommodate each container's @crunch-run@ and @arv-mount@ processes.
 
 Supports suffixes @KB@, @KiB@, @MB@, @MiB@, @GB@, @GiB@, @TB@, @TiB@, @PB@, @PiB@, @EB@, @EiB@ (where @KB@ is 10[^3^], @KiB@ is 2[^10^], @MB@ is 10[^6^], @MiB@ is 2[^20^] and so forth).
 
index e28d5cbb7f0cd09b3ad559f6cab0c9a9967c10d5..975b4cd0f498d4c2cbec564923dd7d3c08364bcc 100644 (file)
@@ -52,6 +52,9 @@ Clusters:
       DispatchCloud:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: "-"
+      DispatchLSF:
+        InternalURLs: {SAMPLE: {}}
+        ExternalURL: "-"
       Keepproxy:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: ""
@@ -1012,6 +1015,15 @@ Clusters:
           # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
           AssignNodeHostname: "compute%<slot_number>d"
 
+      LSF:
+        # Additional arguments to bsub when submitting Arvados
+        # containers as LSF jobs.
+        BsubArgumentsList: []
+
+        # Use sudo to switch to this user account when submitting LSF
+        # jobs.
+        BsubSudoUser: "crunch"
+
       JobsAPI:
         # Enable the legacy 'jobs' API (crunch v1).  This value must be a string.
         #
index 8753b52f27e17dee80958fd32ab25e46cda2f9fb..7adb50ec374006063f23a1cd620ea46d8446728e 100644 (file)
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
        "Containers.JobsAPI.GitInternalDir":                   false,
        "Containers.Logging":                                  false,
        "Containers.LogReuseDecisions":                        false,
+       "Containers.LSF":                                      false,
        "Containers.MaxComputeVMs":                            false,
        "Containers.MaxDispatchAttempts":                      false,
        "Containers.MaxRetryAttempts":                         true,
index b15bf7eebc29facda6f1a0e2670c5482f83055cf..dec25055892dd7cf84d3896f76031d0fff0c5113 100644 (file)
@@ -58,6 +58,9 @@ Clusters:
       DispatchCloud:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: "-"
+      DispatchLSF:
+        InternalURLs: {SAMPLE: {}}
+        ExternalURL: "-"
       Keepproxy:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: ""
@@ -1018,6 +1021,15 @@ Clusters:
           # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
           AssignNodeHostname: "compute%<slot_number>d"
 
+      LSF:
+        # Additional arguments to bsub when submitting Arvados
+        # containers as LSF jobs.
+        BsubArgumentsList: []
+
+        # Use sudo to switch to this user account when submitting LSF
+        # jobs.
+        BsubSudoUser: "crunch"
+
       JobsAPI:
         # Enable the legacy 'jobs' API (crunch v1).  This value must be a string.
         #
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
new file mode 100644 (file)
index 0000000..b7032dc
--- /dev/null
@@ -0,0 +1,346 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "fmt"
+       "math"
+       "net/http"
+       "regexp"
+       "strings"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/dispatch"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/julienschmidt/httprouter"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "github.com/sirupsen/logrus"
+)
+
+var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+       ac, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+       }
+       d := &dispatcher{
+               Cluster:   cluster,
+               Context:   ctx,
+               ArvClient: ac,
+               AuthToken: token,
+               Registry:  reg,
+       }
+       go d.Start()
+       return d
+}
+
+type dispatcher struct {
+       Cluster   *arvados.Cluster
+       Context   context.Context
+       ArvClient *arvados.Client
+       AuthToken string
+       Registry  *prometheus.Registry
+
+       logger        logrus.FieldLogger
+       lsfcli        lsfcli
+       lsfqueue      lsfqueue
+       arvDispatcher *dispatch.Dispatcher
+       httpHandler   http.Handler
+
+       initOnce sync.Once
+       stop     chan struct{}
+       stopped  chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+       disp.initOnce.Do(func() {
+               disp.init()
+               go func() {
+                       disp.checkLsfQueueForOrphans()
+                       err := disp.arvDispatcher.Run(disp.Context)
+                       if err != nil {
+                               disp.logger.Error(err)
+                               disp.Close()
+                       }
+               }()
+       })
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       disp.Start()
+       disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+       disp.Start()
+       select {
+       case <-disp.stopped:
+               return errors.New("stopped")
+       default:
+               return nil
+       }
+}
+
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+       return disp.stopped
+}
+
+// Stop dispatching containers and release resources. Used by tests.
+func (disp *dispatcher) Close() {
+       disp.Start()
+       select {
+       case disp.stop <- struct{}{}:
+       default:
+       }
+       <-disp.stopped
+}
+
+func (disp *dispatcher) init() {
+       disp.logger = ctxlog.FromContext(disp.Context)
+       disp.lsfcli.logger = disp.logger
+       disp.lsfqueue = lsfqueue{
+               logger: disp.logger,
+               period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               lsfcli: &disp.lsfcli,
+       }
+       disp.ArvClient.AuthToken = disp.AuthToken
+       disp.stop = make(chan struct{}, 1)
+       disp.stopped = make(chan struct{})
+
+       arv, err := arvadosclient.New(disp.ArvClient)
+       if err != nil {
+               disp.logger.Fatalf("Error making Arvados client: %v", err)
+       }
+       arv.Retries = 25
+       arv.ApiToken = disp.AuthToken
+       disp.arvDispatcher = &dispatch.Dispatcher{
+               Arv:            arv,
+               Logger:         disp.logger,
+               BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
+               RunContainer:   disp.runContainer,
+               PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
+       }
+
+       if disp.Cluster.ManagementToken == "" {
+               disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+                       http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+               })
+       } else {
+               mux := httprouter.New()
+               metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
+                       ErrorLog: disp.logger,
+               })
+               mux.Handler("GET", "/metrics", metricsH)
+               mux.Handler("GET", "/metrics.json", metricsH)
+               mux.Handler("GET", "/_health/:check", &health.Handler{
+                       Token:  disp.Cluster.ManagementToken,
+                       Prefix: "/_health/",
+                       Routes: health.Routes{"ping": disp.CheckHealth},
+               })
+               disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+       }
+}
+
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+       ctx, cancel := context.WithCancel(disp.Context)
+       defer cancel()
+
+       if ctr.State != dispatch.Locked {
+               // already started by prior invocation
+       } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+               disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
+               cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
+               cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
+               cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
+               if err := disp.submit(ctr, cmd); err != nil {
+                       var text string
+                       switch err := err.(type) {
+                       case dispatchcloud.ConstraintsNotSatisfiableError:
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+                                               )
+                                       }
+                               }
+                               text = logBuf.String()
+                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       default:
+                               text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
+                       }
+                       disp.logger.Print(text)
+
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": ctr.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       disp.arvDispatcher.Arv.Create("logs", lr, nil)
+
+                       disp.arvDispatcher.Unlock(ctr.UUID)
+                       return
+               }
+       }
+
+       disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
+       defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
+
+       // If the container disappears from the lsf queue, there is
+       // no point in waiting for further dispatch updates: just
+       // clean up and return.
+       go func(uuid string) {
+               for ctx.Err() == nil {
+                       if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+                               disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
+                               cancel()
+                               return
+                       }
+               }
+       }(ctr.UUID)
+
+       for done := false; !done; {
+               select {
+               case <-ctx.Done():
+                       // Disappeared from lsf queue
+                       if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+                               disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
+                       }
+                       switch ctr.State {
+                       case dispatch.Running:
+                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       case dispatch.Locked:
+                               disp.arvDispatcher.Unlock(ctr.UUID)
+                       }
+                       return
+               case updated, ok := <-status:
+                       if !ok {
+                               done = true
+                               break
+                       }
+                       if updated.State != ctr.State {
+                               disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
+                       }
+                       ctr = updated
+                       if ctr.Priority == 0 {
+                               disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
+                               disp.bkill(ctr)
+                       } else {
+                               disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
+                       }
+               }
+       }
+       disp.logger.Printf("container %s is done", ctr.UUID)
+
+       // Try "bkill" every few seconds until the LSF job disappears
+       // from the queue.
+       ticker := time.NewTicker(5 * time.Second)
+       defer ticker.Stop()
+       for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
+               err := disp.lsfcli.Bkill(jobid)
+               if err != nil {
+                       disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+               }
+               <-ticker.C
+       }
+}
+
+func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
+       // Start with an empty slice here to ensure append() doesn't
+       // modify crunchRunCommand's underlying array
+       var crArgs []string
+       crArgs = append(crArgs, crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := execScript(crArgs)
+
+       bsubArgs, err := disp.bsubArgs(container)
+       if err != nil {
+               return err
+       }
+       return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
+}
+
+func (disp *dispatcher) bkill(ctr arvados.Container) {
+       if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+               disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
+       } else if err := disp.lsfcli.Bkill(jobid); err != nil {
+               disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+       }
+}
+
+func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
+       args := []string{"bsub"}
+       args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
+       args = append(args, "-J", container.UUID)
+       args = append(args, disp.bsubConstraintArgs(container)...)
+       if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
+               args = append([]string{"sudo", "-E", "-u", u}, args...)
+       }
+       return args, nil
+}
+
+func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
+       // TODO: propagate container.SchedulingParameters.Partitions
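+       // Convert the byte-based Arvados constraints to MiB values,
+       // rounding up; they are passed to LSF with an explicit MB
+       // suffix in the rusage string below.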
+       tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
+       vcpus := container.RuntimeConstraints.VCPUs
+       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+               container.RuntimeConstraints.KeepCacheRAM+
+               int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
+       return []string{
+               "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+       }
+}
+
+// Check the next bjobs report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel existing
+// Arvados LSF jobs (started by a previous dispatch process) that
+// never released their LSF job allocations even though their
+// container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
+func (disp *dispatcher) checkLsfQueueForOrphans() {
+       containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+       for _, uuid := range disp.lsfqueue.All() {
+               if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+                       continue
+               }
+               err := disp.arvDispatcher.TrackContainer(uuid)
+               if err != nil {
+                       disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+               }
+       }
+}
+
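+// execScript returns a shell script that execs the given command,
+// single-quoting each argument (and escaping any embedded single
+// quotes) so it is passed through to crunch-run unmodified.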
+func execScript(args []string) []byte {
+       s := "#!/bin/sh\nexec"
+       for _, w := range args {
+               s += ` '`
+               s += strings.Replace(w, `'`, `'\''`, -1)
+               s += `'`
+       }
+       return []byte(s + "\n")
+}
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
new file mode 100644 (file)
index 0000000..7cf6df6
--- /dev/null
@@ -0,0 +1,156 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "context"
+       "fmt"
+       "math/rand"
+       "os/exec"
+       "strconv"
+       "sync"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+       "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+       disp *dispatcher
+}
+
+func (s *suite) TearDownTest(c *check.C) {
+       arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+}
+
+func (s *suite) SetUpTest(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+       s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
+       s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
+               return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
+       }
+}
+
+type lsfstub struct {
+       sudoUser  string
+       errorRate float64
+}
+
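+// stubCommand returns a replacement for exec.Command that fakes the
+// sudo/bsub/bjobs/bkill programs, tracking submitted jobs in an
+// in-memory queue and optionally injecting random failures.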
+func (stub lsfstub) stubCommand(c *check.C) func(prog string, args ...string) *exec.Cmd {
+       mtx := sync.Mutex{}
+       nextjobid := 100
+       fakejobq := map[int]string{}
+       return func(prog string, args ...string) *exec.Cmd {
+               c.Logf("stubCommand: %q %q", prog, args)
+               if rand.Float64() < stub.errorRate {
+                       return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
+               }
+               if stub.sudoUser != "" && len(args) > 3 &&
+                       prog == "sudo" &&
+                       args[0] == "-E" &&
+                       args[1] == "-u" &&
+                       args[2] == stub.sudoUser {
+                       prog, args = args[3], args[4:]
+               }
+               switch prog {
+               case "bsub":
+                       c.Assert(args, check.HasLen, 4)
+                       c.Check(args[0], check.Equals, "-J")
+                       switch args[1] {
+                       case arvadostest.LockedContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       case arvadostest.QueuedContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       default:
+                               c.Errorf("unexpected uuid passed to bsub: args %q", args)
+                               return exec.Command("false")
+                       }
+                       return exec.Command("echo", "submitted job")
+               case "bjobs":
+                       c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
+                       out := ""
+                       for jobid, uuid := range fakejobq {
+                               out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+                       }
+                       c.Logf("bjobs out: %q", out)
+                       return exec.Command("printf", out)
+               case "bkill":
+                       killid, _ := strconv.Atoi(args[0])
+                       if uuid, ok := fakejobq[killid]; !ok {
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
+                       } else if uuid == "" {
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
+                       } else {
+                               go func() {
+                                       time.Sleep(time.Millisecond)
+                                       mtx.Lock()
+                                       delete(fakejobq, killid)
+                                       mtx.Unlock()
+                               }()
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
+                       }
+               default:
+                       return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
+               }
+       }
+}
+
+func (s *suite) TestSubmit(c *check.C) {
+       s.disp.lsfcli.stubCommand = lsfstub{
+               errorRate: 0.1,
+               sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
+       }.stubCommand(c)
+       s.disp.Start()
+       deadline := time.Now().Add(20 * time.Second)
+       for range time.NewTicker(time.Second).C {
+               if time.Now().After(deadline) {
+                       c.Error("timed out")
+                       break
+               }
+               // "queuedcontainer" should be running
+               if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+                       continue
+               }
+               // "lockedcontainer" should be cancelled because it
+               // has priority 0 (no matching container requests)
+               if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+                       continue
+               }
+               var ctr arvados.Container
+               if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
+                       c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
+                       continue
+               }
+               if ctr.State != arvados.ContainerStateQueued {
+                       c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
+                       continue
+               }
+               c.Log("reached desired state")
+               break
+       }
+}
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
new file mode 100644 (file)
index 0000000..9d712ee
--- /dev/null
@@ -0,0 +1,92 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "bytes"
+       "fmt"
+       "os"
+       "os/exec"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+type bjobsEntry struct {
+       id   int
+       name string
+       stat string
+}
+
+type lsfcli struct {
+       logger logrus.FieldLogger
+       // (for testing) if non-nil, call stubCommand() instead of
+       // exec.Command() when running lsf command line programs.
+       stubCommand func(string, ...string) *exec.Cmd
+}
+
+func (cli lsfcli) command(prog string, args ...string) *exec.Cmd {
+       if f := cli.stubCommand; f != nil {
+               return f(prog, args...)
+       } else {
+               return exec.Command(prog, args...)
+       }
+}
+
+func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error {
+       cli.logger.Infof("bsub command %q script %q", args, script)
+       cmd := cli.command(args[0], args[1:]...)
+       cmd.Env = append([]string(nil), os.Environ()...)
+       cmd.Env = append(cmd.Env, "ARVADOS_API_HOST="+arv.APIHost)
+       cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arv.AuthToken)
+       if arv.Insecure {
+               cmd.Env = append(cmd.Env, "ARVADOS_API_HOST_INSECURE=1")
+       }
+       cmd.Stdin = bytes.NewReader(script)
+       out, err := cmd.Output()
+       cli.logger.WithField("stdout", string(out)).Infof("bsub finished")
+       return errWithStderr(err)
+}
+
+func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
+       cli.logger.Debugf("Bjobs()")
+       cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+       buf, err := cmd.Output()
+       if err != nil {
+               return nil, errWithStderr(err)
+       }
+       var bjobs []bjobsEntry
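+       // Each non-empty line has three whitespace-separated fields
+       // matching the -o format above: job ID, status, and job name
+       // (the container UUID).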
+       for _, line := range strings.Split(string(buf), "\n") {
+               if line == "" {
+                       continue
+               }
+               var ent bjobsEntry
+               if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
+                       cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
+                       continue
+               }
+               bjobs = append(bjobs, ent)
+       }
+       return bjobs, nil
+}
+
+func (cli lsfcli) Bkill(id int) error {
+       cli.logger.Infof("Bkill(%d)", id)
+       cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+       buf, err := cmd.CombinedOutput()
+       if err == nil || strings.Index(string(buf), "already finished") >= 0 {
+               return nil
+       } else {
+               return fmt.Errorf("%s (%q)", err, buf)
+       }
+}
+
+func errWithStderr(err error) error {
+       if err, ok := err.(*exec.ExitError); ok {
+               return fmt.Errorf("%s (%q)", err, err.Stderr)
+       }
+       return err
+}
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
new file mode 100644 (file)
index 0000000..3c4fc4c
--- /dev/null
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "sync"
+       "time"
+
+       "github.com/sirupsen/logrus"
+)
+
+type lsfqueue struct {
+       logger logrus.FieldLogger
+       period time.Duration
+       lsfcli *lsfcli
+
+       initOnce  sync.Once
+       mutex     sync.Mutex
+       nextReady chan (<-chan struct{})
+       updated   *sync.Cond
+       latest    map[string]bjobsEntry
+}
+
+// JobID waits for the next queue update (so even a job that was only
+// submitted a nanosecond ago will show up) and then returns the LSF
+// job ID corresponding to the given container UUID.
+func (q *lsfqueue) JobID(uuid string) (int, bool) {
+       ent, ok := q.getNext()[uuid]
+       return ent.id, ok
+}
+
+// All waits for the next queue update, then returns the names of all
+// jobs in the queue. Used by checkLsfQueueForOrphans().
+func (q *lsfqueue) All() []string {
+       latest := q.getNext()
+       names := make([]string, 0, len(latest))
+       for name := range latest {
+               names = append(names, name)
+       }
+       return names
+}
+
+func (q *lsfqueue) SetPriority(uuid string, priority int64) {
+       q.initOnce.Do(q.init)
+       q.logger.Debug("SetPriority is not implemented")
+}
+
+func (q *lsfqueue) getNext() map[string]bjobsEntry {
+       q.initOnce.Do(q.init)
+       <-(<-q.nextReady)
+       q.mutex.Lock()
+       defer q.mutex.Unlock()
+       return q.latest
+}
+
+func (q *lsfqueue) init() {
+       q.updated = sync.NewCond(&q.mutex)
+       q.nextReady = make(chan (<-chan struct{}))
+       ticker := time.NewTicker(time.Second)
+       go func() {
+               for range ticker.C {
+                       // Send a new "next update ready" channel to
+                       // the next goroutine that wants one (and any
+                       // others that have already queued up since
+                       // the first one started waiting).
+                       //
+                       // Below, when we get a new update, we'll
+                       // signal that to the other goroutines by
+                       // closing the ready chan.
+                       ready := make(chan struct{})
+                       q.nextReady <- ready
+                       for {
+                               select {
+                               case q.nextReady <- ready:
+                                       continue
+                               default:
+                               }
+                               break
+                       }
+                       // Run bjobs repeatedly if needed, until we
+                       // get valid output.
+                       var ents []bjobsEntry
+                       for {
+                               q.logger.Debug("running bjobs")
+                               var err error
+                               ents, err = q.lsfcli.Bjobs()
+                               if err == nil {
+                                       break
+                               }
+                               q.logger.Warnf("bjobs: %s", err)
+                               <-ticker.C
+                       }
+                       next := make(map[string]bjobsEntry, len(ents))
+                       for _, ent := range ents {
+                               next[ent.name] = ent
+                       }
+                       // Replace q.latest and notify all the
+                       // goroutines that the "next update" they
+                       // asked for is now ready.
+                       q.mutex.Lock()
+                       q.latest = next
+                       q.mutex.Unlock()
+                       close(ready)
+               }
+       }()
+}
index 6e59828a3cbf5656fef1e6c7fc790ca9d3b6268f..844991f41e03b219c5b1d1c0c537d5125d9b2413 100644 (file)
@@ -329,6 +329,7 @@ type Services struct {
        Composer       Service
        Controller     Service
        DispatchCloud  Service
+       DispatchLSF    Service
        GitHTTP        Service
        GitSSH         Service
        Health         Service
@@ -461,6 +462,10 @@ type ContainersConfig struct {
                        AssignNodeHostname     string
                }
        }
+       LSF struct {
+               BsubSudoUser      string
+               BsubArgumentsList []string
+       }
 }
 
 type CloudVMsConfig struct {
@@ -597,6 +602,7 @@ const (
        ServiceNameRailsAPI      ServiceName = "arvados-api-server"
        ServiceNameController    ServiceName = "arvados-controller"
        ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+       ServiceNameDispatchLSF   ServiceName = "arvados-dispatch-lsf"
        ServiceNameHealth        ServiceName = "arvados-health"
        ServiceNameWorkbench1    ServiceName = "arvados-workbench1"
        ServiceNameWorkbench2    ServiceName = "arvados-workbench2"
@@ -614,6 +620,7 @@ func (svcs Services) Map() map[ServiceName]Service {
                ServiceNameRailsAPI:      svcs.RailsAPI,
                ServiceNameController:    svcs.Controller,
                ServiceNameDispatchCloud: svcs.DispatchCloud,
+               ServiceNameDispatchLSF:   svcs.DispatchLSF,
                ServiceNameHealth:        svcs.Health,
                ServiceNameWorkbench1:    svcs.Workbench1,
                ServiceNameWorkbench2:    svcs.Workbench2,
index 4b7ad6dd59fa426e8b1e71c546ee43a851d99c54..3de4225d568a95324e5f81ecc34e8b2486433f8e 100644 (file)
@@ -45,6 +45,8 @@ const (
        QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
        QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
+       LockedContainerUUID = "zzzzz-dz642-lockedcontainer"
+
        RunningContainerUUID = "zzzzz-dz642-runningcontainr"
 
        CompletedContainerUUID         = "zzzzz-dz642-compltcontainer"
index 2acf3e59ab81ae10ff816577f5f33fdaea8b9922..04106caa442cfb52fbc098a516112e11b55643bb 100644 (file)
@@ -153,6 +153,7 @@ func (s *AggregatorSuite) setAllServiceURLs(listen string) {
        for _, svc := range []*arvados.Service{
                &svcs.Controller,
                &svcs.DispatchCloud,
+               &svcs.DispatchLSF,
                &svcs.Keepbalance,
                &svcs.Keepproxy,
                &svcs.Keepstore,