//
// SPDX-License-Identifier: AGPL-3.0
-package main
-
// Dispatcher service for Crunch that submits containers to the slurm queue.
+package dispatchslurm
import (
"context"
- "flag"
+ "crypto/hmac"
+ "crypto/sha256"
"fmt"
"log"
"math"
+ "net/http"
"os"
"regexp"
"strings"
"time"
- "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
- "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
+// Command is the command handler for the dispatch-slurm service,
+// built by service.Command with newHandler as its handler factory.
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchSLURM, newHandler)
+
+// newHandler creates a Dispatcher for cluster, configures and sets it
+// up, then starts its main loop (run) in a background goroutine and
+// returns the Dispatcher itself as the service.Handler. If configure
+// fails, it returns service.ErrorHandler's handler for that error
+// instead.
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+	disp := &Dispatcher{
+		logger:  ctxlog.FromContext(ctx),
+		cluster: cluster,
+	}
+	err := disp.configure()
+	if err != nil {
+		return service.ErrorHandler(ctx, cluster, err)
+	}
+	disp.setup()
+	// Store run's result before closing done, so a reader that has
+	// seen done closed also sees the final value of err.
+	runInBackground := func() {
+		disp.err = disp.run()
+		close(disp.done)
+	}
+	go runInBackground()
+	return disp
+}
+
type logger interface {
dispatch.Logger
Fatalf(string, ...interface{})
+// initialNiceValue (int64, 10000). NOTE(review): its uses are outside
+// this view; presumably it is the starting SLURM nice value given to
+// submitted jobs — confirm.
const initialNiceValue int64 = 10000
-var (
- version = "dev"
-)
-
+// Dispatcher submits Crunch containers to the SLURM queue.
type Dispatcher struct {
*dispatch.Dispatcher
- logger logrus.FieldLogger
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+ // dbConnector is built from cluster.PostgreSQL in configure(); run()
+ // passes its GetDB method to dblock.Dispatch.Lock.
+ dbConnector ctrlctx.DBConnector
+
+ // done is created by setup() and closed by the goroutine that
+ // newHandler starts, right after that goroutine stores run()'s
+ // return value in err. Because err is written before done is
+ // closed, err is only safe to read once done has been observed
+ // closed.
+ done chan struct{}
+ err error
Client arvados.Client
}
-func main() {
- logger := logrus.StandardLogger()
- if os.Getenv("DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- logger.Formatter = &logrus.JSONFormatter{
- TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
- }
- disp := &Dispatcher{logger: logger}
- err := disp.Run(os.Args[0], os.Args[1:])
- if err != nil {
- logrus.Fatalf("%s", err)
- }
+// CheckHealth reports the dispatcher's health. It returns nil while
+// run() is still in progress, and run()'s return value once it has
+// finished. If setup() has not run, done is nil, and CheckHealth
+// returns nil.
+//
+// disp.err is written by the goroutine started in newHandler just
+// before that goroutine closes disp.done. Health checks arrive on
+// other goroutines, so err is read only after receiving from the
+// closed done channel. That receive establishes the happens-before
+// edge. Reading err without it would be a data race with the write.
+func (disp *Dispatcher) CheckHealth() error {
+	select {
+	case <-disp.done:
+		return disp.err
+	default:
+		return nil
+	}
}
-func (disp *Dispatcher) Run(prog string, args []string) error {
- if err := disp.configure(prog, args); err != nil {
- return err
- }
- disp.setup()
- return disp.run()
+// Done returns the channel that setup() creates and that is closed
+// once run() returns (see the goroutine started in newHandler).
+func (disp *Dispatcher) Done() <-chan struct{} {
+ return disp.done
}
-// configure() loads config files. Tests skip this.
-func (disp *Dispatcher) configure(prog string, args []string) error {
+// ServeHTTP answers every request with 404 Not Found. This handler
+// itself serves no paths.
+func (disp *Dispatcher) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	http.NotFound(resp, req)
+}
+
+// configure() loads config files. Some tests skip this (see
+// StubbedSuite).
+func (disp *Dispatcher) configure() error {
if disp.logger == nil {
disp.logger = logrus.StandardLogger()
}
- flags := flag.NewFlagSet(prog, flag.ExitOnError)
- flags.Usage = func() { usage(flags) }
-
- loader := config.NewLoader(nil, disp.logger)
- loader.SetupFlags(flags)
-
- dumpConfig := flag.Bool(
- "dump-config",
- false,
- "write current configuration to stdout and exit")
- getVersion := flags.Bool(
- "version",
- false,
- "Print version information and exit.")
-
- args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-legacy-crunch-dispatch-slurm-config")
-
- // Parse args; omit the first arg which is the command name
- err := flags.Parse(args)
-
- if err == flag.ErrHelp {
- return nil
- }
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("crunch-dispatch-slurm %s\n", version)
- return nil
- }
-
- disp.logger.Printf("crunch-dispatch-slurm %s started", version)
-
- cfg, err := loader.Load()
- if err != nil {
- return err
- }
-
- if disp.cluster, err = cfg.GetCluster(""); err != nil {
- return fmt.Errorf("config error: %s", err)
- }
+ disp.logger = disp.logger.WithField("ClusterID", disp.cluster.ClusterID)
+ disp.logger.Printf("crunch-dispatch-slurm %s started", cmd.Version.String())
disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
disp.Client.Insecure = disp.cluster.TLS.Insecure
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
for k, v := range disp.cluster.Containers.SLURM.SbatchEnvironmentVariables {
os.Setenv(k, v)
}
} else {
disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
-
- if *dumpConfig {
- out, err := yaml.Marshal(cfg)
- if err != nil {
- return err
- }
- _, err = os.Stdout.Write(out)
- if err != nil {
- return err
- }
- }
-
return nil
}
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
+ disp.done = make(chan struct{})
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
disp.logger.Fatalf("Error making Arvados client: %v", err)
}
func (disp *Dispatcher) run() error {
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+ defer dblock.Dispatch.Unlock()
defer disp.sqCheck.Stop()
if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
if disp.cluster == nil {
// no instance types configured
args = append(args, disp.slurmConstraintArgs(container)...)
- } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ } else if types, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
// ditto
args = append(args, disp.slurmConstraintArgs(container)...)
} else if err != nil {
return nil, err
} else {
- // use instancetype constraint instead of slurm mem/cpu/tmp specs
- args = append(args, "--constraint=instancetype="+it.Name)
+ // use instancetype constraint instead of slurm
+ // mem/cpu/tmp specs (note types[0] is the lowest-cost
+ // suitable instance type)
+ args = append(args, "--constraint=instancetype="+types[0].Name)
}
if len(container.SchedulingParameters.Partitions) > 0 {
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
crArgs = append(crArgs, container.UUID)
- crScript := strings.NewReader(execScript(crArgs))
+
+ h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
sbArgs, err := disp.sbatchArgs(container)
if err != nil {