package worker
import (
- "io"
"time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+ type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
logger: logrus.StandardLogger(),
- newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
- instanceSet: lameInstanceSet,
+ newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+ instanceSet: &throttledInstanceSet{InstanceSet: lameInstanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
type2.Name: type2,
+ type3.Name: type3,
},
}
notify := pool.Subscribe()
c.Check(pool.Unallocated()[type1], check.Equals, 0)
c.Check(pool.Unallocated()[type2], check.Equals, 0)
+ c.Check(pool.Unallocated()[type3], check.Equals, 0)
pool.Create(type2)
pool.Create(type1)
pool.Create(type2)
+ pool.Create(type3)
c.Check(pool.Unallocated()[type1], check.Equals, 1)
c.Check(pool.Unallocated()[type2], check.Equals, 2)
- // Unblock the pending Create calls and (before calling Sync!)
- // wait for the pool to process the returned instances.
- go lameInstanceSet.Release(3)
+ c.Check(pool.Unallocated()[type3], check.Equals, 1)
+
+ // Unblock the pending Create calls.
+ go lameInstanceSet.Release(4)
+
+ // Wait for each instance to either return from its Create
+ // call, or show up in a poll.
suite.wait(c, pool, notify, func() bool {
- list, err := lameInstanceSet.Instances(nil)
- return err == nil && len(list) == 3
+ pool.mtx.RLock()
+ defer pool.mtx.RUnlock()
+ return len(pool.workers) == 4
})
- c.Check(pool.Unallocated()[type1], check.Equals, 1)
- c.Check(pool.Unallocated()[type2], check.Equals, 2)
- pool.getInstancesAndSync()
- // Returned counts can be temporarily higher than 1 and 2 if
- // poll ran before Create() returned.
- c.Check(pool.Unallocated()[type1], check.Not(less), 1)
- c.Check(pool.Unallocated()[type2], check.Not(less), 2)
+ // Place type3 node on admin-hold
+ ivs := suite.instancesByType(pool, type3)
+ c.Assert(ivs, check.HasLen, 1)
+ type3instanceID := ivs[0].Instance
+ err := pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+ c.Check(err, check.IsNil)
+
+ // Check admin-hold behavior: refuse to shut down, and don't
+ // report as Unallocated ("available now or soon").
+ c.Check(pool.Shutdown(type3), check.Equals, false)
suite.wait(c, pool, notify, func() bool {
- return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
+ return pool.Unallocated()[type3] == 0
})
+ c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
+ // Shutdown both type2 nodes
c.Check(pool.Shutdown(type2), check.Equals, true)
suite.wait(c, pool, notify, func() bool {
return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
}
break
}
+
+ // Shutdown type1 node
c.Check(pool.Shutdown(type1), check.Equals, true)
suite.wait(c, pool, notify, func() bool {
- return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+ return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
})
select {
case <-notify2:
case <-time.After(time.Second):
c.Error("notify did not receive")
}
- go lameInstanceSet.Release(3) // unblock Destroy calls
+
+ // Put type3 node back in service.
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
+ c.Check(err, check.IsNil)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 1
+ })
+
+ // Check admin-drain behavior: shut down right away, and don't
+ // report as Unallocated.
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
+ c.Check(err, check.IsNil)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 0
+ })
+ suite.wait(c, pool, notify, func() bool {
+ ivs := suite.instancesByType(pool, type3)
+ return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+ })
+
+ // Unblock all pending Destroy calls. Pool calls Destroy again
+ // if a node still appears in the provider list after a
+ // previous attempt, so there might be more than 4 Destroy
+ // calls to unblock.
+ go lameInstanceSet.Release(4444)
+
+ // Sync until all instances disappear from the provider list.
+ suite.wait(c, pool, notify, func() bool {
+ pool.getInstancesAndSync()
+ return len(pool.Instances()) == 0
+ })
+}
+
+// instancesByType filters pool.Instances() down to the views whose
+// ArvadosInstanceType equals it.Name, keeping them in the order the
+// pool iteration yields them. The result is a nil slice when no
+// instance matches.
+func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
+	var matched []InstanceView
+	for _, view := range pool.Instances() {
+		// Skip instances of any other type.
+		if view.ArvadosInstanceType != it.Name {
+			continue
+		}
+		matched = append(matched, view)
+	}
+	return matched
}
func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
}
c.Check(ready(), check.Equals, true)
}
-
-type stubExecutor struct{}
-
-func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
- return nil, nil, nil
-}
-
-func (*stubExecutor) Close() {}