15759: Merge branch 'master' into 15759-deploy-crunch-run
Author: Tom Clegg <tom@tomclegg.ca>
Mon, 30 Dec 2019 15:18:15 +0000 (10:18 -0500)
Committer: Tom Clegg <tom@tomclegg.ca>
Mon, 30 Dec 2019 15:18:15 +0000 (10:18 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

23 files changed:
build/run-build-packages.sh
build/run-tests.sh
cmd/arvados-server/cmd.go
lib/cmd/cmd.go
lib/config/config.default.yml
lib/config/generated_config.go
lib/crunchrun/background.go [moved from services/crunch-run/background.go with 82% similarity]
lib/crunchrun/cgroup.go [moved from services/crunch-run/cgroup.go with 97% similarity]
lib/crunchrun/cgroup_test.go [moved from services/crunch-run/cgroup_test.go with 95% similarity]
lib/crunchrun/copier.go [moved from services/crunch-run/copier.go with 99% similarity]
lib/crunchrun/copier_test.go [moved from services/crunch-run/copier_test.go with 99% similarity]
lib/crunchrun/crunchrun.go [moved from services/crunch-run/crunchrun.go with 95% similarity]
lib/crunchrun/crunchrun_test.go [moved from services/crunch-run/crunchrun_test.go with 99% similarity]
lib/crunchrun/git_mount.go [moved from services/crunch-run/git_mount.go with 99% similarity]
lib/crunchrun/git_mount_test.go [moved from services/crunch-run/git_mount_test.go with 99% similarity]
lib/crunchrun/logging.go [moved from services/crunch-run/logging.go with 99% similarity]
lib/crunchrun/logging_test.go [moved from services/crunch-run/logging_test.go with 99% similarity]
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/runner.go
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go

index d6c8f5ac64ca13431560c56fe5f3d95962d90fb9..3173aa7057eed439e7827aec3f70cc384226f869 100755 (executable)
@@ -294,7 +294,7 @@ package_go_binary services/crunch-dispatch-local crunch-dispatch-local \
     "Dispatch Crunch containers on the local system"
 package_go_binary services/crunch-dispatch-slurm crunch-dispatch-slurm \
     "Dispatch Crunch containers to a SLURM cluster"
-package_go_binary services/crunch-run crunch-run \
+package_go_binary cmd/arvados-server crunch-run \
     "Supervise a single Crunch container"
 package_go_binary services/crunchstat crunchstat \
     "Gather cpu/memory/network statistics of running Crunch jobs"
index 02a6a4044ed9ffb42e163151a576c07c3ac58287..f21861762db80b40535c6d0522a379b12dadca1b 100755 (executable)
@@ -81,6 +81,7 @@ lib/controller/railsproxy
 lib/controller/router
 lib/controller/rpc
 lib/crunchstat
+lib/crunch-run
 lib/cloud
 lib/cloud/azure
 lib/cloud/cloudtest
@@ -104,7 +105,6 @@ services/keep-balance
 services/login-sync
 services/nodemanager
 services/nodemanager_integration
-services/crunch-run
 services/crunch-dispatch-local
 services/crunch-dispatch-slurm
 services/ws
index fe6689862971b22873f013c989c2c9924492370b..d93a8e78fd3f216788584872a7d1bd3f3fd675d2 100644 (file)
@@ -11,6 +11,7 @@ import (
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/controller"
+       "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
 )
 
@@ -25,6 +26,7 @@ var (
                "config-dump":     config.DumpCommand,
                "config-defaults": config.DumpDefaultsCommand,
                "controller":      controller.Command,
+               "crunch-run":      crunchrun.Command,
                "dispatch-cloud":  dispatchcloud.Command,
        })
 )
index 24b69f0cc5c529fe45b5be6f4ae6698cff607ff9..611c95d2340a3b2da47b8a7cbcfff2a3aad9af8c 100644 (file)
@@ -37,6 +37,10 @@ var version = "dev"
 
 type versionCommand struct{}
 
