19175: Merge branch 'main' into 19175-doc-refactor-multi-host-installation
[arvados.git] / lib / dispatchcloud / worker / pool.go
index 6a74280ca452e9b365f6a976f96e04ef03edc7e6..66e0bfee910a236b46980f2db4b7c30850b3a759 100644 (file)
@@ -13,6 +13,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
@@ -103,6 +104,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
+               cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -121,6 +123,8 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+               runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
+               runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -142,6 +146,7 @@ type Pool struct {
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
@@ -160,6 +165,8 @@ type Pool struct {
        systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
+       runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
+       runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -768,6 +775,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
@@ -881,7 +895,7 @@ func (wp *Pool) loadRunnerData() error {
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
-               wp.runnerCmd = "crunch-run"
+               wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
@@ -894,7 +908,7 @@ func (wp *Pool) loadRunnerData() error {
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
-       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
 }