Merge branch '18947-githttpd'
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
index c1d6d562a2c963209be94a590fcd14bc8eea272b..c33c2358ca3b7612ef92498edbef9a5562d6b9f5 100644 (file)
@@ -10,7 +10,6 @@ import (
        "context"
        "flag"
        "fmt"
-       "log"
        "os"
        "os/exec"
        "os/signal"
@@ -18,27 +17,32 @@ import (
        "syscall"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
-       arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version"
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/dispatch"
+       "github.com/sirupsen/logrus"
 )
 
-func main() {
-       err := doMain()
-       if err != nil {
-               log.Fatalf("%q", err)
-       }
-}
+var version = "dev"
 
 var (
        runningCmds      map[string]*exec.Cmd
        runningCmdsMutex sync.Mutex
        waitGroup        sync.WaitGroup
-       crunchRunCommand *string
+       crunchRunCommand string
 )
 
-func doMain() error {
+func main() {
+       baseLogger := logrus.StandardLogger()
+       if os.Getenv("DEBUG") != "" {
+               baseLogger.SetLevel(logrus.DebugLevel)
+       }
+       baseLogger.Formatter = &logrus.JSONFormatter{
+               TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+       }
+
        flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
 
        pollInterval := flags.Int(
@@ -46,7 +50,7 @@ func doMain() error {
                10,
                "Interval in seconds to poll for queued containers")
 
-       crunchRunCommand = flags.String(
+       flags.StringVar(&crunchRunCommand,
                "crunch-run-command",
                "/usr/bin/crunch-run",
                "Crunch command to run container")
@@ -56,42 +60,79 @@ func doMain() error {
                false,
                "Print version information and exit.")
 
-       // Parse args; omit the first arg which is the command name
-       flags.Parse(os.Args[1:])
+       if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+               os.Exit(code)
+       }
 
        // Print version information if requested
        if *getVersion {
-               fmt.Printf("Version: %s\n", arvadosVersion.GetVersion())
-               os.Exit(0)
+               fmt.Printf("crunch-dispatch-local %s\n", version)
+               return
+       }
+
+       loader := config.NewLoader(nil, baseLogger)
+       cfg, err := loader.Load()
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
+               os.Exit(1)
+       }
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "config error: %s\n", err)
+               os.Exit(1)
        }
 
-       log.Printf("crunch-dispatch-local %q started", arvadosVersion.GetVersion())
+       logger := baseLogger.WithField("ClusterID", cluster.ClusterID)
+       logger.Printf("crunch-dispatch-local %s started", version)
 
        runningCmds = make(map[string]*exec.Cmd)
 
+       var client arvados.Client
+       client.APIHost = cluster.Services.Controller.ExternalURL.Host
+       client.AuthToken = cluster.SystemRootToken
+       client.Insecure = cluster.TLS.Insecure
+
+       if client.APIHost != "" || client.AuthToken != "" {
+               // Copy real configs into env vars so [a]
+               // MakeArvadosClient() uses them, and [b] they get
+               // inherited by the crunch-run child processes.
+               os.Setenv("ARVADOS_API_HOST", client.APIHost)
+               os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
+               os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+               if client.Insecure {
+                       os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
+               }
+               os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+       } else {
+               logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
+       }
+
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("Error making Arvados client: %v", err)
-               return err
+               logger.Errorf("error making Arvados client: %v", err)
+               os.Exit(1)
        }
        arv.Retries = 25
 
+       ctx, cancel := context.WithCancel(context.Background())
+
        dispatcher := dispatch.Dispatcher{
+               Logger:       logger,
                Arv:          arv,
-               RunContainer: run,
+               RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx, cluster}).run,
                PollPeriod:   time.Duration(*pollInterval) * time.Second,
        }
 
-       ctx, cancel := context.WithCancel(context.Background())
        err = dispatcher.Run(ctx)
        if err != nil {
-               return err
+               logger.Error(err)
+               return
        }
 
        c := make(chan os.Signal, 1)
        signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
        sig := <-c
-       log.Printf("Received %s, shutting down", sig)
+       logger.Printf("Received %s, shutting down", sig)
        signal.Stop(c)
 
        cancel()
@@ -105,15 +146,18 @@ func doMain() error {
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
-
-       return nil
 }
 
 func startFunc(container arvados.Container, cmd *exec.Cmd) error {
        return cmd.Start()
 }
 
