From ffa1fd1fdf584c71e248e9bb7d523f788a517510 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 2 Jul 2021 13:27:46 -0400 Subject: [PATCH] 17756: Add lsf dispatcher. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- .../arvados-dispatch-lsf.service | 30 ++ cmd/arvados-server/cmd.go | 2 + lib/config/config.default.yml | 12 + lib/config/export.go | 1 + lib/config/generated_config.go | 12 + lib/lsf/dispatch.go | 330 ++++++++++++++++++ lib/lsf/dispatch_test.go | 145 ++++++++ lib/lsf/lsfcli.go | 92 +++++ lib/lsf/lsfqueue.go | 81 +++++ sdk/go/arvados/config.go | 7 + sdk/go/arvadostest/fixtures.go | 2 + 11 files changed, 714 insertions(+) create mode 100644 cmd/arvados-server/arvados-dispatch-lsf.service create mode 100644 lib/lsf/dispatch.go create mode 100644 lib/lsf/dispatch_test.go create mode 100644 lib/lsf/lsfcli.go create mode 100644 lib/lsf/lsfqueue.go diff --git a/cmd/arvados-server/arvados-dispatch-lsf.service b/cmd/arvados-server/arvados-dispatch-lsf.service new file mode 100644 index 0000000000..f9e73a2c79 --- /dev/null +++ b/cmd/arvados-server/arvados-dispatch-lsf.service @@ -0,0 +1,30 @@ +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + +[Unit] +Description=arvados-dispatch-lsf +Documentation=https://doc.arvados.org/ +After=network.target +AssertPathExists=/etc/arvados/config.yml + +# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section +StartLimitInterval=0 + +# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section +StartLimitIntervalSec=0 + +[Service] +Type=notify +EnvironmentFile=-/etc/arvados/environment +ExecStart=/usr/bin/arvados-dispatch-lsf +# Set a reasonable default for the open file limit +LimitNOFILE=65536 +Restart=always +RestartSec=1 + +# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section +StartLimitInterval=0 + +[Install] +WantedBy=multi-user.target diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go index d0aa9da94d..4b94a78138 100644 --- a/cmd/arvados-server/cmd.go +++ b/cmd/arvados-server/cmd.go @@ -15,6 +15,7 @@ import ( "git.arvados.org/arvados.git/lib/crunchrun" "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/lib/install" + "git.arvados.org/arvados.git/lib/lsf" "git.arvados.org/arvados.git/lib/recovercollection" "git.arvados.org/arvados.git/services/ws" ) @@ -33,6 +34,7 @@ var ( "controller": controller.Command, "crunch-run": crunchrun.Command, "dispatch-cloud": dispatchcloud.Command, + "dispatch-lsf": lsf.DispatchCommand, "install": install.Command, "init": install.InitCommand, "recover-collection": recovercollection.Command, diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index e28d5cbb7f..975b4cd0f4 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -52,6 +52,9 @@ Clusters: DispatchCloud: InternalURLs: {SAMPLE: {}} ExternalURL: "-" + DispatchLSF: + InternalURLs: {SAMPLE: {}} + ExternalURL: "-" Keepproxy: InternalURLs: {SAMPLE: {}} ExternalURL: "" @@ -1012,6 +1015,15 @@ Clusters: # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.) AssignNodeHostname: "compute%d" + LSF: + # Additional arguments to bsub when submitting Arvados + # containers as LSF jobs. + BsubArgumentsList: [] + + # Use sudo to switch to this user account when submitting LSF + # jobs. + BsubSudoUser: "crunch" + JobsAPI: # Enable the legacy 'jobs' API (crunch v1). This value must be a string. 
# diff --git a/lib/config/export.go b/lib/config/export.go index 8753b52f27..7adb50ec37 100644 --- a/lib/config/export.go +++ b/lib/config/export.go @@ -120,6 +120,7 @@ var whitelist = map[string]bool{ "Containers.JobsAPI.GitInternalDir": false, "Containers.Logging": false, "Containers.LogReuseDecisions": false, + "Containers.LSF": false, "Containers.MaxComputeVMs": false, "Containers.MaxDispatchAttempts": false, "Containers.MaxRetryAttempts": true, diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index b15bf7eebc..dec2505589 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -58,6 +58,9 @@ Clusters: DispatchCloud: InternalURLs: {SAMPLE: {}} ExternalURL: "-" + DispatchLSF: + InternalURLs: {SAMPLE: {}} + ExternalURL: "-" Keepproxy: InternalURLs: {SAMPLE: {}} ExternalURL: "" @@ -1018,6 +1021,15 @@ Clusters: # (See http://ruby-doc.org/core-2.2.2/Kernel.html#method-i-format for more.) AssignNodeHostname: "compute%d" + LSF: + # Additional arguments to bsub when submitting Arvados + # containers as LSF jobs. + BsubArgumentsList: [] + + # Use sudo to switch to this user account when submitting LSF + # jobs. + BsubSudoUser: "crunch" + JobsAPI: # Enable the legacy 'jobs' API (crunch v1). This value must be a string. # diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go new file mode 100644 index 0000000000..ff95d0db29 --- /dev/null +++ b/lib/lsf/dispatch.go @@ -0,0 +1,330 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lsf + +import ( + "bytes" + "context" + "errors" + "fmt" + "math" + "net/http" + "strings" + "sync" + "time" + + "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/dispatchcloud" + "git.arvados.org/arvados.git/lib/service" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/auth" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "git.arvados.org/arvados.git/sdk/go/dispatch" + "git.arvados.org/arvados.git/sdk/go/health" + "github.com/julienschmidt/httprouter" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +var DispatchCommand cmd.Handler = service.Command(arvados.ServiceNameDispatchLSF, newHandler) + +func newHandler(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler { + ac, err := arvados.NewClientFromConfig(cluster) + if err != nil { + return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err)) + } + d := &dispatcher{ + Cluster: cluster, + Context: ctx, + ArvClient: ac, + AuthToken: token, + Registry: reg, + } + go d.Start() + return d +} + +type dispatcher struct { + Cluster *arvados.Cluster + Context context.Context + ArvClient *arvados.Client + AuthToken string + Registry *prometheus.Registry + + logger logrus.FieldLogger + lsfcli lsfcli + lsfqueue lsfqueue + arvDispatcher *dispatch.Dispatcher + httpHandler http.Handler + + initOnce sync.Once + stop chan struct{} + stopped chan struct{} +} + +// Start starts the dispatcher. Start can be called multiple times +// with no ill effect. 
+func (disp *dispatcher) Start() { + disp.initOnce.Do(func() { + disp.init() + go func() { + disp.checkLsfQueueForOrphans() + err := disp.arvDispatcher.Run(disp.Context) + if err != nil { + disp.logger.Error(err) + disp.Close() + } + }() + }) +} + +// ServeHTTP implements service.Handler. +func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) { + disp.Start() + disp.httpHandler.ServeHTTP(w, r) +} + +// CheckHealth implements service.Handler. +func (disp *dispatcher) CheckHealth() error { + disp.Start() + select { + case <-disp.stopped: + return errors.New("stopped") + default: + return nil + } +} + +// Done implements service.Handler. +func (disp *dispatcher) Done() <-chan struct{} { + return disp.stopped +} + +// Stop dispatching containers and release resources. Used by tests. +func (disp *dispatcher) Close() { + disp.Start() + select { + case disp.stop <- struct{}{}: + default: + } + <-disp.stopped +} + +func (disp *dispatcher) init() { + disp.logger = ctxlog.FromContext(disp.Context) + disp.lsfcli.logger = disp.logger + disp.lsfqueue = lsfqueue{ + logger: disp.logger, + period: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval), + lsfcli: &disp.lsfcli, + } + disp.ArvClient.AuthToken = disp.AuthToken + disp.stop = make(chan struct{}, 1) + disp.stopped = make(chan struct{}) + + arv, err := arvadosclient.New(disp.ArvClient) + if err != nil { + disp.logger.Fatalf("Error making Arvados client: %v", err) + } + arv.Retries = 25 + arv.ApiToken = disp.AuthToken + disp.arvDispatcher = &dispatch.Dispatcher{ + Arv: arv, + Logger: disp.logger, + BatchSize: disp.Cluster.API.MaxItemsPerResponse, + RunContainer: disp.runContainer, + PollPeriod: time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval), + MinRetryPeriod: time.Duration(disp.Cluster.Containers.MinRetryPeriod), + } + + if disp.Cluster.ManagementToken == "" { + disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Management API authentication is not configured", http.StatusForbidden) + }) + } else { + mux := httprouter.New() + metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{ + ErrorLog: disp.logger, + }) + mux.Handler("GET", "/metrics", metricsH) + mux.Handler("GET", "/metrics.json", metricsH) + mux.Handler("GET", "/_health/:check", &health.Handler{ + Token: disp.Cluster.ManagementToken, + Prefix: "/_health/", + Routes: health.Routes{"ping": disp.CheckHealth}, + }) + disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux) + } +} + +func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { + ctx, cancel := context.WithCancel(disp.Context) + defer cancel() + + if ctr.State != dispatch.Locked { + // already started by prior invocation + } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + disp.logger.Printf("Submitting container %s to LSF", ctr.UUID) + cmd := []string{disp.Cluster.Containers.CrunchRunCommand} + cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) + cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...) 
+ if err := disp.submit(ctr, cmd); err != nil { + var text string + switch err := err.(type) { + case dispatchcloud.ConstraintsNotSatisfiableError: + var logBuf bytes.Buffer + fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err) + if len(err.AvailableTypes) == 0 { + fmt.Fprint(&logBuf, "No instance types are configured.\n") + } else { + fmt.Fprint(&logBuf, "Available instance types:\n") + for _, t := range err.AvailableTypes { + fmt.Fprintf(&logBuf, + "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", + t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price, + ) + } + } + text = logBuf.String() + disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) + default: + text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err) + } + disp.logger.Print(text) + + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": ctr.UUID, + "event_type": "dispatch", + "properties": map[string]string{"text": text}}} + disp.arvDispatcher.Arv.Create("logs", lr, nil) + + disp.arvDispatcher.Unlock(ctr.UUID) + return + } + } + + disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State) + defer disp.logger.Printf("Done monitoring container %s", ctr.UUID) + + // If the container disappears from the lsf queue, there is + // no point in waiting for further dispatch updates: just + // clean up and return. + go func(uuid string) { + for ctx.Err() == nil { + if _, ok := disp.lsfqueue.JobID(uuid); !ok { + disp.logger.Printf("container %s job disappeared from LSF queue", uuid) + cancel() + return + } + } + }(ctr.UUID) + + for done := false; !done; { + select { + case <-ctx.Done(): + // Disappeared from lsf queue + if err := disp.arvDispatcher.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil { + disp.logger.Printf("error getting final container state for %s: %s", ctr.UUID, err) + } + switch ctr.State { + case dispatch.Running: + disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) + case dispatch.Locked: + disp.arvDispatcher.Unlock(ctr.UUID) + } + return + case updated, ok := <-status: + if !ok { + done = true + break + } + if updated.State != ctr.State { + disp.logger.Infof("container %s changed state from %s to %s", ctr.UUID, ctr.State, updated.State) + } + ctr = updated + if ctr.Priority == 0 { + disp.logger.Printf("container %s has state %s, priority %d: cancel lsf job", ctr.UUID, ctr.State, ctr.Priority) + disp.bkill(ctr) + } else { + disp.lsfqueue.SetPriority(ctr.UUID, int64(ctr.Priority)) + } + } + } + disp.logger.Printf("container %s is done", ctr.UUID) + + // Try "bkill" every few seconds until the LSF job disappears + // from the queue. + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) { + err := disp.lsfcli.Bkill(jobid) + if err != nil { + disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + } + <-ticker.C + } +} + +func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error { + // Start with an empty slice here to ensure append() doesn't + // modify crunchRunCommand's underlying array + var crArgs []string + crArgs = append(crArgs, crunchRunCommand...) 
+ crArgs = append(crArgs, container.UUID) + crScript := execScript(crArgs) + + bsubArgs, err := disp.bsubArgs(container) + if err != nil { + return err + } + return disp.lsfcli.Bsub(crScript, bsubArgs, disp.ArvClient) +} + +func (disp *dispatcher) bkill(ctr arvados.Container) { + if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID) + } else if err := disp.lsfcli.Bkill(jobid); err != nil { + disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + } +} + +func (disp *dispatcher) bsubArgs(container arvados.Container) ([]string, error) { + args := []string{"bsub"} + args = append(args, disp.Cluster.Containers.LSF.BsubArgumentsList...) + args = append(args, "-J", container.UUID) + args = append(args, disp.bsubConstraintArgs(container)...) + if u := disp.Cluster.Containers.LSF.BsubSudoUser; u != "" { + args = append([]string{"sudo", "-E", "-u", u}, args...) + } + return args, nil +} + +func (disp *dispatcher) bsubConstraintArgs(container arvados.Container) []string { + // TODO: propagate container.SchedulingParameters.Partitions + tmp := int64(math.Ceil(float64(dispatchcloud.EstimateScratchSpace(&container)) / 1048576)) + vcpus := container.RuntimeConstraints.VCPUs + mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+ + container.RuntimeConstraints.KeepCacheRAM+ + int64(disp.Cluster.Containers.ReserveExtraRAM)) / 1048576)) + return []string{ + "-R", fmt.Sprintf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]", mem, tmp, vcpus), + } +} + +func (disp *dispatcher) checkLsfQueueForOrphans() { + disp.logger.Warn("FIXME: checkLsfQueueForOrphans") +} + +func execScript(args []string) []byte { + s := "#!/bin/sh\nexec" + for _, w := range args { + s += ` '` + s += strings.Replace(w, `'`, `'\''`, -1) + s += `'` + } + return []byte(s + "\n") +} diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go new file mode 100644 index 0000000000..25db95e10e --- /dev/null +++ b/lib/lsf/dispatch_test.go @@ -0,0 +1,145 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package lsf + +import ( + "context" + "fmt" + "math/rand" + "os/exec" + "strconv" + "sync" + "testing" + "time" + + "git.arvados.org/arvados.git/lib/config" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/prometheus/client_golang/prometheus" + "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&suite{}) + +type suite struct { + disp *dispatcher +} + +func (s *suite) TearDownTest(c *check.C) { + arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil) +} + +func (s *suite) SetUpTest(c *check.C) { + cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load() + c.Assert(err, check.IsNil) + cluster, err := cfg.GetCluster("") + c.Assert(err, check.IsNil) + cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second) + s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher) + s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd { + return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false") + } +} + +type lsfstub struct { + errorRate float64 +} + +func (stub lsfstub) stubCommand(c *check.C) func(prog string, args ...string) *exec.Cmd { + mtx := sync.Mutex{} + nextjobid := 100 + fakejobq := map[int]string{} + return func(prog string, args ...string) *exec.Cmd { + c.Logf("stubCommand: %q %q", prog, args) + if rand.Float64() < stub.errorRate { + return exec.Command("bash", "-c", "echo >&2 'stub random failure' && false") + } + switch prog { + case "bsub": + c.Assert(args, check.HasLen, 4) + c.Check(args[0], check.Equals, "-J") + switch args[1] { + case arvadostest.LockedContainerUUID: + c.Check(args, check.DeepEquals, []string{"-J", arvadostest.LockedContainerUUID, "-R", "rusage[mem=11701MB:tmp=0MB] affinity[core(4)]"}) + mtx.Lock() + fakejobq[nextjobid] = args[1] + nextjobid++ + mtx.Unlock() + case arvadostest.QueuedContainerUUID: + c.Check(args, check.DeepEquals, []string{"-J", arvadostest.QueuedContainerUUID, "-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"}) + mtx.Lock() + fakejobq[nextjobid] = args[1] + nextjobid++ + mtx.Unlock() + default: + c.Errorf("unexpected uuid passed to bsub: args %q", args) + return exec.Command("false") + } + return exec.Command("echo", "submitted job") + case "bjobs": + c.Check(args, check.DeepEquals, []string{"-noheader", "-o", "jobid stat job_name:30"}) + out := "" + for jobid, uuid := range fakejobq { + out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid) + } + c.Logf("bjobs out: %q", out) + return exec.Command("printf", out) + case "bkill": + killid, _ := strconv.Atoi(args[0]) + if uuid, ok := fakejobq[killid]; !ok { + return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: No matching job found\n'", killid)) + } else if uuid == "" { + return exec.Command("bash", "-c", fmt.Sprintf("printf >&2 'Job <%d>: Job has already finished\n'", killid)) + } else { + go func() { + time.Sleep(time.Millisecond) + mtx.Lock() + delete(fakejobq, killid) + mtx.Unlock() + }() + return exec.Command("bash", "-c", fmt.Sprintf("printf 'Job <%d> is being terminated\n'", killid)) + } + default: + return exec.Command("bash", "-c", fmt.Sprintf("echo >&2 'stub: command not found: %+q'", prog)) + } + } +} + +func (s *suite) TestSubmit(c *check.C) { + s.disp.lsfcli.stubCommand = lsfstub{errorRate: 0.1}.stubCommand(c) + s.disp.Start() + deadline := 
time.Now().Add(20 * time.Second) + for range time.NewTicker(time.Second).C { + if time.Now().After(deadline) { + c.Error("timed out") + break + } + // "queuedcontainer" should be running + if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok { + continue + } + // "lockedcontainer" should be cancelled because it + // has priority 0 (no matching container requests) + if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok { + continue + } + var ctr arvados.Container + if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil { + c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err) + continue + } + if ctr.State != arvados.ContainerStateQueued { + c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State) + continue + } + c.Log("reached desired state") + break + } +} diff --git a/lib/lsf/lsfcli.go b/lib/lsf/lsfcli.go new file mode 100644 index 0000000000..9d712ee97f --- /dev/null +++ b/lib/lsf/lsfcli.go @@ -0,0 +1,92 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lsf + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "strings" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" +) + +type bjobsEntry struct { + id int + name string + stat string +} + +type lsfcli struct { + logger logrus.FieldLogger + // (for testing) if non-nil, call stubCommand() instead of + // exec.Command() when running lsf command line programs. + stubCommand func(string, ...string) *exec.Cmd +} + +func (cli lsfcli) command(prog string, args ...string) *exec.Cmd { + if f := cli.stubCommand; f != nil { + return f(prog, args...) + } else { + return exec.Command(prog, args...) + } +} + +func (cli lsfcli) Bsub(script []byte, args []string, arv *arvados.Client) error { + cli.logger.Infof("bsub command %q script %q", args, script) + cmd := cli.command(args[0], args[1:]...) + cmd.Env = append([]string(nil), os.Environ()...) 
+ cmd.Env = append(cmd.Env, "ARVADOS_API_HOST="+arv.APIHost) + cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arv.AuthToken) + if arv.Insecure { + cmd.Env = append(cmd.Env, "ARVADOS_API_HOST_INSECURE=1") + } + cmd.Stdin = bytes.NewReader(script) + out, err := cmd.Output() + cli.logger.WithField("stdout", string(out)).Infof("bsub finished") + return errWithStderr(err) +} + +func (cli lsfcli) Bjobs() ([]bjobsEntry, error) { + cli.logger.Debugf("Bjobs()") + cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30") + buf, err := cmd.Output() + if err != nil { + return nil, errWithStderr(err) + } + var bjobs []bjobsEntry + for _, line := range strings.Split(string(buf), "\n") { + if line == "" { + continue + } + var ent bjobsEntry + if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil { + cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line) + continue + } + bjobs = append(bjobs, ent) + } + return bjobs, nil +} + +func (cli lsfcli) Bkill(id int) error { + cli.logger.Infof("Bkill(%d)", id) + cmd := cli.command("bkill", fmt.Sprintf("%d", id)) + buf, err := cmd.CombinedOutput() + if err == nil || strings.Index(string(buf), "already finished") >= 0 { + return nil + } else { + return fmt.Errorf("%s (%q)", err, buf) + } +} + +func errWithStderr(err error) error { + if err, ok := err.(*exec.ExitError); ok { + return fmt.Errorf("%s (%q)", err, err.Stderr) + } + return err +} diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go new file mode 100644 index 0000000000..65f38690c3 --- /dev/null +++ b/lib/lsf/lsfqueue.go @@ -0,0 +1,81 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lsf + +import ( + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +type lsfqueue struct { + logger logrus.FieldLogger + period time.Duration + lsfcli *lsfcli + + initOnce sync.Once + mutex sync.Mutex + needUpdate chan bool + updated *sync.Cond + latest map[string]bjobsEntry +} + +// JobID waits for the next queue update (so even a job that was only +// submitted a nanosecond ago will show up) and then returns the LSF +// job ID corresponding to the given container UUID. +func (q *lsfqueue) JobID(uuid string) (int, bool) { + q.initOnce.Do(q.init) + q.mutex.Lock() + defer q.mutex.Unlock() + select { + case q.needUpdate <- true: + default: + // an update is already pending + } + q.updated.Wait() + ent, ok := q.latest[uuid] + q.logger.Debugf("JobID(%q) == %d", uuid, ent.id) + return ent.id, ok +} + +func (q *lsfqueue) SetPriority(uuid string, priority int64) { + q.initOnce.Do(q.init) + q.logger.Debug("SetPriority is not implemented") +} + +func (q *lsfqueue) init() { + q.updated = sync.NewCond(&q.mutex) + q.needUpdate = make(chan bool, 1) + ticker := time.NewTicker(time.Second) + go func() { + for range q.needUpdate { + q.logger.Debug("running bjobs") + ents, err := q.lsfcli.Bjobs() + if err != nil { + q.logger.Warnf("bjobs: %s", err) + // Retry on the next tick, don't wait + // for another new call to JobID(). 
+ select { + case q.needUpdate <- true: + default: + } + <-ticker.C + continue + } + next := make(map[string]bjobsEntry, len(ents)) + for _, ent := range ents { + next[ent.name] = ent + } + q.mutex.Lock() + q.latest = next + q.updated.Broadcast() + q.logger.Debugf("waking up waiters with latest %v", q.latest) + q.mutex.Unlock() + // Limit "bjobs" invocations to 1 per second + <-ticker.C + } + }() +} diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index 6e59828a3c..844991f41e 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -329,6 +329,7 @@ type Services struct { Composer Service Controller Service DispatchCloud Service + DispatchLSF Service GitHTTP Service GitSSH Service Health Service @@ -461,6 +462,10 @@ type ContainersConfig struct { AssignNodeHostname string } } + LSF struct { + BsubSudoUser string + BsubArgumentsList []string + } } type CloudVMsConfig struct { @@ -597,6 +602,7 @@ const ( ServiceNameRailsAPI ServiceName = "arvados-api-server" ServiceNameController ServiceName = "arvados-controller" ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud" + ServiceNameDispatchLSF ServiceName = "arvados-dispatch-lsf" ServiceNameHealth ServiceName = "arvados-health" ServiceNameWorkbench1 ServiceName = "arvados-workbench1" ServiceNameWorkbench2 ServiceName = "arvados-workbench2" @@ -614,6 +620,7 @@ func (svcs Services) Map() map[ServiceName]Service { ServiceNameRailsAPI: svcs.RailsAPI, ServiceNameController: svcs.Controller, ServiceNameDispatchCloud: svcs.DispatchCloud, + ServiceNameDispatchLSF: svcs.DispatchLSF, ServiceNameHealth: svcs.Health, ServiceNameWorkbench1: svcs.Workbench1, ServiceNameWorkbench2: svcs.Workbench2, diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go index 4b7ad6dd59..3de4225d56 100644 --- a/sdk/go/arvadostest/fixtures.go +++ b/sdk/go/arvadostest/fixtures.go @@ -45,6 +45,8 @@ const ( QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr" QueuedContainerUUID = "zzzzz-dz642-queuedcontainer" + LockedContainerUUID = "zzzzz-dz642-lockedcontainer" + RunningContainerUUID = "zzzzz-dz642-runningcontainr" CompletedContainerUUID = "zzzzz-dz642-compltcontainer" -- 2.30.2
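
A few illustrative sketches follow. None of this is part of the patch; concrete values and argument lists are assumed for illustration.

Sketch of how bsubArgs assembles the submit command line: any site-configured Containers.LSF.BsubArgumentsList comes first, then -J with the container UUID as the LSF job name, then the resource constraints; if Containers.LSF.BsubSudoUser is non-empty (the config default above is "crunch"), the whole command is wrapped in sudo -E -u.

package main

import "fmt"

func main() {
	// Hypothetical inputs mirroring the config knobs added in this patch.
	bsubArgumentsList := []string{} // Containers.LSF.BsubArgumentsList
	bsubSudoUser := "crunch"        // Containers.LSF.BsubSudoUser
	uuid := "zzzzz-dz642-queuedcontainer"
	constraints := []string{"-R", "rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]"}

	args := []string{"bsub"}
	args = append(args, bsubArgumentsList...)
	args = append(args, "-J", uuid)
	args = append(args, constraints...)
	if bsubSudoUser != "" {
		// sudo -E preserves the ARVADOS_* environment set up by lsfcli.Bsub.
		args = append([]string{"sudo", "-E", "-u", bsubSudoUser}, args...)
	}
	fmt.Println(args)
	// [sudo -E -u crunch bsub -J zzzzz-dz642-queuedcontainer -R rusage[mem=11701MB:tmp=45777MB] affinity[core(4)]]
}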
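
Sketch of the resource-request arithmetic in bsubConstraintArgs: byte totals are rounded up to whole mebibytes (1048576 bytes), which LSF's rusage[] syntax labels "MB". The input values below are assumed so as to reproduce the "lockedcontainer" expectation in dispatch_test.go.

package main

import (
	"fmt"
	"math"
)

func main() {
	var (
		ram       int64 = 12000000000 // RuntimeConstraints.RAM (bytes, assumed fixture value)
		keepCache int64 = 268435456   // RuntimeConstraints.KeepCacheRAM (bytes, assumed fixture value)
		extra     int64 = 0           // Containers.ReserveExtraRAM (bytes)
		scratch   int64 = 0           // dispatchcloud.EstimateScratchSpace result (bytes)
		vcpus           = 4           // RuntimeConstraints.VCPUs
	)
	// Round RAM (container + keep cache + reserve) and scratch up to MiB.
	mem := int64(math.Ceil(float64(ram+keepCache+extra) / 1048576))
	tmp := int64(math.Ceil(float64(scratch) / 1048576))
	fmt.Printf("rusage[mem=%dMB:tmp=%dMB] affinity[core(%d)]\n", mem, tmp, vcpus)
	// Output: rusage[mem=11701MB:tmp=0MB] affinity[core(4)]
}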
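
Sketch of the wrapper script produced by execScript (the function is copied verbatim from the patch): each argument is single-quoted, with embedded single quotes rewritten as '\'' so crunch-run receives every argument unmodified. The argument list here is a hypothetical example of what dispatcher.submit assembles.

package main

import (
	"fmt"
	"strings"
)

func execScript(args []string) []byte {
	s := "#!/bin/sh\nexec"
	for _, w := range args {
		s += ` '` + strings.Replace(w, `'`, `'\''`, -1) + `'`
	}
	return []byte(s + "\n")
}

func main() {
	fmt.Printf("%s", execScript([]string{
		"crunch-run", "--runtime-engine=docker", "zzzzz-dz642-queuedcontainer",
	}))
	// Output:
	// #!/bin/sh
	// exec 'crunch-run' '--runtime-engine=docker' 'zzzzz-dz642-queuedcontainer'
}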
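
Sketch of the bjobs output handling in lsfcli.Bjobs: the command requests the machine-readable format "jobid stat job_name:30", and fmt.Sscan splits each line on whitespace into job ID, state, and job name (the container UUID passed to bsub via -J). The sample output below is made up.

package main

import (
	"fmt"
	"strings"
)

type bjobsEntry struct {
	id   int
	name string
	stat string
}

func main() {
	out := "100 RUN zzzzz-dz642-queuedcontainer\n101 PEND zzzzz-dz642-lockedcontainer\n"
	var bjobs []bjobsEntry
	for _, line := range strings.Split(out, "\n") {
		if line == "" {
			continue
		}
		var ent bjobsEntry
		// Column order matches the requested "jobid stat job_name" format.
		if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
			continue // Bjobs logs and skips unparsed lines
		}
		bjobs = append(bjobs, ent)
	}
	fmt.Printf("%+v\n", bjobs)
	// Output: [{id:100 name:zzzzz-dz642-queuedcontainer stat:RUN} {id:101 name:zzzzz-dz642-lockedcontainer stat:PEND}]
}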