//
// SPDX-License-Identifier: AGPL-3.0
-package main
-
// Dispatcher service for Crunch that submits containers to the slurm queue.
+package dispatchslurm
import (
- "bytes"
"context"
- "flag"
+ "crypto/hmac"
+ "crypto/sha256"
"fmt"
"log"
"math"
+ "net/http"
"os"
"regexp"
"strings"
"time"
- "git.curoverse.com/arvados.git/lib/dispatchcloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/config"
- "git.curoverse.com/arvados.git/sdk/go/dispatch"
- "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
+// Command is the cmd.Handler entry point for crunch-dispatch-slurm,
+// wired into the generic service.Command scaffolding.
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchSLURM, newHandler)
+
+// newHandler builds a Dispatcher from the cluster config, starts its
+// run loop in a background goroutine, and returns it as the
+// service.Handler. On configuration error it returns an ErrorHandler
+// instead of a half-initialized dispatcher.
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+ logger := ctxlog.FromContext(ctx)
+ disp := &Dispatcher{logger: logger, cluster: cluster}
+ if err := disp.configure(); err != nil {
+ return service.ErrorHandler(ctx, cluster, err)
+ }
+ disp.setup()
+ go func() {
+ // run() blocks until the dispatcher stops; record its error
+ // (reported via CheckHealth) and close done (observed via
+ // Done) when it exits.
+ disp.err = disp.run()
+ close(disp.done)
+ }()
+ return disp
+}
+
// logger is the minimal logging contract the dispatcher relies on:
// the dispatch package's Logger plus Fatalf.
type logger interface {
dispatch.Logger
Fatalf(string, ...interface{})
// NOTE(review): the interface's closing brace is elided in this diff
// view.
// initialNiceValue is the --nice value assigned to newly submitted
// slurm jobs (see sbatchArgs), leaving headroom for later priority
// adjustment by the squeue checker.
const initialNiceValue int64 = 10000
-var (
- version = "dev"
- defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
-)
-
// Dispatcher holds the state of one crunch-dispatch-slurm instance.
// It embeds dispatch.Dispatcher for the generic container-queue
// polling machinery and adds slurm- and cluster-specific state.
type Dispatcher struct {
*dispatch.Dispatcher
- logger logrus.FieldLogger
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
-
- Client arvados.Client
-
- SbatchArguments []string
- PollPeriod arvados.Duration
- PrioritySpread int64
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+ // dbConnector supplies the postgres connection used to take the
+ // cross-process dispatch lock (see run()).
+ dbConnector ctrlctx.DBConnector
- // crunch-run command to invoke. The container UUID will be
- // appended. If nil, []string{"crunch-run"} will be used.
- //
- // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
- CrunchRunCommand []string
+ // done is closed when the run loop exits; err then holds its
+ // return value (surfaced through CheckHealth).
+ done chan struct{}
+ err error
- // Extra RAM to reserve (in Bytes) for SLURM job, in addition
- // to the amount specified in the container's RuntimeConstraints
- ReserveExtraRAM int64
-
- // Minimum time between two attempts to run the same container
- MinRetryPeriod arvados.Duration
-
- // Batch size for container queries
- BatchSize int64
+ Client arvados.Client
}
-func main() {
- logger := logrus.StandardLogger()
- if os.Getenv("DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- logger.Formatter = &logrus.JSONFormatter{
- TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
- }
- disp := &Dispatcher{logger: logger}
- err := disp.Run(os.Args[0], os.Args[1:])
- if err != nil {
- logrus.Fatalf("%s", err)
- }
+// CheckHealth implements service.Handler: it reports the run loop's
+// terminal error, or nil while none has occurred.
+//
+// NOTE(review): disp.err is written by the goroutine started in
+// newHandler without explicit synchronization — confirm the
+// done-channel ordering makes this race-free for callers.
+func (disp *Dispatcher) CheckHealth() error {
+ return disp.err
+}
-func (disp *Dispatcher) Run(prog string, args []string) error {
- if err := disp.configure(prog, args); err != nil {
- return err
- }
- disp.setup()
- return disp.run()
+// Done implements service.Handler: the returned channel is closed
+// when the dispatcher's run loop has exited.
+func (disp *Dispatcher) Done() <-chan struct{} {
+ return disp.done
+}
-// configure() loads config files. Tests skip this.
-func (disp *Dispatcher) configure(prog string, args []string) error {
- flags := flag.NewFlagSet(prog, flag.ExitOnError)
- flags.Usage = func() { usage(flags) }
-
- configPath := flags.String(
- "config",
- defaultConfigPath,
- "`path` to JSON or YAML configuration file")
- dumpConfig := flag.Bool(
- "dump-config",
- false,
- "write current configuration to stdout and exit")
- getVersion := flags.Bool(
- "version",
- false,
- "Print version information and exit.")
- // Parse args; omit the first arg which is the command name
- flags.Parse(args)
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("crunch-dispatch-slurm %s\n", version)
- return nil
- }
-
- disp.logger.Printf("crunch-dispatch-slurm %s started", version)
-
- err := disp.readConfig(*configPath)
- if err != nil {
- return err
- }
+// ServeHTTP implements service.Handler. This dispatcher exposes no
+// HTTP API of its own, so every request gets a 404.
+func (disp *Dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ http.NotFound(w, r)
+}
- if disp.CrunchRunCommand == nil {
- disp.CrunchRunCommand = []string{"crunch-run"}
+// configure() loads config files. Some tests skip this (see
+// StubbedSuite).
+func (disp *Dispatcher) configure() error {
+ if disp.logger == nil {
+ disp.logger = logrus.StandardLogger()
}
+ disp.logger = disp.logger.WithField("ClusterID", disp.cluster.ClusterID)
+ disp.logger.Printf("crunch-dispatch-slurm %s started", cmd.Version.String())
- if disp.PollPeriod == 0 {
- disp.PollPeriod = arvados.Duration(10 * time.Second)
- }
+ // Client credentials and the DB connector now come from the
+ // cluster-wide config instead of a per-service config file.
+ disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
+ disp.Client.AuthToken = disp.cluster.SystemRootToken
+ disp.Client.Insecure = disp.cluster.TLS.Insecure
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+ // Propagate operator-configured environment variables into the
+ // dispatcher's own environment (inherited by sbatch/crunch-run).
+ for k, v := range disp.cluster.Containers.SLURM.SbatchEnvironmentVariables {
+ os.Setenv(k, v)
+ }
} else {
disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
-
- if *dumpConfig {
- return config.DumpAndExit(disp)
- }
-
- siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
- if os.IsNotExist(err) {
- disp.logger.Warnf("no cluster config (%s), proceeding with no node types defined", err)
- } else if err != nil {
- return fmt.Errorf("error loading config: %s", err)
- } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
- return fmt.Errorf("config error: %s", err)
- }
-
return nil
}
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
- if disp.logger == nil {
- disp.logger = logrus.StandardLogger()
- }
+ disp.done = make(chan struct{})
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
disp.logger.Fatalf("Error making Arvados client: %v", err)
// NOTE(review): the closing brace of the "if err != nil" block is
// elided in this diff view.
disp.slurm = NewSlurmCLI()
disp.sqCheck = &SqueueChecker{
Logger: disp.logger,
- Period: time.Duration(disp.PollPeriod),
- PrioritySpread: disp.PrioritySpread,
+ // Timing/priority knobs now come from the cluster config
+ // (CloudVMs.PollInterval is reused as the squeue poll period).
+ Period: time.Duration(disp.cluster.Containers.CloudVMs.PollInterval),
+ PrioritySpread: disp.cluster.Containers.SLURM.PrioritySpread,
Slurm: disp.slurm,
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
Logger: disp.logger,
- BatchSize: disp.BatchSize,
+ BatchSize: disp.cluster.API.MaxItemsPerResponse,
RunContainer: disp.runContainer,
- PollPeriod: time.Duration(disp.PollPeriod),
- MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+ PollPeriod: time.Duration(disp.cluster.Containers.CloudVMs.PollInterval),
+ MinRetryPeriod: time.Duration(disp.cluster.Containers.MinRetryPeriod),
}
}
func (disp *Dispatcher) run() error {
+ // Take the cluster-wide dispatch lock in postgres so at most one
+ // dispatcher instance submits slurm jobs at a time.
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+ defer dblock.Dispatch.Unlock()
defer disp.sqCheck.Stop()
// NOTE(review): the remainder of run() is elided from this diff view.
if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
// Cancelled or Complete. See https://dev.arvados.org/issues/10979
func (disp *Dispatcher) checkSqueueForOrphans() {
for _, uuid := range disp.sqCheck.All() {
- if !containerUuidPattern.MatchString(uuid) {
+ // Skip squeue entries that are not container UUIDs belonging
+ // to this cluster (e.g. jobs submitted by other tooling or by
+ // a dispatcher for a different cluster).
+ if !containerUuidPattern.MatchString(uuid) || !strings.HasPrefix(uuid, disp.cluster.ClusterID) {
continue
}
err := disp.TrackContainer(uuid)
// NOTE(review): error handling for TrackContainer is elided in
// this diff view.
}
func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string {
- mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
+ // Memory to request from slurm, in MiB (rounded up): container
+ // RAM + keep cache + the cluster-configured extra reservation.
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+ container.RuntimeConstraints.KeepCacheRAM+
+ int64(disp.cluster.Containers.ReserveExtraRAM)) / float64(1048576)))
// Scratch space estimate, also converted to MiB rounded up.
disk := dispatchcloud.EstimateScratchSpace(&container)
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
var args []string
- args = append(args, disp.SbatchArguments...)
+ // Operator-configured sbatch arguments come first, followed by
+ // the per-container job name, initial niceness, and no-requeue.
+ args = append(args, disp.cluster.Containers.SLURM.SbatchArgumentsList...)
args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue")
if disp.cluster == nil {
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
+ crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
crArgs = append(crArgs, container.UUID)
- crScript := strings.NewReader(execScript(crArgs))
+
+ h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
sbArgs, err := disp.sbatchArgs(container)
if err != nil {
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
//
// NOTE(review): parts of this function's body (the enclosing
// for/select and switch) are elided in this diff view.
-func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
- if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
- var text string
- if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
- var logBuf bytes.Buffer
- fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
- if len(err.AvailableTypes) == 0 {
- fmt.Fprint(&logBuf, "No instance types are configured.\n")
- } else {
- fmt.Fprint(&logBuf, "Available instance types:\n")
- for _, t := range err.AvailableTypes {
- fmt.Fprintf(&logBuf,
- "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
- t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
- )
- }
- }
- text = logBuf.String()
- disp.UpdateState(ctr.UUID, dispatch.Cancelled)
- } else {
- text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
- }
- log.Print(text)
-
- lr := arvadosclient.Dict{"log": arvadosclient.Dict{
- "object_uuid": ctr.UUID,
- "event_type": "dispatch",
- "properties": map[string]string{"text": text}}}
- disp.Arv.Create("logs", lr, nil)
-
- disp.Unlock(ctr.UUID)
- return
+ // The crunch-run command line is now taken from cluster
+ // config; submission errors are returned to the dispatch
+ // library, which owns logging/unlocking, instead of being
+ // handled inline as before.
+ cmd := []string{disp.cluster.Containers.CrunchRunCommand}
+ cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
+ err := disp.submit(ctr, cmd)
+ if err != nil {
+ return err
}
}
case dispatch.Locked:
disp.Unlock(ctr.UUID)
}
- return
+ return nil
case updated, ok := <-status:
if !ok {
log.Printf("container %s is done: cancel slurm job", ctr.UUID)
time.Sleep(time.Second)
}
}
-
-func (disp *Dispatcher) readConfig(path string) error {
- err := config.LoadFile(disp, path)
- if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
- log.Printf("Config not specified. Continue with default configuration.")
- err = nil
- }
- return err
-}