X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/fc0445570096282a9f10b2883a6bbd6a30c7aae1..e44725a3792df227f189f88ffb2cd1dbf0e93489:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 3abcba6c73..1d600e3702 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -106,6 +106,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe newExecutor: newExecutor, cluster: cluster, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, + instanceInitCommand: cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand), runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), instanceTypes: cluster.InstanceTypes, @@ -149,6 +150,7 @@ type Pool struct { newExecutor func(cloud.Instance) Executor cluster *arvados.Cluster bootProbeCommand string + instanceInitCommand cloud.InitCommand runnerSource string imageID cloud.ImageID instanceTypes map[string]arvados.InstanceType @@ -197,6 +199,8 @@ type Pool struct { mTimeFromShutdownToGone prometheus.Summary mTimeFromQueueToCrunchRun prometheus.Summary mRunProbeDuration *prometheus.SummaryVec + mProbeAgeMax prometheus.Gauge + mProbeAgeMedian prometheus.Gauge } type createCall struct { @@ -345,7 +349,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun), wp.tagKeyPrefix + tagKeyInstanceSecret: secret, } - initCmd := TagVerifier{nil, secret, nil}.InitCommand() + initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) wp.mtx.Lock() defer wp.mtx.Unlock() @@ -397,10 +401,15 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) func (wp *Pool) reportSSHConnected(inst cloud.Instance) { wp.mtx.Lock() defer wp.mtx.Unlock() - wkr := wp.workers[inst.ID()] + wkr, ok := wp.workers[inst.ID()] + if !ok { + // race: inst was removed from the pool + return + } if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() { - // the node is not in booting state (can happen if a-d-c is restarted) OR - // this is not the first SSH connection + // the node is not in booting state (can happen if + // a-d-c is restarted) OR this is not the first SSH + // connection return } @@ -621,6 +630,20 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) { Help: "Number of containers reported running by cloud VMs.", }) reg.MustRegister(wp.mContainersRunning) + wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "probe_age_seconds_max", + Help: "Maximum number of seconds since an instance's most recent successful probe.", + }) + reg.MustRegister(wp.mProbeAgeMax) + wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "probe_age_seconds_median", + Help: "Median number of seconds since an instance's most recent successful probe.", + }) + reg.MustRegister(wp.mProbeAgeMedian) wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "arvados", Subsystem: "dispatchcloud", @@ -733,6 +756,8 @@ func (wp *Pool) updateMetrics() { cpu := map[string]int64{} mem := map[string]int64{} var running int64 + now := time.Now() + var probed []time.Time for _, wkr := range wp.workers { var cat string switch { @@ -752,6 +777,7 @@ func (wp *Pool) updateMetrics() { cpu[cat] += int64(wkr.instType.VCPUs) mem[cat] += int64(wkr.instType.RAM) running += int64(len(wkr.running) + len(wkr.starting)) + probed = append(probed, wkr.probed) } for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} { wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat]) @@ -768,6 +794,15 @@ func (wp *Pool) updateMetrics() { wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v)) } wp.mContainersRunning.Set(float64(running)) + + if len(probed) == 0 { + wp.mProbeAgeMax.Set(0) + wp.mProbeAgeMedian.Set(0) + } else { + sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) }) + wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds()) + wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds()) + } } func (wp *Pool) runProbes() { @@ -873,6 +908,9 @@ func (wp *Pool) Instances() []InstanceView { // KillInstance destroys a cloud VM instance. It returns an error if // the given instance does not exist. func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() wkr, ok := wp.workers[id] if !ok { return errors.New("instance not found")