-var startCmd = startFunc
+type LocalRun struct {
+       startCmd         func(container arvados.Container, cmd *exec.Cmd) error
+       concurrencyLimit chan bool
+       ctx              context.Context
+       cluster          *arvados.Cluster
+}
 
 // Run a container.
 //
@@ -123,29 +167,51 @@ var startCmd = startFunc
 //
 // If the container is in any other state, or is not Complete/Cancelled after
 // crunch-run terminates, mark the container as Cancelled.
-func run(dispatcher *dispatch.Dispatcher,
+func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
        container arvados.Container,
-       status <-chan arvados.Container) {
+       status <-chan arvados.Container) error {
 
        uuid := container.UUID
 
        if container.State == dispatch.Locked {
+
+               select {
+               case lr.concurrencyLimit <- true:
+                       break
+               case <-lr.ctx.Done():
+                       return lr.ctx.Err()
+               }
+
+               defer func() { <-lr.concurrencyLimit }()
+
+               select {
+               case c := <-status:
+                       // Check for state updates after possibly
+                       // waiting to be ready-to-run
+                       if c.Priority == 0 {
+                               goto Finish
+                       }
+               default:
+                       break
+               }
+
                waitGroup.Add(1)
+               defer waitGroup.Done()
 
-               cmd := exec.Command(*crunchRunCommand, uuid)
+               cmd := exec.Command(crunchRunCommand, "--runtime-engine="+lr.cluster.Containers.RuntimeEngine, uuid)
                cmd.Stdin = nil
                cmd.Stderr = os.Stderr
                cmd.Stdout = os.Stderr
 
-               log.Printf("Starting container %v", uuid)
+               dispatcher.Logger.Printf("starting container %v", uuid)
 
                // Add this crunch job to the list of runningCmds only if we
                // succeed in starting crunch-run.
 
                runningCmdsMutex.Lock()
-               if err := startCmd(container, cmd); err != nil {
+               if err := lr.startCmd(container, cmd); err != nil {
                        runningCmdsMutex.Unlock()
-                       log.Printf("Error starting %v for %v: %q", *crunchRunCommand, uuid, err)
+                       dispatcher.Logger.Warnf("error starting %q for %s: %s", crunchRunCommand, uuid, err)
                        dispatcher.UpdateState(uuid, dispatch.Cancelled)
                } else {
                        runningCmds[uuid] = cmd
@@ -156,9 +222,9 @@ func run(dispatcher *dispatch.Dispatcher,
 
                        go func() {
                                if _, err := cmd.Process.Wait(); err != nil {
-                                       log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+                                       dispatcher.Logger.Warnf("error while waiting for crunch job to finish for %v: %q", uuid, err)
                                }
-                               log.Printf("sending done")
+                               dispatcher.Logger.Debugf("sending done")
                                done <- struct{}{}
                        }()
 
@@ -170,31 +236,32 @@ func run(dispatcher *dispatch.Dispatcher,
                                case c := <-status:
                                        // Interrupt the child process if priority changes to 0
                                        if (c.State == dispatch.Locked || c.State == dispatch.Running) && c.Priority == 0 {
-                                               log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+                                               dispatcher.Logger.Printf("sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
                                                cmd.Process.Signal(os.Interrupt)
                                        }
                                }
                        }
                        close(done)
 
-                       log.Printf("Finished container run for %v", uuid)
+                       dispatcher.Logger.Printf("finished container run for %v", uuid)
 
                        // Remove the crunch job from runningCmds
                        runningCmdsMutex.Lock()
                        delete(runningCmds, uuid)
                        runningCmdsMutex.Unlock()
                }
-               waitGroup.Done()
        }
 
+Finish:
+
        // If the container is not finalized, then change it to "Cancelled".
        err := dispatcher.Arv.Get("containers", uuid, nil, &container)
        if err != nil {
-               log.Printf("Error getting final container state: %v", err)
+               dispatcher.Logger.Warnf("error getting final container state: %v", err)
        }
        if container.State == dispatch.Locked || container.State == dispatch.Running {
-               log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
-                       *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
+               dispatcher.Logger.Warnf("after %q process termination, container state for %v is %q; updating it to %q",
+                       crunchRunCommand, uuid, container.State, dispatch.Cancelled)
                dispatcher.UpdateState(uuid, dispatch.Cancelled)
        }
 
@@ -202,5 +269,6 @@ func run(dispatcher *dispatch.Dispatcher,
        for range status {
        }
 
-       log.Printf("Finalized container %v", uuid)
+       dispatcher.Logger.Printf("finalized container %v", uuid)
+       return nil
 }