X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/02f779feef0138420f8d7dc908eba040bc2dd904..58e6402a72e9ac1a210b2d318591f973a37e1e57:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index a25ed60150..66e0bfee91 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,12 +5,15 @@ package worker import ( + "crypto/hmac" "crypto/md5" "crypto/rand" + "crypto/sha256" "errors" "fmt" "io" "io/ioutil" + mathrand "math/rand" "sort" "strings" "sync" @@ -101,6 +104,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, + cluster: cluster, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), @@ -116,8 +120,11 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM), timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal), timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock), + systemRootToken: cluster.SystemRootToken, installPublicKey: installPublicKey, tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, + runnerCmdDefault: cluster.Containers.CrunchRunCommand, + runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...), stop: make(chan bool), } wp.registerMetrics(reg) @@ -139,6 +146,7 @@ type Pool struct { instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor + cluster *arvados.Cluster bootProbeCommand string runnerSource string imageID cloud.ImageID @@ -154,8 +162,11 @@ type Pool struct { timeoutTERM time.Duration timeoutSignal time.Duration timeoutStaleRunLock time.Duration + systemRootToken string installPublicKey ssh.PublicKey tagKeyPrefix string + runnerCmdDefault string // crunch-run command to use if not deploying a binary + runnerArgs []string // extra args passed to crunch-run // private state subscribers map[<-chan struct{}]chan<- struct{} @@ -764,6 +775,13 @@ func (wp *Pool) runProbes() { workers := []cloud.InstanceID{} for range probeticker.C { + // Add some jitter. Without this, if probeInterval is + // a multiple of syncInterval and sync is + // instantaneous (as with the loopback driver), the + // first few probes race with sync operations and + // don't update the workers. + time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23))) + workers = workers[:0] wp.mtx.Lock() for id, wkr := range wp.workers { @@ -877,7 +895,7 @@ func (wp *Pool) loadRunnerData() error { if wp.runnerData != nil { return nil } else if wp.runnerSource == "" { - wp.runnerCmd = "crunch-run" + wp.runnerCmd = wp.runnerCmdDefault wp.runnerData = []byte{} return nil } @@ -890,7 +908,7 @@ func (wp *Pool) loadRunnerData() error { } wp.runnerData = buf wp.runnerMD5 = md5.Sum(buf) - wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5) + wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5) return nil } @@ -990,6 +1008,12 @@ func (wp *Pool) waitUntilLoaded() { } } +func (wp *Pool) gatewayAuthSecret(uuid string) string { + h := hmac.New(sha256.New, []byte(wp.systemRootToken)) + fmt.Fprint(h, uuid) + return fmt.Sprintf("%x", h.Sum(nil)) +} + // Return a random string of n hexadecimal digits (n*4 random bits). n // must be even. func randomHex(n int) string {