//
// SPDX-License-Identifier: AGPL-3.0
-package main
-
// Dispatcher service for Crunch that submits containers to the slurm queue.
+package dispatchslurm
import (
- "bytes"
"context"
- "flag"
+ "crypto/hmac"
+ "crypto/sha256"
"fmt"
"log"
"math"
+ "net/http"
"os"
"regexp"
"strings"
"time"
- "git.curoverse.com/arvados.git/lib/config"
- "git.curoverse.com/arvados.git/lib/dispatchcloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
- "gopkg.in/yaml.v2"
)
+// Command is the cmd.Handler entry point used by the arvados service
+// framework to run crunch-dispatch-slurm as a subcommand.
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchSLURM, newHandler)
+
+// newHandler builds a Dispatcher for the given cluster config, starts
+// its dispatch loop in a background goroutine, and returns it as a
+// service.Handler. Configuration errors are surfaced through
+// service.ErrorHandler rather than aborting the process.
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+ logger := ctxlog.FromContext(ctx)
+ disp := &Dispatcher{logger: logger, cluster: cluster}
+ if err := disp.configure(); err != nil {
+ return service.ErrorHandler(ctx, cluster, err)
+ }
+ disp.setup()
+ go func() {
+ // NOTE(review): disp.err is written here and read by
+ // CheckHealth() with no explicit synchronization; the
+ // close(disp.done) below is the only ordering signal —
+ // confirm callers only rely on err after Done() fires.
+ disp.err = disp.run()
+ close(disp.done)
+ }()
+ return disp
+}
+
type logger interface {
dispatch.Logger
Fatalf(string, ...interface{})
const initialNiceValue int64 = 10000
-var (
- version = "dev"
-)
-
type Dispatcher struct {
*dispatch.Dispatcher
- logger logrus.FieldLogger
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+ // dbConnector opens the PostgreSQL connection used by run() to
+ // take the cluster-wide dispatch lock (dblock.Dispatch).
+ dbConnector ctrlctx.DBConnector
+
+ // done is closed when the background run() goroutine exits (see
+ // newHandler); err holds run()'s result, reported by CheckHealth().
+ done chan struct{}
+ err error
 Client arvados.Client
}
-func main() {
- logger := logrus.StandardLogger()
- if os.Getenv("DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- logger.Formatter = &logrus.JSONFormatter{
- TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
- }
- disp := &Dispatcher{logger: logger}
- err := disp.Run(os.Args[0], os.Args[1:])
- if err != nil {
- logrus.Fatalf("%s", err)
- }
+// CheckHealth implements service.Handler. It reports the error (if
+// any) returned by the background run() goroutine; nil means the
+// dispatcher is (still) healthy.
+func (disp *Dispatcher) CheckHealth() error {
+ return disp.err
}
-func (disp *Dispatcher) Run(prog string, args []string) error {
- if err := disp.configure(prog, args); err != nil {
- return err
- }
- disp.setup()
- return disp.run()
+// Done implements service.Handler. The returned channel is closed
+// when the background run() goroutine finishes.
+func (disp *Dispatcher) Done() <-chan struct{} {
+ return disp.done
+}
+
+// ServeHTTP implements service.Handler. This dispatcher exposes no
+// HTTP API of its own, so every request is answered with 404.
+func (disp *Dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ http.NotFound(w, r)
}
-// configure() loads config files. Tests skip this.
-func (disp *Dispatcher) configure(prog string, args []string) error {
+// configure() loads config files. Some tests skip this (see
+// StubbedSuite).
+//
+// It derives the API client settings, the DB connector, and process
+// environment variables from disp.cluster, which the caller
+// (newHandler) has already populated from the loaded config.
+func (disp *Dispatcher) configure() error {
if disp.logger == nil {
disp.logger = logrus.StandardLogger()
}
- flags := flag.NewFlagSet(prog, flag.ExitOnError)
- flags.Usage = func() { usage(flags) }
-
- loader := config.NewLoader(nil, disp.logger)
- loader.SetupFlags(flags)
-
- dumpConfig := flag.Bool(
- "dump-config",
- false,
- "write current configuration to stdout and exit")
- getVersion := flags.Bool(
- "version",
- false,
- "Print version information and exit.")
-
- args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-legacy-crunch-dispatch-slurm-config")
-
- // Parse args; omit the first arg which is the command name
- err := flags.Parse(args)
-
- if err == flag.ErrHelp {
- return nil
- }
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("crunch-dispatch-slurm %s\n", version)
- return nil
- }
-
- disp.logger.Printf("crunch-dispatch-slurm %s started", version)
-
- cfg, err := loader.Load()
- if err != nil {
- return err
- }
-
- if disp.cluster, err = cfg.GetCluster(""); err != nil {
- return fmt.Errorf("config error: %s", err)
- }
+ // Tag all subsequent log entries with the cluster ID.
+ disp.logger = disp.logger.WithField("ClusterID", disp.cluster.ClusterID)
+ disp.logger.Printf("crunch-dispatch-slurm %s started", cmd.Version.String())
disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
disp.Client.Insecure = disp.cluster.TLS.Insecure
+ // Used by run() to take the cluster-wide dispatch lock in PostgreSQL.
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+ // Export operator-configured SbatchEnvironmentVariables into this
+ // process's environment.
+ for k, v := range disp.cluster.Containers.SLURM.SbatchEnvironmentVariables {
+ os.Setenv(k, v)
+ }
} else {
disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
-
- if *dumpConfig {
- out, err := yaml.Marshal(cfg)
- if err != nil {
- return err
- }
- _, err = os.Stdout.Write(out)
- if err != nil {
- return err
- }
- }
-
return nil
}
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
+ disp.done = make(chan struct{})
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
disp.logger.Fatalf("Error making Arvados client: %v", err)
}
func (disp *Dispatcher) run() error {
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+ defer dblock.Dispatch.Unlock()
defer disp.sqCheck.Stop()
if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
// Cancelled or Complete. See https://dev.arvados.org/issues/10979
func (disp *Dispatcher) checkSqueueForOrphans() {
for _, uuid := range disp.sqCheck.All() {
- if !containerUuidPattern.MatchString(uuid) {
+ if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.cluster.ClusterID) {
continue
}
err := disp.TrackContainer(uuid)
if disp.cluster == nil {
// no instance types configured
args = append(args, disp.slurmConstraintArgs(container)...)
- } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ } else if types, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
// ditto
args = append(args, disp.slurmConstraintArgs(container)...)
} else if err != nil {
return nil, err
} else {
- // use instancetype constraint instead of slurm mem/cpu/tmp specs
- args = append(args, "--constraint=instancetype="+it.Name)
+ // use instancetype constraint instead of slurm
+ // mem/cpu/tmp specs (note types[0] is the lowest-cost
+ // suitable instance type)
+ args = append(args, "--constraint=instancetype="+types[0].Name)
}
if len(container.SchedulingParameters.Partitions) > 0 {
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
+ crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
crArgs = append(crArgs, container.UUID)
- crScript := strings.NewReader(execScript(crArgs))
+
+ h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
sbArgs, err := disp.sbatchArgs(container)
if err != nil {
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
-func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log.Printf("Submitting container %s to slurm", ctr.UUID)
cmd := []string{disp.cluster.Containers.CrunchRunCommand}
cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
- if err := disp.submit(ctr, cmd); err != nil {
- var text string
- if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
- var logBuf bytes.Buffer
- fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
- if len(err.AvailableTypes) == 0 {
- fmt.Fprint(&logBuf, "No instance types are configured.\n")
- } else {
- fmt.Fprint(&logBuf, "Available instance types:\n")
- for _, t := range err.AvailableTypes {
- fmt.Fprintf(&logBuf,
- "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
- t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
- )
- }
- }
- text = logBuf.String()
- disp.UpdateState(ctr.UUID, dispatch.Cancelled)
- } else {
- text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
- }
- log.Print(text)
-
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": ctr.UUID,
- "event_type": "dispatch",
- "properties": map[string]string{"text": text}}}
- disp.Arv.Create("logs", lr, nil)
-
- disp.Unlock(ctr.UUID)
- return
+ err := disp.submit(ctr, cmd)
+ if err != nil {
+ return err
}
}
case dispatch.Locked:
disp.Unlock(ctr.UUID)
}
- return
+ return nil
case updated, ok := <-status:
if !ok {
log.Printf("container %s is done: cancel slurm job", ctr.UUID)