+func (versionCommand) String() string {
+       return fmt.Sprintf("%s (%s)", version, runtime.Version())
+}
+
 func (versionCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        prog = regexp.MustCompile(` -*version$`).ReplaceAllLiteralString(prog, "")
        fmt.Fprintf(stdout, "%s %s (%s)\n", prog, version, runtime.Version())
@@ -61,9 +65,21 @@ type Multi map[string]Handler
 
 func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
        _, basename := filepath.Split(prog)
-       basename = strings.TrimPrefix(basename, "arvados-")
-       basename = strings.TrimPrefix(basename, "crunch-")
-       if cmd, ok := m[basename]; ok {
+       if i := strings.Index(basename, "~"); i >= 0 {
+               // drop "~anything" suffix (arvados-dispatch-cloud's
+               // DeployRunnerBinary feature relies on this)
+               basename = basename[:i]
+       }
+       cmd, ok := m[basename]
+       if !ok {
+               // "controller" command exists, and binary is named "arvados-controller"
+               cmd, ok = m[strings.TrimPrefix(basename, "arvados-")]
+       }
+       if !ok {
+               // "dispatch-slurm" command exists, and binary is named "crunch-dispatch-slurm"
+               cmd, ok = m[strings.TrimPrefix(basename, "crunch-")]
+       }
+       if ok {
                return cmd.RunCommand(prog, args, stdin, stdout, stderr)
        } else if len(args) < 1 {
                fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
index 7779d7ebf2a26b11a0ca4225a6cdfe865abd8a3d..ccc6343e6b35963ea84a3d590dfdae87057b91a1 100644 (file)
@@ -788,6 +788,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
         # Tags to add on all resources (VMs, NICs, disks) created by
         # the container dispatcher. (Arvados's own tags --
         # InstanceType, IdleBehavior, and InstanceSecret -- will also
index 3e58d249bf4dee7526ea1bbab74b259b85d2d4ab..3d4c18e283739130744b64a69fff8aad94abd617 100644 (file)
@@ -794,6 +794,16 @@ Clusters:
         # Worker VM image ID.
         ImageID: ""
 
+        # An executable file (located on the dispatcher host) to be
+        # copied to cloud instances at runtime and used as the
+        # container runner/supervisor. The default value is the
+        # dispatcher program itself.
+        #
+        # Use the empty string to disable this step: nothing will be
+        # copied, and cloud instances are assumed to have a suitable
+        # version of crunch-run installed.
+        DeployRunnerBinary: "/proc/self/exe"
+
         # Tags to add on all resources (VMs, NICs, disks) created by
         # the container dispatcher. (Arvados's own tags --
         # InstanceType, IdleBehavior, and InstanceSecret -- will also
similarity index 82%
rename from services/crunch-run/background.go
rename to lib/crunchrun/background.go
index 852ccb6ece3979385423f3ceb55fb437f164c6aa..bf039afa0ad53799183607fe9795b5556f615bad 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "encoding/json"
@@ -36,10 +36,10 @@ type procinfo struct {
 //
 // Stdout and stderr in the child process are sent to the systemd
 // journal using the systemd-cat program.
-func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
-       return exitcode(stderr, detach(uuid, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
+       return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
 }
-func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
        lockfile, err := func() (*os.File, error) {
                // We must hold the dir-level lock between
                // opening/creating the lockfile and acquiring LOCK_EX
@@ -68,7 +68,31 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
        defer lockfile.Close()
        lockfile.Truncate(0)
 
-       cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
+       execargs := append([]string{"-no-detach"}, args...)
+       if strings.HasSuffix(prog, " crunch-run") {
+               // invoked as "/path/to/arvados-server crunch-run"
+               // (see arvados/lib/cmd.Multi)
+               execargs = append([]string{strings.TrimSuffix(prog, " crunch-run"), "crunch-run"}, execargs...)
+       } else {
+               // invoked as "/path/to/crunch-run"
+               execargs = append([]string{prog}, execargs...)
+       }
+       execargs = append([]string{
+               // Here, if the inner systemd-cat can't exec
+               // crunch-run, it writes an error message to stderr,
+               // and the outer systemd-cat writes it to the journal
+               // where the operator has a chance to discover it. (If
+               // we only used one systemd-cat command, it would be
+               // up to us to report the error -- but we are going to
+               // detach and exit, not wait for something to appear
+               // on stderr.)  Note these systemd-cat calls don't
+               // result in additional processes -- they just connect
+               // stderr/stdout to sockets and call exec().
+               "systemd-cat", "--identifier=crunch-run",
+               "systemd-cat", "--identifier=crunch-run",
+       }, execargs...)
+
+       cmd := exec.Command(execargs[0], execargs[1:]...)
        // Child inherits lockfile.
        cmd.ExtraFiles = []*os.File{lockfile}
        // Ensure child isn't interrupted even if we receive signals
similarity index 97%
rename from services/crunch-run/cgroup.go
rename to lib/crunchrun/cgroup.go
index 9e52de52aad2e8c94689685b341f8ca1210dbb3c..0b254f5bd7870dcaa188df80ad9552fa1eda83ed 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "bytes"
similarity index 95%
rename from services/crunch-run/cgroup_test.go
rename to lib/crunchrun/cgroup_test.go
index 95adf7484d4558d4544a12ef8b2b98dfb2a1327a..b43479a3b4ae02379cff1f60d33c532a510a7f46 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        . "gopkg.in/check.v1"
similarity index 99%
rename from services/crunch-run/copier.go
rename to lib/crunchrun/copier.go
index e5711cc7362a5e11bd394f07afc5af2cd8924367..b1497277f2d52971d7a2bbe4c24e90e583500360 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "encoding/json"
similarity index 99%
rename from services/crunch-run/copier_test.go
rename to lib/crunchrun/copier_test.go
index d767bc4639c912986f8ee923281e1d13e7bcb342..777b715d76dd8bb57e9d5b34309ee70b356df888 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "io"
similarity index 95%
rename from services/crunch-run/crunchrun.go
rename to lib/crunchrun/crunchrun.go
index 0a19d17f4b275c7b8e4aed8854c0ddffffeb59d4..b0a4007f74f87fe12be18046583ba23cc29fa9ea 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "bytes"
@@ -27,6 +27,7 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/lib/crunchstat"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -40,7 +41,9 @@ import (
        dockerclient "github.com/docker/docker/client"
 )
 
-var version = "dev"
+type command struct{}
+
+var Command = command{}
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
@@ -1527,7 +1530,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
-       runner.CrunchLog.Printf("crunch-run %s started", version)
+       runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
        runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
 
        hostname, hosterr := os.Hostname()
@@ -1758,83 +1761,93 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
        return cr, nil
 }
 
-func main() {
-       statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-       cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-       cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-       cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
-       caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
-       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
-       stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
-       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
-       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
-       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
-       enableNetwork := flag.String("container-enable-networking", "default",
+func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       flags := flag.NewFlagSet(prog, flag.ContinueOnError)
+       statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+       cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+       cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
+       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+       caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
+       stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+       list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+       enableNetwork := flags.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
        always:  containers always have networking enabled
        `)
-       networkMode := flag.String("container-network-mode", "default",
+       networkMode := flags.String("container-network-mode", "default",
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
-       memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
-       getVersion := flag.Bool("version", false, "Print version information and exit.")
-       flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+       memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
+       flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
 
        ignoreDetachFlag := false
-       if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+       if len(args) > 0 && args[0] == "-no-detach" {
                // This process was invoked by a parent process, which
                // has passed along its own arguments, including
                // -detach, after the leading -no-detach flag.  Strip
                // the leading -no-detach flag (it's not recognized by
-               // flag.Parse()) and ignore the -detach flag that
+               // flags.Parse()) and ignore the -detach flag that
                // comes later.
-               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               args = args[1:]
                ignoreDetachFlag = true
        }
 
-       flag.Parse()
+       if err := flags.Parse(args); err == flag.ErrHelp {
+               return 0
+       } else if err != nil {
+               log.Print(err)
+               return 1
+       }
 
        if *stdinEnv && !ignoreDetachFlag {
                // Load env vars on stdin if asked (but not in a
                // detached child process, in which case stdin is
                // /dev/null).
-               loadEnv(os.Stdin)
+               err := loadEnv(os.Stdin)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
        }
 
+       containerId := flags.Arg(0)
+
        switch {
        case *detach && !ignoreDetachFlag:
-               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+               return Detach(containerId, prog, args, os.Stdout, os.Stderr)
        case *kill >= 0:
-               os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+               return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
-               os.Exit(ListProcesses(os.Stdout, os.Stderr))
+               return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("crunch-run %s\n", version)
-               return
+       if containerId == "" {
+               log.Printf("usage: %s [options] UUID", prog)
+               return 1
        }
 
-       log.Printf("crunch-run %s started", version)
+       log.Printf("crunch-run %s started", cmd.Version.String())
        time.Sleep(*sleep)
 
-       containerId := flag.Arg(0)
-
        if *caCertsPath != "" {
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+               log.Printf("%s: %v", containerId, err)
+               return 1
        }
        api.Retries = 8
 
        kc, kcerr := keepclient.MakeKeepClient(api)
        if kcerr != nil {
-               log.Fatalf("%s: %v", containerId, kcerr)
+               log.Printf("%s: %v", containerId, kcerr)
+               return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
@@ -1845,18 +1858,20 @@ func main() {
 
        cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
        if err != nil {
-               log.Fatal(err)
+               log.Print(err)
+               return 1
        }
        if dockererr != nil {
                cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
                cr.checkBrokenNode(dockererr)
                cr.CrunchLog.Close()
-               os.Exit(1)
+               return 1
        }
 
        parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
        if tmperr != nil {
-               log.Fatalf("%s: %v", containerId, tmperr)
+               log.Printf("%s: %v", containerId, tmperr)
+               return 1
        }
 
        cr.parentTemp = parentTemp
@@ -1889,24 +1904,27 @@ func main() {
        }
 
        if runerr != nil {
-               log.Fatalf("%s: %v", containerId, runerr)
+               log.Printf("%s: %v", containerId, runerr)
+               return 1
        }
+       return 0
 }
 
-func loadEnv(rdr io.Reader) {
+func loadEnv(rdr io.Reader) error {
        buf, err := ioutil.ReadAll(rdr)
        if err != nil {
-               log.Fatalf("read stdin: %s", err)
+               return fmt.Errorf("read stdin: %s", err)
        }
        var env map[string]string
        err = json.Unmarshal(buf, &env)
        if err != nil {
-               log.Fatalf("decode stdin: %s", err)
+               return fmt.Errorf("decode stdin: %s", err)
        }
        for k, v := range env {
                err = os.Setenv(k, v)
                if err != nil {
-                       log.Fatalf("setenv(%q): %s", k, err)
+                       return fmt.Errorf("setenv(%q): %s", k, err)
                }
        }
+       return nil
 }
similarity index 99%
rename from services/crunch-run/crunchrun_test.go
rename to lib/crunchrun/crunchrun_test.go
index 636926a8d0a06054c3cca22fe2ab72e2d4df67d4..e8c7660d1aee39424f88d2d6de0e2b3a213baa36 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "bufio"
similarity index 99%
rename from services/crunch-run/git_mount.go
rename to lib/crunchrun/git_mount.go
index 4fcba1379446eb2378b36d5c7eb65ea5fbb673f5..92bb6d11d94a3f0ad49095db5d45692f0dc9f8b7 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "fmt"
similarity index 99%
rename from services/crunch-run/git_mount_test.go
rename to lib/crunchrun/git_mount_test.go
index 608917808a6e57f0371d1deee66992c2d12a54ca..e39beaa943832487f23c4215b697becc6a47dc02 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "io/ioutil"
similarity index 99%
rename from services/crunch-run/logging.go
rename to lib/crunchrun/logging.go
index 01cab69cd0558b62b01e45b68b934d86c5a4abc1..d5de184e5c41bf8e9dc434fe226d8d5ed17fa1f9 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "bufio"
similarity index 99%
rename from services/crunch-run/logging_test.go
rename to lib/crunchrun/logging_test.go
index 4a2944f6463bc6231bc9de316e57dc3718aaace9..fab333b433c04d52663f203d888f745fd02346c3 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package crunchrun
 
 import (
        "fmt"
index ce7dc07301daf99f27a2a422ea1641c4e4d92902..19c8f6e0b489309fdb5ac363c96fd1b5beeaa24d 100644 (file)
@@ -37,6 +37,7 @@ const (
 
 type pool interface {
        scheduler.WorkerPool
+       CheckHealth() error
        Instances() []worker.InstanceView
        SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
        KillInstance(id cloud.InstanceID, reason string) error
@@ -78,7 +79,7 @@ func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 // CheckHealth implements service.Handler.
 func (disp *dispatcher) CheckHealth() error {
        disp.Start()
-       return nil
+       return disp.pool.CheckHealth()
 }
 
 // Stop dispatching containers and release resources. Typically used
index 649910f63ca9c76c194b0e9155e36e516cfeb584..6bc269c7e9dd2fabb77bd1cdeee21ed4c07d61c0 100644 (file)
@@ -5,10 +5,12 @@
 package worker
 
 import (
+       "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        "sort"
        "strings"
        "sync"
@@ -135,6 +137,7 @@ type Pool struct {
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
+       runnerSource       string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
@@ -160,6 +163,9 @@ type Pool struct {
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
+       runnerData   []byte
+       runnerMD5    [md5.Size]byte
+       runnerCmd    string
 
        throttleCreate    throttle
        throttleInstances throttle
@@ -177,6 +183,14 @@ type createCall struct {
        instanceType arvados.InstanceType
 }
 
+func (wp *Pool) CheckHealth() error {
+       wp.setupOnce.Do(wp.setup)
+       if err := wp.loadRunnerData(); err != nil {
+               return fmt.Errorf("error loading runner binary: %s", err)
+       }
+       return nil
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -276,6 +290,10 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
+       if wp.loadRunnerData() != nil {
+               // Boot probe is certain to fail.
+               return false
+       }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
@@ -743,6 +761,36 @@ func (wp *Pool) setup() {
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+       wp.loadRunnerData()
+}
+
+// Load the runner program to be deployed on worker nodes into
+// wp.runnerData, if necessary. Errors are logged.
+//
+// If auto-deploy is disabled, len(wp.runnerData) will be 0.
+//
+// Caller must not have lock.
+func (wp *Pool) loadRunnerData() error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if wp.runnerData != nil {
+               return nil
+       } else if wp.runnerSource == "" {
+               wp.runnerCmd = "crunch-run"
+               wp.runnerData = []byte{}
+               return nil
+       }
+       logger := wp.logger.WithField("source", wp.runnerSource)
+       logger.Debug("loading runner")
+       buf, err := ioutil.ReadFile(wp.runnerSource)
+       if err != nil {
+               logger.WithError(err).Error("failed to load runner program")
+               return err
+       }
+       wp.runnerData = buf
+       wp.runnerMD5 = md5.Sum(buf)
+       wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", wp.runnerMD5)
+       return nil
 }
 
 func (wp *Pool) notify() {
index 8b2415b402f5885decbc5f9f0e389bd4a389d369..1948c1e874859f2d8355115b3671f2c5ef0ae32d 100644 (file)
@@ -70,9 +70,11 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
-               return stubExecutor{
-                       "crunch-run --list": stubResp{},
-                       "true":              stubResp{},
+               return &stubExecutor{
+                       response: map[string]stubResp{
+                               "crunch-run --list": stubResp{},
+                               "true":              stubResp{},
+                       },
                }
        }
 
@@ -155,7 +157,7 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
                logger:      logger,
-               newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
index e819a6036b341d5d2bbe28a242292296b5b36cd2..47521213427610a531fa269df32b046b17ba72aa 100644 (file)
@@ -20,6 +20,7 @@ type remoteRunner struct {
        uuid          string
        executor      Executor
        envJSON       json.RawMessage
+       runnerCmd     string
        remoteUser    string
        timeoutTERM   time.Duration
        timeoutSignal time.Duration
@@ -59,6 +60,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
                uuid:          uuid,
                executor:      wkr.executor,
                envJSON:       envJSON,
+               runnerCmd:     wkr.wp.runnerCmd,
                remoteUser:    wkr.instance.RemoteUser(),
                timeoutTERM:   wkr.wp.timeoutTERM,
                timeoutSignal: wkr.wp.timeoutSignal,
@@ -76,7 +78,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       cmd := "crunch-run --detach --stdin-env '" + rr.uuid + "'"
+       cmd := rr.runnerCmd + " --detach --stdin-env '" + rr.uuid + "'"
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
@@ -136,7 +138,7 @@ func (rr *remoteRunner) Kill(reason string) {
 func (rr *remoteRunner) kill(sig syscall.Signal) {
        logger := rr.logger.WithField("Signal", int(sig))
        logger.Info("sending signal")
-       cmd := fmt.Sprintf("crunch-run --kill %d %s", sig, rr.uuid)
+       cmd := fmt.Sprintf(rr.runnerCmd+" --kill %d %s", sig, rr.uuid)
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
index cda5a14b1aae19a7cbbbab377f074543f2a87ac9..a455a7d06d88770080fa42e62a19cb878e5050d6 100644 (file)
@@ -5,7 +5,9 @@
 package worker
 
 import (
+       "bytes"
        "fmt"
+       "path/filepath"
        "strings"
        "sync"
        "time"
@@ -318,7 +320,7 @@ func (wkr *worker) probeAndUpdate() {
 }
 
 func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
-       cmd := "crunch-run --list"
+       cmd := wkr.wp.runnerCmd + " --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
        }
@@ -358,9 +360,41 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
                return false, stderr
        }
        logger.Info("boot probe succeeded")
+       if err = wkr.wp.loadRunnerData(); err != nil {
+               wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+               return false, stderr
+       } else if len(wkr.wp.runnerData) == 0 {
+               // Assume crunch-run is already installed
+       } else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+               wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+               return false, stderr2
+       } else {
+               wkr.logger.Info("runner binary OK")
+               stderr = append(stderr, stderr2...)
+       }
        return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+       hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+       dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+
+       stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+       if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+               return
+       }
+
+       // Note touch+chmod come before writing data, to avoid the
+       // possibility of md5 being correct while file mode is
+       // incorrect.
+       cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0700 "$dstfile"; cat >"$dstfile"`
+       if wkr.instance.RemoteUser() != "root" {
+               cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+       }
+       stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+       return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
        if wkr.idleBehavior == IdleBehaviorHold {
index 4f8f77bff6de95f820ca38d81a0151f31a3bf86d..a4c2a6370f3d5ce3803484b18ac811abec7e6bc1 100644 (file)
@@ -5,8 +5,12 @@
 package worker
 
 import (
+       "bytes"
+       "crypto/md5"
        "errors"
+       "fmt"
        "io"
+       "strings"
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
@@ -38,7 +42,11 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                running         int
                starting        int
                respBoot        stubResp // zero value is success
+               respDeploy      stubResp // zero value is success
                respRun         stubResp // zero value is success + nothing running
+               respRunDeployed stubResp
+               deployRunner    []byte
+               expectStdin     []byte
                expectState     State
                expectRunning   int
        }
@@ -46,7 +54,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
        errFail := errors.New("failed")
        respFail := stubResp{"", "command failed\n", errFail}
        respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
-       for _, trial := range []trialT{
+       for idx, trial := range []trialT{
                {
                        testCaseComment: "Unknown, probes fail",
                        state:           StateUnknown,
@@ -185,12 +193,40 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        starting:        1,
                        expectState:     StateRunning,
                },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner succeeds, run probe succeeds",
+                       state:           StateBooting,
+                       deployRunner:    []byte("ELF"),
+                       expectStdin:     []byte("ELF"),
+                       respRun:         respFail,
+                       respRunDeployed: respContainerRunning,
+                       expectRunning:   1,
+                       expectState:     StateRunning,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner fails",
+                       state:           StateBooting,
+                       deployRunner:    []byte("ELF"),
+                       respDeploy:      respFail,
+                       expectStdin:     []byte("ELF"),
+                       expectState:     StateBooting,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, deployRunner skipped, run probe succeeds",
+                       state:           StateBooting,
+                       deployRunner:    nil,
+                       respDeploy:      respFail,
+                       expectState:     StateIdle,
+               },
        } {
-               c.Logf("------- %#v", trial)
+               c.Logf("------- trial %d: %#v", idx, trial)
                ctime := time.Now().Add(-trial.age)
-               exr := stubExecutor{
-                       "bootprobe":         trial.respBoot,
-                       "crunch-run --list": trial.respRun,
+               exr := &stubExecutor{
+                       response: map[string]stubResp{
+                               "bootprobe":         trial.respBoot,
+                               "crunch-run --list": trial.respRun,
+                               "{deploy}":          trial.respDeploy,
+                       },
                }
                wp := &Pool{
                        arvClient:        ac,
@@ -199,6 +235,14 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        timeoutBooting:   bootTimeout,
                        timeoutProbe:     probeTimeout,
                        exited:           map[string]time.Time{},
+                       runnerCmd:        "crunch-run",
+                       runnerData:       trial.deployRunner,
+                       runnerMD5:        md5.Sum(trial.deployRunner),
+               }
+               if trial.deployRunner != nil {
+                       svHash := md5.Sum(trial.deployRunner)
+                       wp.runnerCmd = fmt.Sprintf("/var/run/arvados/crunch-run~%x", svHash)
+                       exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
                }
                wkr := &worker{
                        logger:   logger,
@@ -226,6 +270,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                wkr.probeAndUpdate()
                c.Check(wkr.state, check.Equals, trial.expectState)
                c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+               c.Check(exr.stdin.String(), check.Equals, string(trial.expectStdin))
        }
 }
 
@@ -234,14 +279,27 @@ type stubResp struct {
        stderr string
        err    error
 }
-type stubExecutor map[string]stubResp
 
-func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (se stubExecutor) Close()                         {}
-func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
-       resp, ok := se[cmd]
+// stubExecutor is a test double for the worker's remote command
+// executor.
+//
+// response maps an exact command string to the stubResp that Execute
+// returns for it. The key "{deploy}" is a fallback for any command
+// containing `; cat >"$dstfile"`. That is the runner-binary install
+// command, whose full text depends on the destination path and on
+// whether it is wrapped in sudo.
+//
+// stdin accumulates everything passed on stdin across all Execute
+// calls, so a test can check what was uploaded to the worker.
+// NOTE(review): stdin is written without locking; this assumes Execute
+// is not called concurrently on one stubExecutor.
+type stubExecutor struct {
+       response map[string]stubResp
+       stdin    bytes.Buffer
+}
+
+// SetTarget and Close are no-ops for the stub.
+func (se *stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se *stubExecutor) Close()                         {}
+
+// Execute copies stdin (if non-nil) into se.stdin, then returns the
+// canned response for cmd.
+//
+// If copying stdin fails, that error is returned, with its text as
+// stderr, before the response table is consulted. A command with no
+// matching entry fails with a "command not found" error, and its
+// stderr names the command.
+func (se *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+       if stdin != nil {
+               _, err = io.Copy(&se.stdin, stdin)
+               if err != nil {
+                       return nil, []byte(err.Error()), err
+               }
+       }
+       resp, ok := se.response[cmd]
+       // The install command embeds the destination path and may be
+       // wrapped in sudo. It is therefore matched by a fixed fragment,
+       // so tests do not have to spell out the exact string.
+       if !ok && strings.Contains(cmd, `; cat >"$dstfile"`) {
+               resp, ok = se.response["{deploy}"]
+       }
        if !ok {
-               return nil, []byte("command not found\n"), errors.New("command not found")
+               return nil, []byte(fmt.Sprintf("%s: command not found\n", cmd)), errors.New("command not found")
        }
        return []byte(resp.stdout), []byte(resp.stderr), resp.err
 }