19175: Merge branch 'main' into 19175-doc-refactor-multi-host-installation
[arvados.git] / lib/dispatchcloud/worker/pool.go
index 72d5ca56e75bcc7c3fc2893c20f34303e08a6652..66e0bfee910a236b46980f2db4b7c30850b3a759 100644
@@ -5,12 +5,15 @@
 package worker
 
 import (
+       "crypto/hmac"
        "crypto/md5"
        "crypto/rand"
+       "crypto/sha256"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
@@ -86,9 +89,8 @@ const (
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
-       } else {
-               return def
        }
+       return def
 }
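
The hunk above is a pure style cleanup: since the if branch returns, the else is redundant and the default can fall through at function scope. Illustrative calls, using the defaultTimeoutTERM constant referenced later in this diff:

    d := duration(arvados.Duration(0), defaultTimeoutTERM)            // zero config -> default
    d = duration(arvados.Duration(5*time.Second), defaultTimeoutTERM) // configured -> 5s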
 
 // NewPool creates a Pool of workers backed by instanceSet.
@@ -102,6 +104,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
+               cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -117,8 +120,11 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+               systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
+               runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
+               runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
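
NewPool gains four pieces of state in this diff: the whole cluster config, the SystemRootToken (used by gatewayAuthSecret below), and the pair runnerCmdDefault/runnerArgs that parameterize how crunch-run is invoked. Note that runnerArgs is built by appending the operator's CrunchRunArgumentsList to a fresh slice literal, so the config slice is never aliased or mutated. A standalone sketch with hypothetical config values:

    // Hypothetical stand-ins for cluster.Containers fields.
    runtimeEngine := "docker"
    extraArgs := []string{"-cgroup-parent-subsystem=memory"}
    // Same construction as runnerArgs above: the engine flag first,
    // then any operator-configured crunch-run arguments.
    runnerArgs := append([]string{"--runtime-engine=" + runtimeEngine}, extraArgs...)
    // runnerArgs == [--runtime-engine=docker -cgroup-parent-subsystem=memory]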
@@ -140,6 +146,7 @@ type Pool struct {
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
@@ -155,8 +162,11 @@ type Pool struct {
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
        timeoutStaleRunLock            time.Duration
+       systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
+       runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
+       runnerArgs                     []string // extra args passed to crunch-run
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -765,6 +775,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
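
The comment explains the race: if probeInterval is an exact multiple of syncInterval and sync completes instantly (as with the loopback driver), each probe fires at the same instant as a sync and fails to update the workers. Sleeping a uniformly random duration in [0, probeInterval/23) before each pass breaks the lockstep. A self-contained sketch of the same jitter computation (23 is the divisor used above; it keeps the jitter a small fraction of one interval):

    package main

    import (
        "fmt"
        mathrand "math/rand"
        "time"
    )

    func main() {
        probeInterval := 10 * time.Second
        // Uniform in [0, probeInterval/23): at most ~435ms for a 10s interval.
        jitter := time.Duration(mathrand.Int63n(int64(probeInterval) / 23))
        fmt.Println(jitter)
    }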
@@ -878,7 +895,7 @@ func (wp *Pool) loadRunnerData() error {
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
-               wp.runnerCmd = "crunch-run"
+               wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
@@ -891,7 +908,7 @@ func (wp *Pool) loadRunnerData() error {
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
-       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
 }
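
Two behavioral changes land in loadRunnerData. First, when DeployRunnerBinary is unset, the fallback command now comes from configuration (runnerCmdDefault, i.e. Containers.CrunchRunCommand) rather than a hard-coded "crunch-run". Second, deployed binaries move from /var/lib/arvados to /tmp/arvados-crunch-run, still with the binary's MD5 embedded in the filename so each build gets a distinct path. A sketch of that derivation:

    // Sketch: deriving the versioned deploy path as above.
    buf := []byte("...")                // placeholder for the binary contents
    sum := md5.Sum(buf)                 // [16]byte digest
    cmd := fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", sum)
    // cmd ends in the 32-hex-digit MD5 of buf.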
 
@@ -991,6 +1008,12 @@ func (wp *Pool) waitUntilLoaded() {
        }
 }
 
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+       h := hmac.New(sha256.New, []byte(wp.systemRootToken))
+       fmt.Fprint(h, uuid)
+       return fmt.Sprintf("%x", h.Sum(nil))
+}
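
gatewayAuthSecret derives a per-container secret as HMAC-SHA256 keyed by the cluster's SystemRootToken over the container UUID, so any component that holds the same token can recompute it without extra shared state. A hedged sketch of the matching check on the receiving side (a hypothetical helper, not part of this diff); hmac.Equal provides the constant-time comparison:

    // Hypothetical verifier: recompute and compare in constant time.
    func verifyGatewayAuthSecret(systemRootToken, uuid, presented string) bool {
        h := hmac.New(sha256.New, []byte(systemRootToken))
        fmt.Fprint(h, uuid)
        want := fmt.Sprintf("%x", h.Sum(nil))
        return hmac.Equal([]byte(want), []byte(presented))
    }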
+
 // Return a random string of n hexadecimal digits (n*4 random bits). n
 // must be even.
 func randomHex(n int) string {
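
The body of randomHex is truncated here, but the contract in its comment pins it down: n hex digits carry n*4 random bits, and n must be even because every two hex digits encode one random byte, so the generator needs exactly n/2 bytes. A minimal sketch consistent with that contract, using the crypto/rand import already present at the top of this file (the actual body may differ):

    // Sketch only; consistent with the documented contract above.
    func randomHexSketch(n int) string {
        buf := make([]byte, n/2) // n hex digits == n/2 random bytes == n*4 bits
        if _, err := rand.Read(buf); err != nil {
            panic(err) // entropy source failed; nothing sensible to return
        }
        return fmt.Sprintf("%x", buf)
    }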