package dispatchcloud
import (
+ "context"
"encoding/json"
"io/ioutil"
"math/rand"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/sirupsen/logrus"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
- cluster *arvados.Cluster
- instanceSet *test.LameInstanceSet
- stubDriver *test.StubDriver
- disp *dispatcher
-}
-
-func (s *DispatcherSuite) SetUpSuite(c *check.C) {
- if os.Getenv("ARVADOS_DEBUG") != "" {
- logrus.StandardLogger().SetLevel(logrus.DebugLevel)
- }
+ // ctx carries a per-test logger (replacing the global logrus setup
+ // previously done in SetUpSuite); cancel is invoked in TearDownTest
+ // so background goroutines observe shutdown.
+ ctx context.Context
+ cancel context.CancelFunc
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
+ // Fresh cancelable context per test, with a logger that routes
+ // dispatcher output through gocheck's c.Log.
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
c.Assert(err, check.IsNil)
_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
s.stubDriver = &test.StubDriver{
- HostKey: hostpriv,
- AuthorizedKeys: []ssh.PublicKey{dispatchpub},
- ErrorRateDestroy: 0.1,
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateDestroy: 0.1,
+ // NOTE(review): presumably throttles stub Create calls so the
+ // pool's rate-limiting path gets exercised — confirm in StubDriver.
+ MinTimeBetweenCreateCalls: time.Millisecond,
}
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(30 * time.Millisecond),
- TimeoutBooting: arvados.Duration(30 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ // Idle/booting timeouts raised 30ms -> 150ms, presumably to
+ // reduce flakiness on slow test hosts — TODO confirm.
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
},
Dispatch: arvados.Dispatch{
- PrivateKey: dispatchprivraw,
+ // Dispatch.PrivateKey is a string now; convert the raw key bytes.
+ PrivateKey: string(dispatchprivraw),
PollInterval: arvados.Duration(5 * time.Millisecond),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
+ // Short signal/TERM timeouts keep container-kill test paths fast.
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
},
},
+ // Controller endpoint comes from the test environment so the new
+ // arvados client below has somewhere to point.
+ Services: arvados.Services{
+ Controller: arvados.Service{ExternalURL: arvados.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}},
+ },
+ }
+
+ arvClient, err := arvados.NewClientFromConfig(s.cluster)
+ c.Check(err, check.IsNil)
+
+ // The dispatcher now takes an explicit context, API client, and
+ // auth token instead of just the cluster config.
+ s.disp = &dispatcher{
+ Cluster: s.cluster,
+ Context: s.ctx,
+ ArvClient: arvClient,
+ AuthToken: arvadostest.AdminToken,
}
- s.disp = &dispatcher{Cluster: s.cluster}
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
// go run().
}
func (s *DispatcherSuite) TearDownTest(c *check.C) {
+ // Cancel the test context first so dispatcher goroutines see
+ // shutdown before Close tears the rest down.
+ s.cancel()
s.disp.Close()
}
for _, ctr := range queue.Containers {
waiting[ctr.UUID] = struct{}{}
}
- executeContainer := func(ctr arvados.Container) int {
+ finishContainer := func(ctr arvados.Container) {
mtx.Lock()
defer mtx.Unlock()
if _, ok := waiting[ctr.UUID]; !ok {
- c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
- return 1
+ c.Errorf("container completed twice: %s", ctr.UUID)
+ return
}
delete(waiting, ctr.UUID)
if len(waiting) == 0 {
close(done)
}
+ }
+ executeContainer := func(ctr arvados.Container) int {
+ finishContainer(ctr)
return int(rand.Uint32() & 0x3)
}
n := 0
stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
stubvm.ExecuteContainer = executeContainer
+ stubvm.CrashRunningContainer = finishContainer
switch n % 7 {
case 0:
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
case 1:
stubvm.CrunchRunMissing = true
+ case 2:
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
}
c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
}
- deadline := time.Now().Add(time.Second)
+ deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
type instance struct {
Instance string
- WorkerState string
+ // Explicit json tags give the management-API response snake_case
+ // keys — NOTE(review): verify consumers expect these exact names.
+ WorkerState string `json:"worker_state"`
Price float64
- LastContainerUUID string
- ArvadosInstanceType string
- ProviderInstanceType string
+ LastContainerUUID string `json:"last_container_uuid"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
}
type instancesResponse struct {
Items []instance
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
- err := s.disp.pool.Create(test.InstanceType(1))
- c.Check(err, check.IsNil)
+ ok := s.disp.pool.Create(test.InstanceType(1))
+ c.Check(ok, check.Equals, true)
<-ch
- sr = getInstances()
+ for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); {
+ sr = getInstances()
+ if len(sr.Items) > 0 {
+ break
+ }
+ time.Sleep(time.Millisecond)
+ }
c.Assert(len(sr.Items), check.Equals, 1)
c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
c.Check(sr.Items[0].WorkerState, check.Equals, "booting")