X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/443a0b96316ed46600dc5035193adae6ac4d1f74..a8fe18b2b941f31896033bb58b9056e7731581b7:/lib/dispatchcloud/dispatcher_test.go diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go index 1f94577434..00157b75c6 100644 --- a/lib/dispatchcloud/dispatcher_test.go +++ b/lib/dispatchcloud/dispatcher_test.go @@ -5,6 +5,7 @@ package dispatchcloud import ( + "context" "encoding/json" "io/ioutil" "math/rand" @@ -16,7 +17,8 @@ import ( "git.curoverse.com/arvados.git/lib/dispatchcloud/test" "git.curoverse.com/arvados.git/sdk/go/arvados" - "github.com/sirupsen/logrus" + "git.curoverse.com/arvados.git/sdk/go/arvadostest" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" "golang.org/x/crypto/ssh" check "gopkg.in/check.v1" ) @@ -24,45 +26,46 @@ import ( var _ = check.Suite(&DispatcherSuite{}) type DispatcherSuite struct { - cluster *arvados.Cluster - instanceSet *test.LameInstanceSet - stubDriver *test.StubDriver - disp *dispatcher -} - -func (s *DispatcherSuite) SetUpSuite(c *check.C) { - if os.Getenv("ARVADOS_DEBUG") != "" { - logrus.StandardLogger().SetLevel(logrus.DebugLevel) - } + ctx context.Context + cancel context.CancelFunc + cluster *arvados.Cluster + stubDriver *test.StubDriver + disp *dispatcher } func (s *DispatcherSuite) SetUpTest(c *check.C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c)) dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch") dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch") c.Assert(err, check.IsNil) _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm") s.stubDriver = &test.StubDriver{ - HostKey: hostpriv, - AuthorizedKeys: []ssh.PublicKey{dispatchpub}, - ErrorRateDestroy: 0.1, + HostKey: hostpriv, + AuthorizedKeys: []ssh.PublicKey{dispatchpub}, + ErrorRateDestroy: 0.1, + MinTimeBetweenCreateCalls: time.Millisecond, } s.cluster = &arvados.Cluster{ CloudVMs: arvados.CloudVMs{ - Driver: "test", - SyncInterval: arvados.Duration(10 * time.Millisecond), - TimeoutIdle: arvados.Duration(30 * time.Millisecond), - TimeoutBooting: arvados.Duration(30 * time.Millisecond), - TimeoutProbe: arvados.Duration(15 * time.Millisecond), - TimeoutShutdown: arvados.Duration(5 * time.Millisecond), + Driver: "test", + SyncInterval: arvados.Duration(10 * time.Millisecond), + TimeoutIdle: arvados.Duration(150 * time.Millisecond), + TimeoutBooting: arvados.Duration(150 * time.Millisecond), + TimeoutProbe: arvados.Duration(15 * time.Millisecond), + TimeoutShutdown: arvados.Duration(5 * time.Millisecond), + MaxCloudOpsPerSecond: 500, }, Dispatch: arvados.Dispatch{ - PrivateKey: dispatchprivraw, + PrivateKey: string(dispatchprivraw), PollInterval: arvados.Duration(5 * time.Millisecond), ProbeInterval: arvados.Duration(5 * time.Millisecond), StaleLockTimeout: arvados.Duration(5 * time.Millisecond), MaxProbesPerSecond: 1000, + TimeoutSignal: arvados.Duration(3 * time.Millisecond), + TimeoutTERM: arvados.Duration(20 * time.Millisecond), }, InstanceTypes: arvados.InstanceTypeMap{ test.InstanceType(1).Name: test.InstanceType(1), @@ -79,14 +82,27 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) { DispatchCloud: arvados.SystemServiceInstance{Listen: ":"}, }, }, + Services: arvados.Services{ + Controller: arvados.Service{ExternalURL: arvados.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}}, + }, + } + + arvClient, err := arvados.NewClientFromConfig(s.cluster) + c.Check(err, check.IsNil) + + s.disp = &dispatcher{ + Cluster: s.cluster, + Context: s.ctx, + ArvClient: arvClient, + AuthToken: arvadostest.AdminToken, } - s.disp = &dispatcher{Cluster: s.cluster} // Test cases can modify s.cluster before calling // initialize(), and then modify private state before calling // go run(). } func (s *DispatcherSuite) TearDownTest(c *check.C) { + s.cancel() s.disp.Close() } @@ -121,17 +137,20 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { for _, ctr := range queue.Containers { waiting[ctr.UUID] = struct{}{} } - executeContainer := func(ctr arvados.Container) int { + finishContainer := func(ctr arvados.Container) { mtx.Lock() defer mtx.Unlock() if _, ok := waiting[ctr.UUID]; !ok { - c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID) - return 1 + c.Errorf("container completed twice: %s", ctr.UUID) + return } delete(waiting, ctr.UUID) if len(waiting) == 0 { close(done) } + } + executeContainer := func(ctr arvados.Container) int { + finishContainer(ctr) return int(rand.Uint32() & 0x3) } n := 0 @@ -141,11 +160,14 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))) stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond))) stubvm.ExecuteContainer = executeContainer + stubvm.CrashRunningContainer = finishContainer switch n % 7 { case 0: stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond) case 1: stubvm.CrunchRunMissing = true + case 2: + stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond) default: stubvm.CrunchRunCrashRate = 0.1 } @@ -163,7 +185,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting) } - deadline := time.Now().Add(time.Second) + deadline := time.Now().Add(5 * time.Second) for range time.NewTicker(10 * time.Millisecond).C { insts, err := s.stubDriver.InstanceSets()[0].Instances(nil) c.Check(err, check.IsNil) @@ -228,11 +250,11 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) { type instance struct { Instance string - WorkerState string + WorkerState string `json:"worker_state"` Price float64 - LastContainerUUID string - ArvadosInstanceType string - ProviderInstanceType string + LastContainerUUID string `json:"last_container_uuid"` + ArvadosInstanceType string `json:"arvados_instance_type"` + ProviderInstanceType string `json:"provider_instance_type"` } type instancesResponse struct { Items []instance @@ -254,11 +276,17 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) { ch := s.disp.pool.Subscribe() defer s.disp.pool.Unsubscribe(ch) - err := s.disp.pool.Create(test.InstanceType(1)) - c.Check(err, check.IsNil) + ok := s.disp.pool.Create(test.InstanceType(1)) + c.Check(ok, check.Equals, true) <-ch - sr = getInstances() + for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); { + sr = getInstances() + if len(sr.Items) > 0 { + break + } + time.Sleep(time.Millisecond) + } c.Assert(len(sr.Items), check.Equals, 1) c.Check(sr.Items[0].Instance, check.Matches, "stub.*") c.Check(sr.Items[0].WorkerState, check.Equals, "booting")