X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/aabff656faab97bcc70bb3d4a5001068f7d395c8..bcad695db9a1c3aac5807faa153086e653107f51:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index c6eaeae2b6..66e0bfee91 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -5,12 +5,15 @@ package worker import ( + "crypto/hmac" "crypto/md5" "crypto/rand" + "crypto/sha256" "errors" "fmt" "io" "io/ioutil" + mathrand "math/rand" "sort" "strings" "sync" @@ -86,9 +89,8 @@ const ( func duration(conf arvados.Duration, def time.Duration) time.Duration { if conf > 0 { return time.Duration(conf) - } else { - return def } + return def } // NewPool creates a Pool of workers backed by instanceSet. @@ -102,6 +104,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, + cluster: cluster, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), @@ -117,8 +120,11 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM), timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal), timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock), + systemRootToken: cluster.SystemRootToken, installPublicKey: installPublicKey, tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, + runnerCmdDefault: cluster.Containers.CrunchRunCommand, + runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...), stop: make(chan bool), } wp.registerMetrics(reg) @@ -140,6 +146,7 @@ type Pool struct { instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor + cluster *arvados.Cluster bootProbeCommand string runnerSource string imageID cloud.ImageID @@ -155,8 +162,11 @@ type Pool struct { timeoutTERM time.Duration timeoutSignal time.Duration timeoutStaleRunLock time.Duration + systemRootToken string installPublicKey ssh.PublicKey tagKeyPrefix string + runnerCmdDefault string // crunch-run command to use if not deploying a binary + runnerArgs []string // extra args passed to crunch-run // private state subscribers map[<-chan struct{}]chan<- struct{} @@ -184,6 +194,7 @@ type Pool struct { mTimeToReadyForContainer prometheus.Summary mTimeFromShutdownToGone prometheus.Summary mTimeFromQueueToCrunchRun prometheus.Summary + mRunProbeDuration *prometheus.SummaryVec } type createCall struct { @@ -682,6 +693,14 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, }) reg.MustRegister(wp.mTimeFromQueueToCrunchRun) + wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "instances_run_probe_duration_seconds", + Help: "Number of seconds per runProbe call.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, []string{"outcome"}) + reg.MustRegister(wp.mRunProbeDuration) } func (wp *Pool) runMetrics() { @@ -756,6 +775,13 @@ func (wp *Pool) runProbes() { workers := []cloud.InstanceID{} for range probeticker.C { + // Add some jitter. Without this, if probeInterval is + // a multiple of syncInterval and sync is + // instantaneous (as with the loopback driver), the + // first few probes race with sync operations and + // don't update the workers. + time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23))) + workers = workers[:0] wp.mtx.Lock() for id, wkr := range wp.workers { @@ -869,7 +895,7 @@ func (wp *Pool) loadRunnerData() error { if wp.runnerData != nil { return nil } else if wp.runnerSource == "" { - wp.runnerCmd = "crunch-run" + wp.runnerCmd = wp.runnerCmdDefault wp.runnerData = []byte{} return nil } @@ -882,7 +908,7 @@ func (wp *Pool) loadRunnerData() error { } wp.runnerData = buf wp.runnerMD5 = md5.Sum(buf) - wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5) + wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5) return nil } @@ -982,6 +1008,12 @@ func (wp *Pool) waitUntilLoaded() { } } +func (wp *Pool) gatewayAuthSecret(uuid string) string { + h := hmac.New(sha256.New, []byte(wp.systemRootToken)) + fmt.Fprint(h, uuid) + return fmt.Sprintf("%x", h.Sum(nil)) +} + // Return a random string of n hexadecimal digits (n*4 random bits). n // must be even. func randomHex(n int) string {