Merge branch '17756-dispatch-lsf' into main
authorTom Clegg <tom@curii.com>
Thu, 29 Jul 2021 13:57:48 +0000 (09:57 -0400)
committerTom Clegg <tom@curii.com>
Thu, 29 Jul 2021 13:57:48 +0000 (09:57 -0400)
closes #17756

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

23 files changed:
build/run-build-packages-one-target.sh
build/run-build-packages.sh
cmd/arvados-server/arvados-dispatch-lsf.service [new file with mode: 0644]
cmd/arvados-server/cmd.go
doc/_config.yml
doc/install/crunch2-lsf/install-dispatch.html.textile.liquid [new file with mode: 0644]
doc/install/crunch2-slurm/install-dispatch.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/lsf/dispatch.go [new file with mode: 0644]
lib/lsf/dispatch_test.go [new file with mode: 0644]
lib/lsf/lsfcli.go [new file with mode: 0644]
lib/lsf/lsfqueue.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/arvadostest/fixtures.go
sdk/go/dispatch/dispatch.go
sdk/go/dispatch/dispatch_test.go
sdk/go/health/aggregator_test.go
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index 8365fecadbd29705924623374ba21763f934185e..056a1fbb1216a2742a5d90e2165db0798a1d2d1e 100755 (executable)
@@ -194,6 +194,7 @@ if test -z "$packages" ; then
         arvados-client
         arvados-controller
         arvados-dispatch-cloud
+        arvados-dispatch-lsf
         arvados-docker-cleaner
         arvados-git-httpd
         arvados-health
index d46c246da7c9fadb0fe96fbe7edce0f4c623c351..7829c8c6cd61792535960a153bb20baf1b7e1622 100755 (executable)
@@ -277,6 +277,8 @@ package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
 package_go_binary cmd/arvados-server arvados-dispatch-cloud \
     "Arvados cluster cloud dispatch"
+package_go_binary cmd/arvados-server arvados-dispatch-lsf \
+    "Dispatch Arvados containers to an LSF cluster"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
     "Provide authenticated http access to Arvados-hosted git repositories"
 package_go_binary services/crunch-dispatch-local crunch-dispatch-local \
diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service
new file mode 100644 (file)
index 0000000..f9e73a2
--- /dev/null
@@ -0,0 +1,30 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=arvados-dispatch-lsf
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/arvados-dispatch-lsf
+# Set a reasonable default for the open file limit
+LimitNOFILE=65536
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
index d0aa9da94df537bf80a3ed232c2a0ae2c3a0e1d6..4b94a7813869915c38c14ec7927a8a2662e30475 100644 (file)
@@ -15,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/install"
+       "git.arvados.org/arvados.git/lib/lsf"
        "git.arvados.org/arvados.git/lib/recovercollection"
        "git.arvados.org/arvados.git/services/ws"
 )
@@ -33,6 +34,7 @@ var (
                "controller":         controller.Command,
                "crunch-run":         crunchrun.Command,
                "dispatch-cloud":     dispatchcloud.Command,
+               "dispatch-lsf":       lsf.DispatchCommand,
                "install":            install.Command,
                "init":               install.InitCommand,
                "recover-collection": recovercollection.Command,
index 39fe22fde3e1379205a26e0e5f28ef398c3d3980..b18607ebb7490622d38e119ad6a0f0383fdb35ba 100644 (file)
@@ -254,6 +254,8 @@ navbar:
       - install/crunch2-slurm/configure-slurm.html.textile.liquid
       - install/crunch2-slurm/install-compute-node.html.textile.liquid
       - install/crunch2-slurm/install-test.html.textile.liquid
+    - Containers API (lsf):
+      - install/crunch2-lsf/install-dispatch.html.textile.liquid
     - Additional configuration:
       - install/container-shell-access.html.textile.liquid
     - External dependencies:
diff --git a/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid b/doc/install/crunch2-lsf/install-dispatch.html.textile.liquid
new file mode 100644 (file)
index 0000000..66b562d
--- /dev/null
@@ -0,0 +1,112 @@
+---
+layout: default
+navsection: installguide
+title: Install the LSF dispatcher
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+{% include 'notebox_begin_warning' %}
+arvados-dispatch-lsf is only relevant for on-premises clusters that will spool jobs to LSF. Skip this section if you are installing a cloud cluster.
+{% include 'notebox_end' %}
+
+Containers can be dispatched to an LSF cluster.  The dispatcher sends work to the cluster using LSF's @bsub@ command, so it works in a variety of LSF configurations.
+
+*LSF support is currently considered experimental.*
+
+Limitations include:
+* Arvados container priority is not propagated to LSF job priority. This can cause inefficient use of compute resources, and even deadlock if there are fewer compute nodes than concurrent Arvados workflows.
+* Combining LSF with Docker may not work, depending on LSF configuration and user/group IDs (if LSF only sets up the configured user's primary group ID when executing the crunch-run process on a compute node, it may not have permission to connect to the Docker daemon).
+
+To run containers, you must choose a user that has permission to set up FUSE mounts and run Singularity/Docker containers on each compute node.  This install guide refers to this user as the @crunch@ user.  We recommend you create this user on each compute node with the same UID and GID, and add it to the @fuse@ and @docker@ system groups to grant it the necessary permissions.  However, you can run the dispatcher under any account with sufficient permissions across the cluster.
+
+Set up all of your compute nodes "as you would for a SLURM cluster":../crunch2-slurm/install-compute-node.html.
+
+
+h2(#update-config). Update config.yml
+
+Arvados-dispatch-lsf reads the common configuration file at @/etc/arvados/config.yml@.
+
+Review the following configuration parameters and adjust as needed.
+
+
+h3(#BsubSudoUser). Containers.LSF.BsubSudoUser
+
+arvados-dispatch-lsf uses @sudo@ to execute @bsub@, for example @sudo -E -u crunch bsub [...]@. This means the @crunch@ account must exist on the hosts where LSF jobs run ("execution hosts"), as well as on the host where you are installing the Arvados LSF dispatcher (the "submission host"). To use a user account other than @crunch@, configure @BsubSudoUser@:
+
+<notextile>
+<pre>    Containers:
+      LSF:
+        <code class="userinput">BsubSudoUser: <b>lsfuser</b>
+</code></pre>
+</notextile>
+
+Alternatively, you can arrange for the arvados-dispatch-lsf process to run as an unprivileged user that has a corresponding account on all compute nodes, and disable the use of @sudo@ by specifying an empty string:
+
+<notextile>
+<pre>    Containers:
+      LSF:
+        # Don't use sudo
+        <code class="userinput">BsubSudoUser: <b>""</b>
+</code></pre>
+</notextile>
+
+
+h3(#BsubArgumentsList). Containers.LSF.BsubArgumentsList
+
+When arvados-dispatch-lsf invokes @bsub@, you can add arguments to the command by specifying @BsubArgumentsList@.  You can use this to send the jobs to specific cluster partitions or add resource requests.  Set @BsubArgumentsList@ to an array of strings.  For example:
+
+<notextile>
+<pre>    Containers:
+      LSF:
+        <code class="userinput">BsubArgumentsList: <b>["-C", "0"]</b></code>
+</pre>
+</notextile>
+
+
+h3(#PollPeriod). Containers.PollInterval
+
+arvados-dispatch-lsf polls the API server periodically for new containers to run.  The @PollInterval@ option controls how often this poll happens.  Set this to a number suffixed with one of the time units @s@, @m@, or @h@.  For example:
+
+<notextile>
+<pre>    Containers:
+      <code class="userinput">PollInterval: <b>10s</b>
+</code></pre>
+</notextile>
+
+
+h3(#ReserveExtraRAM). Containers.ReserveExtraRAM: Extra RAM for jobs
+
+Extra RAM to reserve (in bytes) on each LSF job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@.  If not provided, the default value is zero.
+
+Supports suffixes @KB@, @KiB@, @MB@, @MiB@, @GB@, @GiB@, @TB@, @TiB@, @PB@, @PiB@, @EB@, @EiB@ (where @KB@ is 10[^3^], @KiB@ is 2[^10^], @MB@ is 10[^6^], @MiB@ is 2[^20^] and so forth).
+
+<notextile>
+<pre>    Containers:
+      <code class="userinput">ReserveExtraRAM: <b>256MiB</b></code>
+</pre>
+</notextile>
+
+
+h3(#CrunchRunCommand-network). Containers.CrunchRunArgumentsList: Using host networking for containers
+
+Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups.  This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net".  If you are experiencing this problem, as a workaround you can disable use of network namespaces by Docker across the cluster.  Be aware this reduces container isolation, which may be a security risk.
+
+<notextile>
+<pre>    Containers:
+      <code class="userinput">CrunchRunArgumentsList:
+        - <b>"-container-enable-networking=always"</b>
+        - <b>"-container-network-mode=host"</b></code>
+</pre>
+</notextile>
+
+{% assign arvados_component = 'arvados-dispatch-lsf' %}
+
+{% include 'install_packages' %}
+
+{% include 'start_service' %}
+
+{% include 'restart_api' %}
index 3996cc7930a70a44ace17a8bd55cade99876bd7c..5b5b868e57611fe0262b0e16e708289f1a001f95 100644 (file)
@@ -44,7 +44,7 @@ crunch-dispatch-slurm polls the API server periodically for new containers to ru
 
 h3(#ReserveExtraRAM). Containers.ReserveExtraRAM: Extra RAM for jobs
 
-Extra RAM to reserve (in bytes) on each Slurm job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@.  If not provided, the default value is zero.  Helpful when using @-cgroup-parent-subsystem@, where @crunch-run@ and @arv-mount@ share the control group memory limit with the user process.  In this situation, at least 256MiB is recommended to accomodate each container's @crunch-run@ and @arv-mount@ processes.
+Extra RAM to reserve (in bytes) on each Slurm job submitted by Arvados, which is added to the amount specified in the container's @runtime_constraints@.  If not provided, the default value is zero.  Helpful when using @-cgroup-parent-subsystem@, where @crunch-run@ and @arv-mount@ share the control group memory limit with the user process.  In this situation, at least 256MiB is recommended to accommodate each container's @crunch-run@ and @arv-mount@ processes.
 
 Supports suffixes @KB@, @KiB@, @MB@, @MiB@, @GB@, @GiB@, @TB@, @TiB@, @PB@, @PiB@, @EB@, @EiB@ (where @KB@ is 10[^3^], @KiB@ is 2[^10^], @MB@ is 10[^6^], @MiB@ is 2[^20^] and so forth).
 
index e28d5cbb7f0cd09b3ad559f6cab0c9a9967c10d5..64d59f2c01a130f7e61790d3a867f053373d1e5c 100644 (file)
@@ -52,6 +52,9 @@ Clusters:
       DispatchCloud:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: "-"
+      DispatchLSF:
+        InternalURLs: {SAMPLE: {}}
+        ExternalURL: "-"
       Keepproxy:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: ""
@@ -1012,6 +1015,19 @@ Clusters:
           # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
           AssignNodeHostname: "compute%<slot_number>d"
 
+      LSF:
+        # Additional arguments to bsub when submitting Arvados
+        # containers as LSF jobs.
+        BsubArgumentsList: []
+
+        # Use sudo to switch to this user account when submitting LSF
+        # jobs.
+        #
+        # This account must exist on the hosts where LSF jobs run
+        # ("execution hosts"), as well as on the host where the
+        # Arvados LSF dispatcher runs ("submission host").
+        BsubSudoUser: "crunch"
+
       JobsAPI:
         # Enable the legacy 'jobs' API (crunch v1).  This value must be a string.
         #
index bb939321c9ce17e220bb031f041efae53c79fa46..da5495352a53e7450ada8dabdbb578f9ec13647c 100644 (file)
@@ -120,6 +120,7 @@ var whitelist = map[string]bool{
        "Containers.JobsAPI.GitInternalDir":                   false,
        "Containers.Logging":                                  false,
        "Containers.LogReuseDecisions":                        false,
+       "Containers.LSF":                                      false,
        "Containers.MaxComputeVMs":                            false,
        "Containers.MaxDispatchAttempts":                      false,
        "Containers.MaxRetryAttempts":                         true,
index b15bf7eebc29facda6f1a0e2670c5482f83055cf..a7aebe63879041d9a506d352205ba98d7672a737 100644 (file)
@@ -58,6 +58,9 @@ Clusters:
       DispatchCloud:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: "-"
+      DispatchLSF:
+        InternalURLs: {SAMPLE: {}}
+        ExternalURL: "-"
       Keepproxy:
         InternalURLs: {SAMPLE: {}}
         ExternalURL: ""
@@ -1018,6 +1021,19 @@ Clusters:
           # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.)
           AssignNodeHostname: "compute%<slot_number>d"
 
+      LSF:
+        # Additional arguments to bsub when submitting Arvados
+        # containers as LSF jobs.
+        BsubArgumentsList: []
+
+        # Use sudo to switch to this user account when submitting LSF
+        # jobs.
+        #
+        # This account must exist on the hosts where LSF jobs run
+        # ("execution hosts"), as well as on the host where the
+        # Arvados LSF dispatcher runs ("submission host").
+        BsubSudoUser: "crunch"
+
       JobsAPI:
         # Enable the legacy 'jobs' API (crunch v1).  This value must be a string.
         #
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
new file mode 100644 (file)
index 0000000..7461597
--- /dev/null
@@ -0,0 +1,322 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math"
+       "net/http"
+       "regexp"
+       "strings"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
+       "git.arvados.org/arvados.git/lib/service"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/dispatch"
+       "git.arvados.org/arvados.git/sdk/go/health"
+       "github.com/julienschmidt/httprouter"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "github.com/sirupsen/logrus"
+)
+
+var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
+       ac, err := arvados.NewClientFromConfig(cluster)
+       if err != nil {
+               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+       }
+       d := &dispatcher{
+               Cluster:   cluster,
+               Context:   ctx,
+               ArvClient: ac,
+               AuthToken: token,
+               Registry:  reg,
+       }
+       go d.Start()
+       return d
+}
+
+type dispatcher struct {
+       Cluster   *arvados.Cluster
+       Context   context.Context
+       ArvClient *arvados.Client
+       AuthToken string
+       Registry  *prometheus.Registry
+
+       logger        logrus.FieldLogger
+       lsfcli        lsfcli
+       lsfqueue      lsfqueue
+       arvDispatcher *dispatch.Dispatcher
+       httpHandler   http.Handler
+
+       initOnce sync.Once
+       stop     chan struct{}
+       stopped  chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+       disp.initOnce.Do(func() {
+               disp.init()
+               go func() {
+                       disp.checkLsfQueueForOrphans()
+                       err := disp.arvDispatcher.Run(disp.Context)
+                       if err != nil {
+                               disp.logger.Error(err)
+                               disp.Close()
+                       }
+               }()
+       })
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       disp.Start()
+       disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+       disp.Start()
+       select {
+       case <-disp.stopped:
+               return errors.New("stopped")
+       default:
+               return nil
+       }
+}
+
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+       return disp.stopped
+}
+
+// Close stops dispatching containers and releases resources. Used by tests.
+func (disp *dispatcher) Close() {
+       disp.Start()
+       select {
+       case disp.stop <- struct{}{}:
+       default:
+       }
+       <-disp.stopped
+}
+
+func (disp *dispatcher) init() {
+       disp.logger = ctxlog.FromContext(disp.Context)
+       disp.lsfcli.logger = disp.logger
+       disp.lsfqueue = lsfqueue{
+               logger: disp.logger,
+               period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               lsfcli: &disp.lsfcli,
+       }
+       disp.ArvClient.AuthToken = disp.AuthToken
+       disp.stop = make(chan struct{}, 1)
+       disp.stopped = make(chan struct{})
+
+       arv, err := arvadosclient.New(disp.ArvClient)
+       if err != nil {
+               disp.logger.Fatalf("Error making Arvados client: %v", err)
+       }
+       arv.Retries = 25
+       arv.ApiToken = disp.AuthToken
+       disp.arvDispatcher = &dispatch.Dispatcher{
+               Arv:            arv,
+               Logger:         disp.logger,
+               BatchSize:      disp.Cluster.API.MaxItemsPerResponse,
+               RunContainer:   disp.runContainer,
+               PollPeriod:     time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval),
+               MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod),
+       }
+
+       if disp.Cluster.ManagementToken == "" {
+               disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+                       http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+               })
+       } else {
+               mux := httprouter.New()
+               metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
+                       ErrorLog: disp.logger,
+               })
+               mux.Handler("GET", "/metrics", metricsH)
+               mux.Handler("GET", "/metrics.json", metricsH)
+               mux.Handler("GET", "/_health/:check", &health.Handler{
+                       Token:  disp.Cluster.ManagementToken,
+                       Prefix: "/_health/",
+                       Routes: health.Routes{"ping": disp.CheckHealth},
+               })
+               disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+       }
+}
+
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
+       ctx, cancel := context.WithCancel(disp.Context)
+       defer cancel()
+
+       if ctr.State != dispatch.Locked {
+               // already started by prior invocation
+       } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+               disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
+               cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
+               cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
+               cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
+               err := disp.submit(ctr, cmd)
+               if err != nil {
+                       return err
+               }
+       }
+
+       disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
+       defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
+
+       // If the container disappears from the LSF queue, there is
+       // no point in waiting for further dispatch updates: just
+       // clean up and return.
+       go func(uuid string) {
+               for ctx.Err() == nil {
+                       if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+                               disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
+                               cancel()
+                               return
+                       }
+               }
+       }(ctr.UUID)
+
+       for done := false; !done; {
+               select {
+               case <-ctx.Done():
+                       // Disappeared from LSF queue
+                       if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+                               disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err)
+                       }
+                       switch ctr.State {
+                       case dispatch.Running:
+                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       case dispatch.Locked:
+                               disp.arvDispatcher.Unlock(ctr.UUID)
+                       }
+                       return nil
+               case updated, ok := <-status:
+                       if !ok {
+                               // status channel is closed, which is
+                               // how arvDispatcher tells us to stop
+                               // touching the container record, kill
+                               // off any remaining LSF processes,
+                               // etc.
+                               done = true
+                               break
+                       }
+                       if updated.State != ctr.State {
+                               disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State)
+                       }
+                       ctr = updated
+                       if ctr.Priority < 1 {
+                               disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority)
+                               disp.bkill(ctr)
+                       } else {
+                               disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority))
+                       }
+               }
+       }
+       disp.logger.Printf("container %s is done", ctr.UUID)
+
+       // Try "bkill" every few seconds until the LSF job disappears
+       // from the queue.
+       ticker := time.NewTicker(5 * time.Second)
+       defer ticker.Stop()
+       for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
+               err := disp.lsfcli.Bkill(jobid)
+               if err != nil {
+                       disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+               }
+               <-ticker.C
+       }
+       return nil
+}
+
+func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
+       // Start with an empty slice here to ensure append() doesn't
+       // modify crunchRunCommand's underlying array
+       var crArgs []string
+       crArgs = append(crArgs, crunchRunCommand...)
+       crArgs = append(crArgs, container.UUID)
+       crScript := execScript(crArgs)
+
+       bsubArgs, err := disp.bsubArgs(container)
+       if err != nil {
+               return err
+       }
+       return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient)
+}
+
+func (disp *dispatcher) bkill(ctr arvados.Container) {
+       if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+               disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
+       } else if err := disp.lsfcli.Bkill(jobid); err != nil {
+               disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+       }
+}
+
+func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) {
+       args := []string{"bsub"}
+       args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...)
+       args = append(args, "-J", container.UUID)
+       args = append(args, disp.bsubConstraintArgs(container)...)
+       if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" {
+               args = append([]string{"sudo", "-E", "-u", u}, args...)
+       }
+       return args, nil
+}
+
+func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string {
+       // TODO: propagate container.SchedulingParameters.Partitions
+       tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576))
+       vcpus := container.RuntimeConstraints.VCPUs
+       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+               container.RuntimeConstraints.KeepCacheRAM+
+               int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576))
+       return []string{
+               "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus),
+       }
+}
+
+// checkLsfQueueForOrphans checks the next bjobs report and invokes
+// TrackContainer for all the containers in the report. This gives us
+// a chance to cancel existing Arvados LSF jobs (started by a previous
+// dispatch process) that never released their LSF job allocations
+// even though their container states are Cancelled or Complete. See
+// https://dev.arvados.org/issues/10979
+func (disp *dispatcher) checkLsfQueueForOrphans() {
+       containerUuidPattern := regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+       for _, uuid := range disp.lsfqueue.All() {
+               if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.Cluster.ClusterID) {
+                       continue
+               }
+               err := disp.arvDispatcher.TrackContainer(uuid)
+               if err != nil {
+                       disp.logger.Warnf("checkLsfQueueForOrphans: TrackContainer(%s): %s", uuid, err)
+               }
+       }
+}
+
+func execScript(args []string) []byte {
+       s := "#!/bin/sh\nexec"
+       for _, w := range args {
+               s += ` '`
+               s += strings.Replace(w, `'`, `'\''`, -1)
+               s += `'`
+       }
+       return []byte(s + "\n")
+}
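
Editorial note: to make the submission path above concrete, bsubConstraintArgs rounds byte quantities up to whole megabytes for LSF's rusage[] request, and execScript wraps crunch-run's argument vector in a single-quoted /bin/sh script that is piped to bsub's stdin. The following is a minimal standalone sketch, not part of this commit; the RAM figure and container UUID are illustrative.

package main

import (
	"fmt"
	"math"
	"strings"
)

// quoteScript mirrors execScript above: single-quote each argument,
// escaping embedded single quotes as '\''.
func quoteScript(args []string) string {
	s := "#!/bin/sh\nexec"
	for _, w := range args {
		s += ` '` + strings.Replace(w, `'`, `'\''`, -1) + `'`
	}
	return s + "\n"
}

func main() {
	// Resource rounding as in bsubConstraintArgs: bytes -> MB, rounded up.
	ram := int64(11*1000000000 + 256*1048576) // RAM + KeepCacheRAM + ReserveExtraRAM (illustrative)
	mem := int64(math.Ceil(float64(ram) / 1048576))
	fmt.Printf("bsub -J <uuid> -R \"rusage[mem=%dMB:tmp=0MB] affinity[core(%d)]\"\n", mem, 4)

	// Wrapper script piped to bsub's stdin:
	fmt.Print(quoteScript([]string{"crunch-run", "--runtime-engine=docker", "zzzzz-dz642-queuedcontainer"}))
}
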
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
new file mode 100644 (file)
index 0000000..7cf6df6
--- /dev/null
@@ -0,0 +1,156 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "context"
+       "fmt"
+       "math/rand"
+       "os/exec"
+       "strconv"
+       "sync"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+       "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+       disp *dispatcher
+}
+
+func (s *suite) TearDownTest(c *check.C) {
+       arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+}
+
+func (s *suite) SetUpTest(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+       s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
+       s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
+               return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
+       }
+}
+
+type lsfstub struct {
+       sudoUser  string
+       errorRate float64
+}
+
+func (stub lsfstub) stubCommand(c *check.C) func(prog string, args ...string) *exec.Cmd {
+       mtx := sync.Mutex{}
+       nextjobid := 100
+       fakejobq := map[int]string{}
+       return func(prog string, args ...string) *exec.Cmd {
+               c.Logf("stubCommand: %q %q", prog, args)
+               if rand.Float64() < stub.errorRate {
+                       return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false")
+               }
+               if stub.sudoUser != "" && len(args) > 3 &&
+                       prog == "sudo" &&
+                       args[0] == "-E" &&
+                       args[1] == "-u" &&
+                       args[2] == stub.sudoUser {
+                       prog, args = args[3], args[4:]
+               }
+               switch prog {
+               case "bsub":
+                       c.Assert(args, check.HasLen, 4)
+                       c.Check(args[0], check.Equals, "-J")
+                       switch args[1] {
+                       case arvadostest.LockedContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       case arvadostest.QueuedContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       default:
+                               c.Errorf("unexpected uuid passed to bsub: args %q", args)
+                               return exec.Command("false")
+                       }
+                       return exec.Command("echo", "submitted job")
+               case "bjobs":
+                       c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
+                       out := ""
+                       for jobid, uuid := range fakejobq {
+                               out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+                       }
+                       c.Logf("bjobs out: %q", out)
+                       return exec.Command("printf", out)
+               case "bkill":
+                       killid, _ := strconv.Atoi(args[0])
+                       if uuid, ok := fakejobq[killid]; !ok {
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid))
+                       } else if uuid == "" {
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid))
+                       } else {
+                               go func() {
+                                       time.Sleep(time.Millisecond)
+                                       mtx.Lock()
+                                       delete(fakejobq, killid)
+                                       mtx.Unlock()
+                               }()
+                               return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid))
+                       }
+               default:
+                       return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog))
+               }
+       }
+}
+
+func (s *suite) TestSubmit(c *check.C) {
+       s.disp.lsfcli.stubCommand = lsfstub{
+               errorRate: 0.1,
+               sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
+       }.stubCommand(c)
+       s.disp.Start()
+       deadline := time.Now().Add(20 * time.Second)
+       for range time.NewTicker(time.Second).C {
+               if time.Now().After(deadline) {
+                       c.Error("timed out")
+                       break
+               }
+               // "queuedcontainer" should be running
+               if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+                       continue
+               }
+               // "lockedcontainer" should be cancelled because it
+               // has priority 0 (no matching container requests)
+               if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+                       continue
+               }
+               var ctr arvados.Container
+               if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
+                       c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
+                       continue
+               }
+               if ctr.State != arvados.ContainerStateQueued {
+                       c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
+                       continue
+               }
+               c.Log("reached desired state")
+               break
+       }
+}
diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go
new file mode 100644 (file)
index 0000000..9d712ee
--- /dev/null
@@ -0,0 +1,92 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "bytes"
+       "fmt"
+       "os"
+       "os/exec"
+       "strings"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+)
+
+type bjobsEntry struct {
+       id   int
+       name string
+       stat string
+}
+
+type lsfcli struct {
+       logger logrus.FieldLogger
+       // (for testing) if non-nil, call stubCommand() instead of
+       // exec.Command() when running LSF command line programs.
+       stubCommand func(string, ...string) *exec.Cmd
+}
+
+func (cli lsfcli) command(prog string, args ...string) *exec.Cmd {
+       if f := cli.stubCommand; f != nil {
+               return f(prog, args...)
+       } else {
+               return exec.Command(prog, args...)
+       }
+}
+
+func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error {
+       cli.logger.Infof("bsub command %q script %q", args, script)
+       cmd := cli.command(args[0], args[1:]...)
+       cmd.Env = append([]string(nil), os.Environ()...)
+       cmd.Env = append(cmd.Env, "ARVADOS_API_HOST="+arv.APIHost)
+       cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arv.AuthToken)
+       if arv.Insecure {
+               cmd.Env = append(cmd.Env, "ARVADOS_API_HOST_INSECURE=1")
+       }
+       cmd.Stdin = bytes.NewReader(script)
+       out, err := cmd.Output()
+       cli.logger.WithField("stdout", string(out)).Infof("bsub finished")
+       return errWithStderr(err)
+}
+
+func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
+       cli.logger.Debugf("Bjobs()")
+       cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+       buf, err := cmd.Output()
+       if err != nil {
+               return nil, errWithStderr(err)
+       }
+       var bjobs []bjobsEntry
+       for _, line := range strings.Split(string(buf), "\n") {
+               if line == "" {
+                       continue
+               }
+               var ent bjobsEntry
+               if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
+                       cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
+                       continue
+               }
+               bjobs = append(bjobs, ent)
+       }
+       return bjobs, nil
+}
+
+func (cli lsfcli) Bkill(id int) error {
+       cli.logger.Infof("Bkill(%d)", id)
+       cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+       buf, err := cmd.CombinedOutput()
+       if err == nil || strings.Contains(string(buf), "already finished") {
+               return nil
+       } else {
+               return fmt.Errorf("%s (%q)", err, buf)
+       }
+}
+
+func errWithStderr(err error) error {
+       if err, ok := err.(*exec.ExitError); ok {
+               return fmt.Errorf("%s (%q)", err, err.Stderr)
+       }
+       return err
+}
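
Editorial note: Bjobs above relies on fmt.Sscan's whitespace splitting to parse the fixed "jobid stat job_name:30" output format it requests. A minimal standalone sketch of that parse step; the sample output lines are made up.

package main

import (
	"fmt"
	"strings"
)

type bjobsEntry struct {
	id   int
	name string
	stat string
}

func main() {
	// Two made-up lines in the "jobid stat job_name:30" format.
	out := "100 RUN zzzzz-dz642-queuedcontainer\n101 PEND zzzzz-dz642-lockedcontainer\n"
	for _, line := range strings.Split(out, "\n") {
		if line == "" {
			continue
		}
		var ent bjobsEntry
		// fmt.Sscan splits on whitespace: jobid, then stat, then job name.
		if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
			continue // the real code logs and skips unparsed lines
		}
		fmt.Printf("job %d (%s): state %s\n", ent.id, ent.name, ent.stat)
	}
}
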
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
new file mode 100644 (file)
index 0000000..3c4fc4c
--- /dev/null
@@ -0,0 +1,108 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package lsf
+
+import (
+       "sync"
+       "time"
+
+       "github.com/sirupsen/logrus"
+)
+
+type lsfqueue struct {
+       logger logrus.FieldLogger
+       period time.Duration
+       lsfcli *lsfcli
+
+       initOnce  sync.Once
+       mutex     sync.Mutex
+       nextReady chan (<-chan struct{})
+       updated   *sync.Cond
+       latest    map[string]bjobsEntry
+}
+
+// JobID waits for the next queue update (so even a job that was only
+// submitted a nanosecond ago will show up) and then returns the LSF
+// job ID corresponding to the given container UUID.
+func (q *lsfqueue) JobID(uuid string) (int, bool) {
+       ent, ok := q.getNext()[uuid]
+       return ent.id, ok
+}
+
+// All waits for the next queue update, then returns the names of all
+// jobs in the queue. Used by checkLsfQueueForOrphans().
+func (q *lsfqueue) All() []string {
+       latest := q.getNext()
+       names := make([]string, 0, len(latest))
+       for name := range latest {
+               names = append(names, name)
+       }
+       return names
+}
+
+func (q *lsfqueue) SetPriority(uuid string, priority int64) {
+       q.initOnce.Do(q.init)
+       q.logger.Debug("SetPriority is not implemented")
+}
+
+func (q *lsfqueue) getNext() map[string]bjobsEntry {
+       q.initOnce.Do(q.init)
+       <-(<-q.nextReady)
+       q.mutex.Lock()
+       defer q.mutex.Unlock()
+       return q.latest
+}
+
+func (q *lsfqueue) init() {
+       q.updated = sync.NewCond(&q.mutex)
+       q.nextReady = make(chan (<-chan struct{}))
+       ticker := time.NewTicker(time.Second)
+       go func() {
+               for range ticker.C {
+                       // Send a new "next update ready" channel to
+                       // the next goroutine that wants one (and any
+                       // others that have already queued up since
+                       // the first one started waiting).
+                       //
+                       // Below, when we get a new update, we'll
+                       // signal that to the other goroutines by
+                       // closing the ready chan.
+                       ready := make(chan struct{})
+                       q.nextReady <- ready
+                       for {
+                               select {
+                               case q.nextReady <- ready:
+                                       continue
+                               default:
+                               }
+                               break
+                       }
+                       // Run bjobs repeatedly if needed, until we
+                       // get valid output.
+                       var ents []bjobsEntry
+                       for {
+                               q.logger.Debug("running bjobs")
+                               var err error
+                               ents, err = q.lsfcli.Bjobs()
+                               if err == nil {
+                                       break
+                               }
+                               q.logger.Warnf("bjobs: %s", err)
+                               <-ticker.C
+                       }
+                       next := make(map[string]bjobsEntry, len(ents))
+                       for _, ent := range ents {
+                               next[ent.name] = ent
+                       }
+                       // Replace q.latest and notify all the
+                       // goroutines that the "next update" they
+                       // asked for is now ready.
+                       q.mutex.Lock()
+                       q.latest = next
+                       q.mutex.Unlock()
+                       close(ready)
+               }
+       }()
+}
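
Editorial note: the nextReady channel-of-channels in init() is the subtle part. Each poll cycle hands the same ready channel to every caller currently blocked in getNext(), then closes it once fresh bjobs output is stored, so every caller sees data at least as new as the moment it asked. A minimal standalone sketch of the same broadcast pattern, with the bjobs call replaced by a sleep:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	nextReady := make(chan (<-chan struct{}))
	var mu sync.Mutex
	latest := 0

	go func() {
		for tick := 1; ; tick++ {
			ready := make(chan struct{})
			nextReady <- ready // hand out to the first waiter...
			for {
				select {
				case nextReady <- ready: // ...and any others queued up
					continue
				default:
				}
				break
			}
			time.Sleep(10 * time.Millisecond) // stands in for running bjobs
			mu.Lock()
			latest = tick
			mu.Unlock()
			close(ready) // wake everyone holding this ready chan
		}
	}()

	<-(<-nextReady) // block until the *next* update lands
	mu.Lock()
	fmt.Println("saw update", latest)
	mu.Unlock()
}
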
index 6e59828a3cbf5656fef1e6c7fc790ca9d3b6268f..844991f41e03b219c5b1d1c0c537d5125d9b2413 100644 (file)
@@ -329,6 +329,7 @@ type Services struct {
        Composer       Service
        Controller     Service
        DispatchCloud  Service
+       DispatchLSF    Service
        GitHTTP        Service
        GitSSH         Service
        Health         Service
@@ -461,6 +462,10 @@ type ContainersConfig struct {
                        AssignNodeHostname     string
                }
        }
+       LSF struct {
+               BsubSudoUser      string
+               BsubArgumentsList []string
+       }
 }
 
 type CloudVMsConfig struct {
@@ -597,6 +602,7 @@ const (
        ServiceNameRailsAPI      ServiceName = "arvados-api-server"
        ServiceNameController    ServiceName = "arvados-controller"
        ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+       ServiceNameDispatchLSF   ServiceName = "arvados-dispatch-lsf"
        ServiceNameHealth        ServiceName = "arvados-health"
        ServiceNameWorkbench1    ServiceName = "arvados-workbench1"
        ServiceNameWorkbench2    ServiceName = "arvados-workbench2"
@@ -614,6 +620,7 @@ func (svcs Services) Map() map[ServiceName]Service {
                ServiceNameRailsAPI:      svcs.RailsAPI,
                ServiceNameController:    svcs.Controller,
                ServiceNameDispatchCloud: svcs.DispatchCloud,
+               ServiceNameDispatchLSF:   svcs.DispatchLSF,
                ServiceNameHealth:        svcs.Health,
                ServiceNameWorkbench1:    svcs.Workbench1,
                ServiceNameWorkbench2:    svcs.Workbench2,
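
Editorial note: with DispatchLSF registered in Services and Services.Map(), other components (like the health aggregator below) can enumerate it generically. A hedged usage sketch, assuming cluster is a loaded *arvados.Cluster and arvados.URL's String() method; the function is illustrative, not part of this commit.

package example

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// printDispatchLSFURLs resolves the new service entry by name via
// Services.Map() and lists its configured internal URLs.
func printDispatchLSFURLs(cluster *arvados.Cluster) {
	svc := cluster.Services.Map()[arvados.ServiceNameDispatchLSF]
	for u := range svc.InternalURLs {
		fmt.Println("arvados-dispatch-lsf internal URL:", u.String())
	}
}
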
index 4b7ad6dd59fa426e8b1e71c546ee43a851d99c54..3de4225d568a95324e5f81ecc34e8b2486433f8e 100644 (file)
@@ -45,6 +45,8 @@ const (
        QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
        QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
+       LockedContainerUUID = "zzzzz-dz642-lockedcontainer"
+
        RunningContainerUUID = "zzzzz-dz642-runningcontainr"
 
        CompletedContainerUUID         = "zzzzz-dz642-compltcontainer"
index df43c2b10d9778d7c62befc8a3f7e71babacb168..00c75154f656a70e0b42deed7ef0e34fa7a01d7d 100644 (file)
@@ -7,11 +7,13 @@
 package dispatch
 
 import (
+       "bytes"
        "context"
        "fmt"
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "github.com/sirupsen/logrus"
@@ -66,7 +68,7 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
@@ -170,9 +172,34 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        }
        tracker.updates <- c
        go func() {
-               d.RunContainer(d, c, tracker.updates)
-               // RunContainer blocks for the lifetime of the container.  When
-               // it returns, the tracker should delete itself.
+               err := d.RunContainer(d, c, tracker.updates)
+               if err != nil {
+                       text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+                       if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+                                       }
+                               }
+                               text = logBuf.String()
+                               d.UpdateState(c.UUID, Cancelled)
+                       }
+                       d.Logger.Printf("%s", text)
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": c.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       d.Arv.Create("logs", lr, nil)
+                       d.Unlock(c.UUID)
+               }
+
                d.mtx.Lock()
                delete(d.trackers, c.UUID)
                d.mtx.Unlock()
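
Editorial note: with this signature change, a DispatchFunc reports failure by returning an error, and the boilerplate above (log the error, create a "dispatch" log record, unlock or cancel) runs centrally instead of being duplicated in each dispatcher. A minimal sketch of a conforming implementation; runContainer and submit are illustrative names, not part of this commit.

package example

import (
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/dispatch"
)

// runContainer is a minimal DispatchFunc sketch. It must not return
// until the container is finished; returning an error triggers the
// centralized handling above.
func runContainer(d *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
	if ctr.State == dispatch.Locked {
		if err := submit(ctr); err != nil { // submit is a hypothetical queue-submission step
			return err
		}
	}
	for range status {
		// Watch priority/state updates until the dispatcher
		// closes the channel.
	}
	return nil
}

// submit stands in for a real scheduler submission; always succeeds here.
func submit(ctr arvados.Container) error { return nil }
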
index 25a4d2b87902531b913cc2203c8439a748469753..4b115229b403bc69aadd275a0177b00d4d171f79 100644 (file)
@@ -35,11 +35,12 @@ func (s *suite) TestTrackContainer(c *C) {
        time.AfterFunc(10*time.Second, func() { done <- false })
        d := &Dispatcher{
                Arv: arv,
-               RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+               RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
                        for ctr := range status {
                                c.Logf("%#v", ctr)
                        }
                        done <- true
+                       return nil
                },
        }
        d.TrackContainer(arvadostest.QueuedContainerUUID)
index 2acf3e59ab81ae10ff816577f5f33fdaea8b9922..04106caa442cfb52fbc098a516112e11b55643bb 100644 (file)
@@ -153,6 +153,7 @@ func (s *AggregatorSuite) setAllServiceURLs(listen string) {
        for _, svc := range []*arvados.Service{
                &svcs.Controller,
                &svcs.DispatchCloud,
+               &svcs.DispatchLSF,
                &svcs.Keepbalance,
                &svcs.Keepproxy,
                &svcs.Keepstore,
index c202e683f2810e85ab6fddda40793f021b3e0eff..a3cb1341a4677e7ecdc7c03976da7483e47c1aa5 100644 (file)
@@ -169,7 +169,7 @@ type LocalRun struct {
 // crunch-run terminates, mark the container as Cancelled.
 func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
        container arvados.Container,
-       status <-chan arvados.Container) {
+       status <-chan arvados.Container) error {
 
        uuid := container.UUID
 
@@ -179,7 +179,7 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
                case lr.concurrencyLimit <- true:
                        break
                case <-lr.ctx.Done():
-                       return
+                       return lr.ctx.Err()
                }
 
                defer func() { <-lr.concurrencyLimit }()
@@ -270,4 +270,5 @@ Finish:
        }
 
        dispatcher.Logger.Printf("finalized container %v", uuid)
+       return nil
 }
index d976bf0812950488b796cb063def8c960d128849..92b8d2adcd6fe22e20c66afc1d4f803521ccd545 100644 (file)
@@ -83,9 +83,9 @@ func (s *TestSuite) TestIntegration(c *C) {
 
        cl := arvados.Cluster{Containers: arvados.ContainersConfig{RuntimeEngine: "docker"}}
 
-       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-               (&LocalRun{startCmd, make(chan bool, 8), ctx, &cl}).run(d, c, s)
-               cancel()
+       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+               defer cancel()
+               return (&LocalRun{startCmd, make(chan bool, 8), ctx, &cl}).run(d, c, s)
        }
 
        err = dispatcher.Run(ctx)
@@ -188,9 +188,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
        cl := arvados.Cluster{Containers: arvados.ContainersConfig{RuntimeEngine: "docker"}}
 
-       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-               (&LocalRun{startCmd, make(chan bool, 8), ctx, &cl}).run(d, c, s)
-               cancel()
+       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+               defer cancel()
+               return (&LocalRun{startCmd, make(chan bool, 8), ctx, &cl}).run(d, c, s)
        }
 
        re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
index 2f2f013c714a0be6bf863cbf8329efae62e616b6..584db38edf7e93ac57ad8929ca31e04de907b78d 100644 (file)
@@ -7,7 +7,6 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-       "bytes"
        "context"
        "flag"
        "fmt"
@@ -271,7 +270,7 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
 // already in the queue).  Cancel the slurm job if the container's
 // priority changes to zero or its state indicates it's no longer
 // running.
-func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
@@ -279,38 +278,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                log.Printf("Submitting container %s to slurm", ctr.UUID)
                cmd := []string{disp.cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
-               if err := disp.submit(ctr, cmd); err != nil {
-                       var text string
-                       switch err := err.(type) {
-                       case dispatchcloud.ConstraintsNotSatisfiableError:
-                               var logBuf bytes.Buffer
-                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-                               if len(err.AvailableTypes) == 0 {
-                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
-                               } else {
-                                       fmt.Fprint(&logBuf, "Available instance types:\n")
-                                       for _, t := range err.AvailableTypes {
-                                               fmt.Fprintf(&logBuf,
-                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-                                               )
-                                       }
-                               }
-                               text = logBuf.String()
-                               disp.UpdateState(ctr.UUID, dispatch.Cancelled)
-                       default:
-                               text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
-                       }
-                       log.Print(text)
-
-                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                               "object_uuid": ctr.UUID,
-                               "event_type":  "dispatch",
-                               "properties":  map[string]string{"text": text}}}
-                       disp.Arv.Create("logs", lr, nil)
-
-                       disp.Unlock(ctr.UUID)
-                       return
+               err := disp.submit(ctr, cmd)
+               if err != nil {
+                       return err
                }
        }
 
@@ -339,7 +309,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                        case dispatch.Locked:
                                disp.Unlock(ctr.UUID)
                        }
-                       return
+                       return nil
                case updated, ok := <-status:
                        if !ok {
                                log.Printf("container %s is done: cancel slurm job", ctr.UUID)
index 480434de65d291fad8ac2ac8ff8fbb6092ece327..e7a89db23c8743b3e2934a5c26f12a446a5bd6a9 100644 (file)
@@ -104,7 +104,7 @@ func (sf *slurmFake) Cancel(name string) error {
 
 func (s *IntegrationSuite) integrationTest(c *C,
        expectBatch [][]string,
-       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+       runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) {
        arvadostest.ResetEnv()
 
        arv, err := arvadosclient.MakeArvadosClient()
@@ -123,18 +123,21 @@ func (s *IntegrationSuite) integrationTest(c *C,
 
        ctx, cancel := context.WithCancel(context.Background())
        doneRun := make(chan struct{})
+       doneDispatch := make(chan error)
 
        s.disp.Dispatcher = &dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second,
-               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
                        go func() {
                                runContainer(disp, ctr)
                                s.slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
-                       s.disp.runContainer(disp, ctr, status)
+                       err := s.disp.runContainer(disp, ctr, status)
                        cancel()
+                       doneDispatch <- err
+                       return nil
                },
        }
 
@@ -148,6 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
        err = s.disp.Dispatcher.Run(ctx)
        <-doneRun
        c.Assert(err, Equals, context.Canceled)
+       errDispatch := <-doneDispatch
 
        s.disp.sqCheck.Stop()
 
@@ -162,12 +166,12 @@ func (s *IntegrationSuite) integrationTest(c *C,
        var container arvados.Container
        err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
        c.Check(err, IsNil)
-       return container
+       return container, errDispatch
 }
 
 func (s *IntegrationSuite) TestNormal(c *C) {
        s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
-       container := s.integrationTest(c,
+       container, _ := s.integrationTest(c,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -181,7 +185,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
        s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
        readyToCancel := make(chan bool)
        s.slurm.onCancel = func() { <-readyToCancel }
-       container := s.integrationTest(c,
+       container, _ := s.integrationTest(c,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -199,7 +203,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
 }
 
 func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
-       container := s.integrationTest(c,
+       container, _ := s.integrationTest(c,
                [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--nice=%d", 10000),
@@ -218,24 +222,14 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
 
 func (s *IntegrationSuite) TestSbatchFail(c *C) {
        s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
-       container := s.integrationTest(c,
+       container, err := s.integrationTest(c,
                [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
-
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
-
-       var ll arvados.LogList
-       err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{
-               {"object_uuid", "=", container.UUID},
-               {"event_type", "=", "dispatch"},
-       }}, &ll)
-       c.Assert(err, IsNil)
-       c.Assert(len(ll.Items), Equals, 1)
+       c.Check(err, ErrorMatches, `something terrible happened`)
 }
 
 type StubbedSuite struct {
@@ -280,7 +274,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second,
-               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
                        go func() {
                                time.Sleep(time.Second)
                                disp.UpdateState(ctr.UUID, dispatch.Running)
@@ -288,6 +282,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
                        }()
                        s.disp.runContainer(disp, ctr, status)
                        cancel()
+                       return nil
                },
        }