X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0e1e46da333f606b8b3293367641818fd3cc9ff4..442a871e7f3476938d0ebb3adbe3b9a7742f03ad:/lib/dispatchcloud/worker/pool.go diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 014ab93bfe..0ee36a96ff 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -25,6 +25,7 @@ const ( tagKeyInstanceType = "InstanceType" tagKeyIdleBehavior = "IdleBehavior" tagKeyInstanceSecret = "InstanceSecret" + tagKeyInstanceSetID = "InstanceSetID" ) // An InstanceView shows a worker's current state and recent activity. @@ -91,25 +92,27 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration { // // New instances are configured and set up according to the given // cluster configuration. -func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { +func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool { wp := &Pool{ logger: logger, arvClient: arvClient, + instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, - bootProbeCommand: cluster.CloudVMs.BootProbeCommand, - imageID: cloud.ImageID(cluster.CloudVMs.ImageID), + bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, + imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), instanceTypes: cluster.InstanceTypes, - maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond, - probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval), - syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval), - timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle), - timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting), - timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe), - timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), - timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM), - timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal), + maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond, + probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval), + syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval), + timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle), + timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting), + timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe), + timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM), + timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal), installPublicKey: installPublicKey, + tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix, stop: make(chan bool), } wp.registerMetrics(reg) @@ -128,6 +131,7 @@ type Pool struct { // configuration logger logrus.FieldLogger arvClient *arvados.Client + instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor bootProbeCommand string @@ -143,6 +147,7 @@ type Pool struct { timeoutTERM time.Duration timeoutSignal time.Duration installPublicKey ssh.PublicKey + tagKeyPrefix string // private state subscribers map[<-chan struct{}]chan<- struct{} @@ -281,9 +286,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool { go func() { defer wp.notify() tags := cloud.InstanceTags{ - tagKeyInstanceType: it.Name, - tagKeyIdleBehavior: string(IdleBehaviorRun), - tagKeyInstanceSecret: secret, + wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID), + wp.tagKeyPrefix + tagKeyInstanceType: it.Name, + wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun), + wp.tagKeyPrefix + tagKeyInstanceSecret: secret, } initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename)) inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey) @@ -338,7 +344,8 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) // // Caller must have lock. func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) { - inst = tagVerifier{inst} + secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret] + inst = tagVerifier{inst, secret} id := inst.ID() if wkr := wp.workers[id]; wkr != nil { wkr.executor.SetTarget(inst) @@ -349,7 +356,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor } state := StateUnknown - if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok { + if _, ok := wp.creating[secret]; ok { state = StateBooting } @@ -359,7 +366,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*wor // process); otherwise, default to "run". After this, // wkr.idleBehavior is the source of truth, and will only be // changed via SetIdleBehavior(). - idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior]) + idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior]) if !validIdleBehavior[idleBehavior] { idleBehavior = IdleBehaviorRun } @@ -728,7 +735,7 @@ func (wp *Pool) getInstancesAndSync() error { } wp.logger.Debug("getting instance list") threshold := time.Now() - instances, err := wp.instanceSet.Instances(cloud.InstanceTags{}) + instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)}) if err != nil { wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify) return err @@ -748,7 +755,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { notify := false for _, inst := range instances { - itTag := inst.Tags()[tagKeyInstanceType] + itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType] it, ok := wp.instanceTypes[itTag] if !ok { wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)