14325: Merge branch 'master'
author Tom Clegg <tclegg@veritasgenetics.com>
Wed, 6 Feb 2019 21:27:57 +0000 (16:27 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 6 Feb 2019 21:27:57 +0000 (16:27 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

27 files changed:
build/run-build-packages.sh
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/container/queue_test.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/instance_set_proxy.go [deleted file]
lib/dispatchcloud/readme_states.txt [new file with mode: 0644]
lib/dispatchcloud/scheduler/interfaces.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/ssh_executor/executor.go
lib/dispatchcloud/ssh_executor/executor_test.go
lib/dispatchcloud/test/lame_instance_set.go [deleted file]
lib/dispatchcloud/test/logger.go [new file with mode: 0644]
lib/dispatchcloud/test/ssh_service.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/throttle.go [new file with mode: 0644]
lib/dispatchcloud/worker/throttle_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go [new file with mode: 0644]
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/arvados/container.go
sdk/go/arvadostest/fixtures.go
vendor/vendor.json

index 0919faf37f3056572987c9782fabcdc78530658c..b0199aed9767700a9e891e529a1b6fdc84415c25 100755 (executable)
@@ -303,9 +303,8 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
-# No package until #14325
-#package_go_binary cmd/arvados-server crunch-dispatch-cloud \
-#    "Arvados cluster cloud dispatch"
+package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
index 965407e518a1aaf16444d2b3ce6f84af90be3b30..847fe9e27c03e3794f068bd59c7c13031798fb71 100644 (file)
@@ -27,8 +27,8 @@ type APIClient interface {
 type QueueEnt struct {
        // The container to run. Only the UUID, State, Priority, and
        // RuntimeConstraints fields are populated.
-       Container    arvados.Container
-       InstanceType arvados.InstanceType
+       Container    arvados.Container    `json:"container"`
+       InstanceType arvados.InstanceType `json:"instance_type"`
 }
 
 // String implements fmt.Stringer by returning the queued container's
@@ -184,7 +184,9 @@ func (cq *Queue) Update() error {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        for uuid, ctr := range next {
-               if _, keep := cq.dontupdate[uuid]; keep {
+               if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+                       // Don't clobber a local update that happened
+                       // after we started polling.
                        continue
                }
                if cur, ok := cq.current[uuid]; !ok {
@@ -195,11 +197,16 @@ func (cq *Queue) Update() error {
                }
        }
        for uuid := range cq.current {
-               if _, keep := cq.dontupdate[uuid]; keep {
+               if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+                       // Don't expunge an entry that was
+                       // added/updated locally after we started
+                       // polling.
                        continue
-               } else if _, keep = next[uuid]; keep {
-                       continue
-               } else {
+               } else if _, stillpresent := next[uuid]; !stillpresent {
+                       // Expunge an entry that no longer appears in
+                       // the poll response (evidently it's
+                       // cancelled, completed, deleted, or taken by
+                       // a different dispatcher).
                        delete(cq.current, uuid)
                }
        }
@@ -211,9 +218,44 @@ func (cq *Queue) Update() error {
 
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
-       if err != nil {
-               // FIXME: throttle warnings, cancel after timeout
-               cq.logger.Warnf("cannot run %s", &ctr)
+       if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+               // We assume here that any chooseType error is a hard
+               // error: it wouldn't help to try again, or to leave
+               // it for a different dispatcher process to attempt.
+               errorString := err.Error()
+               cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+               go func() {
+                       var err error
+                       defer func() {
+                               if err == nil {
+                                       return
+                               }
+                               // On failure, check current container
+                               // state, and don't log the error if
+                               // the failure came from losing a
+                               // race.
+                               var latest arvados.Container
+                               cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+                               if latest.State == arvados.ContainerStateCancelled {
+                                       return
+                               }
+                               cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+                       }()
+                       if ctr.State == arvados.ContainerStateQueued {
+                               err = cq.Lock(ctr.UUID)
+                               if err != nil {
+                                       return
+                               }
+                       }
+                       err = cq.setRuntimeError(ctr.UUID, errorString)
+                       if err != nil {
+                               return
+                       }
+                       err = cq.Cancel(ctr.UUID)
+                       if err != nil {
+                               return
+                       }
+               }()
                return
        }
        cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
@@ -229,6 +271,18 @@ func (cq *Queue) Unlock(uuid string) error {
        return cq.apiUpdate(uuid, "unlock")
 }
 
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+       return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+               "container": {
+                       "runtime_status": {
+                               "error": errorString,
+                       },
+               },
+       })
+}
+
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
        err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
diff --git a/lib/dispatchcloud/container/queue_test.go b/lib/dispatchcloud/container/queue_test.go
new file mode 100644 (file)
index 0000000..91d6535
--- /dev/null
@@ -0,0 +1,131 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package container
+
+import (
+       "errors"
+       "os"
+       "sync"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&IntegrationSuite{})
+
+func logger() logrus.FieldLogger {
+       logger := logrus.StandardLogger()
+       if os.Getenv("ARVADOS_DEBUG") != "" {
+               logger.SetLevel(logrus.DebugLevel)
+       }
+       return logger
+}
+
+type IntegrationSuite struct{}
+
+func (suite *IntegrationSuite) TearDownTest(c *check.C) {
+       err := arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+       c.Check(err, check.IsNil)
+}
+
+func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
+       typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               return arvados.InstanceType{Name: "testType"}, nil
+       }
+
+       client := arvados.NewClientFromEnv()
+       cq := NewQueue(logger(), nil, typeChooser, client)
+
+       err := cq.Update()
+       c.Check(err, check.IsNil)
+
+       ents, threshold := cq.Entries()
+       c.Check(len(ents), check.Not(check.Equals), 0)
+       c.Check(time.Since(threshold) < time.Minute, check.Equals, true)
+       c.Check(time.Since(threshold) > 0, check.Equals, true)
+
+       _, ok := ents[arvadostest.QueuedContainerUUID]
+       c.Check(ok, check.Equals, true)
+
+       var wg sync.WaitGroup
+       for uuid, ent := range ents {
+               c.Check(ent.Container.UUID, check.Equals, uuid)
+               c.Check(ent.InstanceType.Name, check.Equals, "testType")
+               c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
+               c.Check(ent.Container.Priority > 0, check.Equals, true)
+
+               ctr, ok := cq.Get(uuid)
+               c.Check(ok, check.Equals, true)
+               c.Check(ctr.UUID, check.Equals, uuid)
+               // Pass uuid by value: before Go 1.22 the loop variable is shared by all iterations.
+               wg.Add(1)
+               go func(uuid string) {
+                       defer wg.Done()
+                       err := cq.Unlock(uuid)
+                       c.Check(err, check.NotNil)
+                       err = cq.Lock(uuid)
+                       c.Check(err, check.IsNil)
+                       ctr, ok := cq.Get(uuid)
+                       c.Check(ok, check.Equals, true)
+                       c.Check(ctr.State, check.Equals, arvados.ContainerStateLocked)
+                       err = cq.Lock(uuid)
+                       c.Check(err, check.NotNil)
+                       err = cq.Unlock(uuid)
+                       c.Check(err, check.IsNil)
+                       ctr, ok = cq.Get(uuid)
+                       c.Check(ok, check.Equals, true)
+                       c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
+                       err = cq.Unlock(uuid)
+                       c.Check(err, check.NotNil)
+               }(uuid)
+       }
+       wg.Wait()
+
+       err = cq.Cancel(arvadostest.CompletedContainerUUID)
+       c.Check(err, check.ErrorMatches, `.*State cannot change from Complete to Cancelled.*`)
+}
+
+func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
+       errorTypeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               return arvados.InstanceType{}, errors.New("no suitable instance type")
+       }
+
+       client := arvados.NewClientFromEnv()
+       cq := NewQueue(logger(), nil, errorTypeChooser, client)
+
+       var ctr arvados.Container
+       err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
+       c.Check(err, check.IsNil)
+       c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
+
+       c.Check(cq.Update(), check.IsNil)
+
+       // Cancellation happens asynchronously after Update; wait for
+       // the container (fetched from the API) to reach state=Cancelled.
+       suite.waitfor(c, time.Second, func() bool {
+               err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
+               return err == nil && ctr.State == arvados.ContainerStateCancelled
+       })
+       c.Check(ctr.RuntimeStatus["error"], check.Equals, `no suitable instance type`)
+}
+
+func (suite *IntegrationSuite) waitfor(c *check.C, timeout time.Duration, fn func() bool) {
+       defer func() {
+               c.Check(fn(), check.Equals, true)
+       }()
+       deadline := time.Now().Add(timeout)
+       for !fn() && time.Now().Before(deadline) {
+               time.Sleep(timeout / 1000)
+       }
+}
index 2415094ac00290625794d8204449052ad749ad85..3e3f5ee1993f0f0dfb98e4c68e02e80b1ee6f888 100644 (file)
@@ -21,6 +21,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/julienschmidt/httprouter"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@ const (
 type pool interface {
        scheduler.WorkerPool
        Instances() []worker.InstanceView
+       SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
        Stop()
 }
 
@@ -87,6 +89,7 @@ func (disp *dispatcher) Close() {
 // Make a worker.Executor for the given instance.
 func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
        exr := ssh_executor.New(inst)
+       exr.SetTargetPort(disp.Cluster.CloudVMs.SSHPort)
        exr.SetSigners(disp.sshKey)
        return exr
 }
@@ -125,9 +128,9 @@ func (disp *dispatcher) initialize() {
        if err != nil {
                disp.logger.Fatalf("error initializing driver: %s", err)
        }
-       disp.instanceSet = &instanceSetProxy{instanceSet}
+       disp.instanceSet = instanceSet
        disp.reg = prometheus.NewRegistry()
-       disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+       disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
 
        if disp.Cluster.ManagementToken == "" {
@@ -135,14 +138,17 @@ func (disp *dispatcher) initialize() {
                        http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
                })
        } else {
-               mux := http.NewServeMux()
-               mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
-               mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+               mux := httprouter.New()
+               mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
                metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
                        ErrorLog: disp.logger,
                })
-               mux.Handle("/metrics", metricsH)
-               mux.Handle("/metrics.json", metricsH)
+               mux.Handler("GET", "/metrics", metricsH)
+               mux.Handler("GET", "/metrics.json", metricsH)
                disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
        }
 }
@@ -169,12 +175,8 @@ func (disp *dispatcher) run() {
 
 // Management API: all active and queued containers.
 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
-       if r.Method != "GET" {
-               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
-               return
-       }
        var resp struct {
-               Items []container.QueueEnt
+               Items []container.QueueEnt `json:"items"`
        }
        qEntries, _ := disp.queue.Entries()
        for _, ent := range qEntries {
@@ -185,13 +187,34 @@ func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
 
 // Management API: all active instances (cloud VMs).
 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
-       if r.Method != "GET" {
-               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
-               return
-       }
        var resp struct {
-               Items []worker.InstanceView
+               Items []worker.InstanceView `json:"items"`
        }
        resp.Items = disp.pool.Instances()
        json.NewEncoder(w).Encode(resp)
 }
+
+// Management API: set idle behavior to "hold" for specified instance.
+func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
+       disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
+}
+
+// Management API: set idle behavior to "drain" for specified instance.
+func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
+       disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
+}
+
+// Management API: set idle behavior to "run" for specified instance.
+func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
+       disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
+}
+
+func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
+       params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
+       id := cloud.InstanceID(params.ByName("instance_id"))
+       err := disp.pool.SetIdleBehavior(id, want)
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusNotFound)
+               return
+       }
+}
index 1f94577434c4076fa0eca25cb2432d579631c657..674cacd5663676ead14e72c9543e8bfc12dadc51 100644 (file)
@@ -16,7 +16,6 @@ import (
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
 )
@@ -24,16 +23,9 @@ import (
 var _ = check.Suite(&DispatcherSuite{})
 
 type DispatcherSuite struct {
-       cluster     *arvados.Cluster
-       instanceSet *test.LameInstanceSet
-       stubDriver  *test.StubDriver
-       disp        *dispatcher
-}
-
-func (s *DispatcherSuite) SetUpSuite(c *check.C) {
-       if os.Getenv("ARVADOS_DEBUG") != "" {
-               logrus.StandardLogger().SetLevel(logrus.DebugLevel)
-       }
+       cluster    *arvados.Cluster
+       stubDriver *test.StubDriver
+       disp       *dispatcher
 }
 
 func (s *DispatcherSuite) SetUpTest(c *check.C) {
@@ -43,17 +35,18 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
        _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
        s.stubDriver = &test.StubDriver{
-               HostKey:          hostpriv,
-               AuthorizedKeys:   []ssh.PublicKey{dispatchpub},
-               ErrorRateDestroy: 0.1,
+               HostKey:                   hostpriv,
+               AuthorizedKeys:            []ssh.PublicKey{dispatchpub},
+               ErrorRateDestroy:          0.1,
+               MinTimeBetweenCreateCalls: time.Millisecond,
        }
 
        s.cluster = &arvados.Cluster{
                CloudVMs: arvados.CloudVMs{
                        Driver:          "test",
                        SyncInterval:    arvados.Duration(10 * time.Millisecond),
-                       TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
-                       TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
+                       TimeoutIdle:     arvados.Duration(150 * time.Millisecond),
+                       TimeoutBooting:  arvados.Duration(150 * time.Millisecond),
                        TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
                        TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
                },
@@ -228,11 +221,11 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 
        type instance struct {
                Instance             string
-               WorkerState          string
+               WorkerState          string `json:"worker_state"`
                Price                float64
-               LastContainerUUID    string
-               ArvadosInstanceType  string
-               ProviderInstanceType string
+               LastContainerUUID    string `json:"last_container_uuid"`
+               ArvadosInstanceType  string `json:"arvados_instance_type"`
+               ProviderInstanceType string `json:"provider_instance_type"`
        }
        type instancesResponse struct {
                Items []instance
@@ -254,8 +247,8 @@ func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
 
        ch := s.disp.pool.Subscribe()
        defer s.disp.pool.Unsubscribe(ch)
-       err := s.disp.pool.Create(test.InstanceType(1))
-       c.Check(err, check.IsNil)
+       ok := s.disp.pool.Create(test.InstanceType(1))
+       c.Check(ok, check.Equals, true)
        <-ch
 
        sr = getInstances()
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
deleted file mode 100644 (file)
index e728b67..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package dispatchcloud
-
-import (
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "golang.org/x/crypto/ssh"
-)
-
-type instanceSetProxy struct {
-       cloud.InstanceSet
-}
-
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
-       // TODO: return if Create failed recently with a RateLimitError or QuotaError
-       return is.InstanceSet.Create(it, id, tags, pk)
-}
-
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
-       // TODO: return if Instances failed recently with a RateLimitError
-       return is.InstanceSet.Instances(tags)
-}
diff --git a/lib/dispatchcloud/readme_states.txt b/lib/dispatchcloud/readme_states.txt
new file mode 100644 (file)
index 0000000..b654bbf
--- /dev/null
@@ -0,0 +1,31 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# cpan -I -T install Graph::Easy
+# (eval `perl -I ~/perl5/lib/perl5 -Mlocal::lib`; cpan -T install Graph::Easy)
+# graph-easy --as=svg < readme_states.txt
+
+[Nonexistent] - appears in cloud list -> [Unknown]
+[Nonexistent] - create() returns ID -> [Booting]
+[Unknown] - create() returns ID -> [Booting]
+[Unknown] - boot timeout -> [Shutdown]
+[Booting] - boot+run probes succeed -> [Idle]
+[Idle] - idle timeout -> [Shutdown]
+[Idle] - probe timeout -> [Shutdown]
+[Idle] - want=drain -> [Shutdown]
+[Idle] - container starts -> [Running]
+[Running] - container ends -> [Idle]
+[Running] - container ends, want=drain -> [Shutdown]
+[Shutdown] - instance disappears from cloud -> [Gone]
+
+# Graph::Easy's layouter fails if we add these transitions:
+#[Hold] - want=run -> [Booting]
+#[Hold] - want=drain -> [Shutdown]
+#[Running] - container ends, want=hold -> [Hold]
+#[Unknown] - want=hold -> [Hold]
+#[Booting] - want=hold -> [Hold]
+#[Idle] - want=hold -> [Hold]
+
+# Omitted; probably not worth drawing (the state is unchanged):
+#[Booting] - boot probe succeeds, run probe fails -> [Booting]
index 59700c393523844094a0845922b767cd7da56e31..18cdc94fa52156ceab01d7dbe135d8db20029176 100644 (file)
@@ -13,7 +13,8 @@ import (
 )
 
 // A ContainerQueue is a set of containers that need to be started or
-// stopped. Implemented by container.Queue and test stubs.
+// stopped. Implemented by container.Queue and test stubs. See
+// container.Queue method documentation for details.
 type ContainerQueue interface {
        Entries() (entries map[string]container.QueueEnt, updated time.Time)
        Lock(uuid string) error
@@ -28,13 +29,13 @@ type ContainerQueue interface {
 
 // A WorkerPool asynchronously starts and stops worker VMs, and starts
 // and stops containers on them. Implemented by worker.Pool and test
-// stubs.
+// stubs. See worker.Pool method documentation for details.
 type WorkerPool interface {
        Running() map[string]time.Time
        Unallocated() map[arvados.InstanceType]int
        CountWorkers() map[worker.State]int
        AtQuota() bool
-       Create(arvados.InstanceType) error
+       Create(arvados.InstanceType) bool
        Shutdown(arvados.InstanceType) bool
        StartContainer(arvados.InstanceType, arvados.Container) bool
        KillContainer(uuid string)
index 8e74caef0d565c81e23a3a148f4f53a379450ac9..ecdae7f876786b948019b871d3018e3e42b48901 100644 (file)
@@ -7,7 +7,6 @@ package scheduler
 import (
        "sort"
 
-       "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
@@ -62,11 +61,13 @@ tryrun:
                                break tryrun
                        } else {
                                logger.Info("creating new instance")
-                               err := sch.pool.Create(it)
-                               if err != nil {
-                                       if _, ok := err.(cloud.QuotaError); !ok {
-                                               logger.WithError(err).Warn("error creating worker")
-                                       }
+                               if !sch.pool.Create(it) {
+                                       // (Note pool.Create works
+                                       // asynchronously and logs its
+                                       // own failures, so we don't
+                                       // need to log this as a
+                                       // failure.)
+
                                        sch.queue.Unlock(ctr.UUID)
                                        // Don't let lower-priority
                                        // containers starve this one
index 8945f88a14385af1961f080f0e05601a994badeb..7dd6866c0f389e51a393300f9235053159582d51 100644 (file)
@@ -5,19 +5,16 @@
 package scheduler
 
 import (
-       "errors"
+       "sync"
        "time"
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
 var (
-       logger = logrus.StandardLogger()
-
        // arbitrary example container UUIDs
        uuids = func() (r []string) {
                for i := 0; i < 16; i++ {
@@ -43,36 +40,53 @@ type stubPool struct {
        creates   []arvados.InstanceType
        starts    []string
        shutdowns int
+       sync.Mutex
 }
 
-func (p *stubPool) AtQuota() bool                 { return p.atQuota }
-func (p *stubPool) Subscribe() <-chan struct{}    { return p.notify }
-func (p *stubPool) Unsubscribe(<-chan struct{})   {}
-func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) AtQuota() bool               { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time {
+       p.Lock()
+       defer p.Unlock()
+       r := map[string]time.Time{}
+       for k, v := range p.running {
+               r[k] = v
+       }
+       return r
+}
 func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+       p.Lock()
+       defer p.Unlock()
        r := map[arvados.InstanceType]int{}
        for it, n := range p.unalloc {
                r[it] = n
        }
        return r
 }
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
+       p.Lock()
+       defer p.Unlock()
        p.creates = append(p.creates, it)
        if p.canCreate < 1 {
-               return stubQuotaError{errors.New("quota")}
+               return false
        }
        p.canCreate--
        p.unalloc[it]++
-       return nil
+       return true
 }
 func (p *stubPool) KillContainer(uuid string) {
-       p.running[uuid] = time.Now()
+       p.Lock()
+       defer p.Unlock()
+       delete(p.running, uuid)
 }
 func (p *stubPool) Shutdown(arvados.InstanceType) bool {
        p.shutdowns++
        return false
 }
 func (p *stubPool) CountWorkers() map[worker.State]int {
+       p.Lock()
+       defer p.Unlock()
        return map[worker.State]int{
                worker.StateBooting: len(p.unalloc) - len(p.idle),
                worker.StateIdle:    len(p.idle),
@@ -80,6 +94,8 @@ func (p *stubPool) CountWorkers() map[worker.State]int {
        }
 }
 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       p.Lock()
+       defer p.Unlock()
        p.starts = append(p.starts, ctr.UUID)
        if p.idle[it] == 0 {
                return false
@@ -90,6 +106,10 @@ func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container
        return true
 }
 
+func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
+       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+}
+
 var _ = check.Suite(&SchedulerSuite{})
 
 type SchedulerSuite struct{}
@@ -101,9 +121,7 @@ type SchedulerSuite struct{}
 // create.
 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
        queue := test.Queue{
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-               },
+               ChooseType: chooseType,
                Containers: []arvados.Container{
                        {
                                UUID:     test.ContainerUUID(1),
@@ -156,7 +174,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                running:   map[string]time.Time{},
                canCreate: 0,
        }
-       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.running, check.HasLen, 1)
@@ -175,9 +193,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        shouldCreate = append(shouldCreate, test.InstanceType(3))
                }
                queue := test.Queue{
-                       ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-                               return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-                       },
+                       ChooseType: chooseType,
                        Containers: []arvados.Container{
                                {
                                        UUID:     test.ContainerUUID(2),
@@ -213,7 +229,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        starts:    []string{},
                        canCreate: 0,
                }
-               New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+               New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
                c.Check(pool.creates, check.DeepEquals, shouldCreate)
                c.Check(pool.starts, check.DeepEquals, []string{})
                c.Check(pool.shutdowns, check.Not(check.Equals), 0)
@@ -236,9 +252,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                canCreate: 4,
        }
        queue := test.Queue{
-               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
-                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
-               },
+               ChooseType: chooseType,
                Containers: []arvados.Container{
                        {
                                // create a new worker
@@ -303,7 +317,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                },
        }
        queue.Update()
-       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
        running := map[string]bool{}
index b5dba9870dc041bf730e71e2a3f0dd2bca1ab7fc..d0fb54c54cd932df806e0129a0f92d78cd3a9999 100644 (file)
@@ -36,9 +36,10 @@ func New(t cloud.ExecutorTarget) *Executor {
 //
 // An Executor must not be copied.
 type Executor struct {
-       target  cloud.ExecutorTarget
-       signers []ssh.Signer
-       mtx     sync.RWMutex // controls access to instance after creation
+       target     cloud.ExecutorTarget
+       targetPort string
+       signers    []ssh.Signer
+       mtx        sync.RWMutex // controls access to instance after creation
 
        client      *ssh.Client
        clientErr   error
@@ -67,6 +68,17 @@ func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
        exr.target = t
 }
 
+// SetTargetPort sets the default port (name or number) to connect
+// to. This is used only when the address returned by the target's
+// Address() method does not specify a port. If the given port is
+// empty (or SetTargetPort is not called at all), the default port is
+// "ssh".
+func (exr *Executor) SetTargetPort(port string) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.targetPort = port
+}
+
 // Target returns the current target.
 func (exr *Executor) Target() cloud.ExecutorTarget {
        exr.mtx.RLock()
@@ -76,12 +88,18 @@ func (exr *Executor) Target() cloud.ExecutorTarget {
 
 // Execute runs cmd on the target. If an existing connection is not
 // usable, it sets up a new connection to the current target.
-func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
        session, err := exr.newSession()
        if err != nil {
                return nil, nil, err
        }
        defer session.Close()
+       for k, v := range env {
+               err = session.Setenv(k, v)
+               if err != nil {
+                       return nil, nil, err
+               }
+       }
        var stdout, stderr bytes.Buffer
        session.Stdin = stdin
        session.Stdout = &stdout
@@ -161,6 +179,19 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        if addr == "" {
                return nil, errors.New("instance has no address")
        }
+       if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
+               // Target address does not specify a port.  Use
+               // targetPort, or "ssh".
+               if err != nil {
+                       // SplitHostPort returns an empty host on error
+                       // (e.g., "missing port"), so use the whole address.
+                       h = addr
+               }
+               if p = exr.targetPort; p == "" {
+                       p = "ssh"
+               }
+               addr = net.JoinHostPort(h, p)
+       }
        var receivedKey ssh.PublicKey
        client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
                User: "root",
index 8dabfecad86451d6d3b178b50becffff46fa179e..f8565b4a710705c1367a54f9a954a753ff1d2715 100644 (file)
@@ -6,8 +6,10 @@ package ssh_executor
 
 import (
        "bytes"
+       "fmt"
        "io"
        "io/ioutil"
+       "net"
        "sync"
        "testing"
        "time"
@@ -32,17 +34,71 @@ func (*testTarget) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
        return nil
 }
 
+// Address returns the wrapped SSHService's host, with the port
+// stripped. This ensures the executor won't work until
+// SetTargetPort() is called -- see (*testTarget)Port().
+func (tt *testTarget) Address() string {
+       h, _, err := net.SplitHostPort(tt.SSHService.Address())
+       if err != nil {
+               panic(err)
+       }
+       return h
+}
+
+func (tt *testTarget) Port() string {
+       _, p, err := net.SplitHostPort(tt.SSHService.Address())
+       if err != nil {
+               panic(err)
+       }
+       return p
+}
+
+type mitmTarget struct {
+       test.SSHService
+}
+
+func (*mitmTarget) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+       return fmt.Errorf("host key failed verification: %#v", key)
+}
+
 type ExecutorSuite struct{}
 
+func (s *ExecutorSuite) TestBadHostKey(c *check.C) {
+       _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
+       clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
+       target := &mitmTarget{
+               SSHService: test.SSHService{
+                       Exec: func(map[string]string, string, io.Reader, io.Writer, io.Writer) uint32 {
+                               c.Error("Target Exec func called even though host key verification failed")
+                               return 0
+                       },
+                       HostKey:        hostpriv,
+                       AuthorizedKeys: []ssh.PublicKey{clientpub},
+               },
+       }
+
+       err := target.Start()
+       c.Check(err, check.IsNil)
+       c.Logf("target address %q", target.Address())
+       defer target.Close()
+
+       exr := New(target)
+       exr.SetSigners(clientpriv)
+
+       _, _, err = exr.Execute(nil, "true", nil)
+       c.Check(err, check.ErrorMatches, "host key failed verification: .*")
+}
+
 func (s *ExecutorSuite) TestExecute(c *check.C) {
        command := `foo 'bar' "baz"`
        stdinData := "foobar\nbaz\n"
        _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
        clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
        for _, exitcode := range []int{0, 1, 2} {
-               srv := &testTarget{
+               target := &testTarget{
                        SSHService: test.SSHService{
-                               Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                               Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                                       c.Check(env["TESTVAR"], check.Equals, "test value")
                                        c.Check(cmd, check.Equals, command)
                                        var wg sync.WaitGroup
                                        wg.Add(2)
@@ -68,17 +124,33 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
                                AuthorizedKeys: []ssh.PublicKey{clientpub},
                        },
                }
-               err := srv.Start()
+               err := target.Start()
                c.Check(err, check.IsNil)
-               c.Logf("srv address %q", srv.Address())
-               defer srv.Close()
+               c.Logf("target address %q", target.Address())
+               defer target.Close()
 
-               exr := New(srv)
+               exr := New(target)
                exr.SetSigners(clientpriv)
 
+               // Use the default target port (ssh). Execute will
+               // return a connection error or an authentication
+               // error, depending on whether the test host is
+               // running an SSH server.
+               _, _, err = exr.Execute(nil, command, nil)
+               c.Check(err, check.ErrorMatches, `.*(unable to authenticate|connection refused).*`)
+
+               // Use a bogus target port. Execute will return a
+               // connection error.
+               exr.SetTargetPort("0")
+               _, _, err = exr.Execute(nil, command, nil)
+               c.Check(err, check.ErrorMatches, `.*connection refused.*`)
+
+               // Use the test server's listening port.
+               exr.SetTargetPort(target.Port())
+
                done := make(chan bool)
                go func() {
-                       stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+                       stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
                        if exitcode == 0 {
                                c.Check(err, check.IsNil)
                        } else {
diff --git a/lib/dispatchcloud/test/lame_instance_set.go b/lib/dispatchcloud/test/lame_instance_set.go
deleted file mode 100644 (file)
index baab407..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
-       "fmt"
-       "math/rand"
-       "sync"
-
-       "git.curoverse.com/arvados.git/lib/cloud"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "golang.org/x/crypto/ssh"
-)
-
-// LameInstanceSet creates instances that boot but can't run
-// containers.
-type LameInstanceSet struct {
-       Hold chan bool // set to make(chan bool) to hold operations until Release is called
-
-       mtx       sync.Mutex
-       instances map[*lameInstance]bool
-}
-
-// Create returns a new instance.
-func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
-       inst := &lameInstance{
-               p:            p,
-               id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
-               providerType: instType.ProviderType,
-       }
-       inst.SetTags(tags)
-       if p.Hold != nil {
-               p.Hold <- true
-       }
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-       if p.instances == nil {
-               p.instances = map[*lameInstance]bool{}
-       }
-       p.instances[inst] = true
-       return inst, nil
-}
-
-// Instances returns the instances that haven't been destroyed.
-func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-       var instances []cloud.Instance
-       for i := range p.instances {
-               instances = append(instances, i)
-       }
-       return instances, nil
-}
-
-// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
-func (p *LameInstanceSet) Stop() {
-}
-
-// Release n held calls. Blocks if n calls aren't already
-// waiting. Blocks forever if Hold is nil.
-func (p *LameInstanceSet) Release(n int) {
-       for i := 0; i < n; i++ {
-               <-p.Hold
-       }
-}
-
-type lameInstance struct {
-       p            *LameInstanceSet
-       id           cloud.InstanceID
-       providerType string
-       tags         cloud.InstanceTags
-}
-
-func (inst *lameInstance) ID() cloud.InstanceID {
-       return inst.id
-}
-
-func (inst *lameInstance) String() string {
-       return fmt.Sprint(inst.id)
-}
-
-func (inst *lameInstance) ProviderType() string {
-       return inst.providerType
-}
-
-func (inst *lameInstance) Address() string {
-       return "0.0.0.0:1234"
-}
-
-func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
-       inst.p.mtx.Lock()
-       defer inst.p.mtx.Unlock()
-       inst.tags = cloud.InstanceTags{}
-       for k, v := range tags {
-               inst.tags[k] = v
-       }
-       return nil
-}
-
-func (inst *lameInstance) Destroy() error {
-       if inst.p.Hold != nil {
-               inst.p.Hold <- true
-       }
-       inst.p.mtx.Lock()
-       defer inst.p.mtx.Unlock()
-       delete(inst.p.instances, inst)
-       return nil
-}
-
-func (inst *lameInstance) Tags() cloud.InstanceTags {
-       return inst.tags
-}
-
-func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
-       return nil
-}
diff --git a/lib/dispatchcloud/test/logger.go b/lib/dispatchcloud/test/logger.go
new file mode 100644 (file)
index 0000000..a59eeb6
--- /dev/null
@@ -0,0 +1,22 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "os"
+
+       "github.com/sirupsen/logrus"
+)
+
+// Logger returns logrus's process-wide standard logger for use in
+// tests. If ARVADOS_DEBUG is set to a non-empty value, it first sets
+// that shared logger's level to Debug.
+func Logger() logrus.FieldLogger {
+       logger := logrus.StandardLogger()
+       if os.Getenv("ARVADOS_DEBUG") != "" {
+               logger.SetLevel(logrus.DebugLevel)
+       }
+       return logger
+}
index b1e4e03b12ea142e925b45fe689217a499e59bb9..ed5995f4c5f0faa84356c01f2777d1e0a366cbc1 100644 (file)
@@ -32,7 +32,7 @@ func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
 
 // An SSHExecFunc handles an "exec" session on a multiplexed SSH
 // connection.
-type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
 
 // An SSHService accepts SSH connections on an available TCP port and
 // passes clients' "exec" sessions to the provided SSHExecFunc.
@@ -146,22 +146,37 @@ func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
                        log.Printf("accept channel: %s", err)
                        return
                }
-               var execReq struct {
-                       Command string
-               }
+               didExec := false
+               sessionEnv := map[string]string{}
                go func() {
                        for req := range reqs {
-                               if req.Type == "exec" && execReq.Command == "" {
+                               switch {
+                               case didExec:
+                                       // Reject anything after exec
+                                       req.Reply(false, nil)
+                               case req.Type == "exec":
+                                       var execReq struct {
+                                               Command string
+                                       }
                                        req.Reply(true, nil)
                                        ssh.Unmarshal(req.Payload, &execReq)
                                        go func() {
                                                var resp struct {
                                                        Status uint32
                                                }
-                                               resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+                                               resp.Status = ss.Exec(sessionEnv, execReq.Command, ch, ch, ch.Stderr())
                                                ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
                                                ch.Close()
                                        }()
+                                       didExec = true
+                               case req.Type == "env":
+                                       var envReq struct {
+                                               Name  string
+                                               Value string
+                                       }
+                                       req.Reply(true, nil)
+                                       ssh.Unmarshal(req.Payload, &envReq)
+                                       sessionEnv[envReq.Name] = envReq.Value
                                }
                        }
                }()
index f738e206641edebffb8337bc7361511375ed8683..a2231673fcd08c2230259b915adc41a8f9a3ed58 100644 (file)
@@ -41,13 +41,24 @@ type StubDriver struct {
        // Destroy. 0=always succeed, 1=always fail.
        ErrorRateDestroy float64
 
+       // If Create() or Instances() is called too frequently, return
+       // rate-limiting errors.
+       MinTimeBetweenCreateCalls    time.Duration
+       MinTimeBetweenInstancesCalls time.Duration
+
+       // If true, Create and Destroy calls block until Release() is
+       // called.
+       HoldCloudOps bool
+
        instanceSets []*StubInstanceSet
+       holdCloudOps chan bool
 }
 
 // InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
-       logger logrus.FieldLogger) (cloud.InstanceSet, error) {
-
+func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+       if sd.holdCloudOps == nil {
+               sd.holdCloudOps = make(chan bool)
+       }
        sis := StubInstanceSet{
                driver:  sd,
                servers: map[cloud.InstanceID]*StubVM{},
@@ -63,19 +74,40 @@ func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
        return sd.instanceSets
 }
 
+// ReleaseCloudOps releases n pending Create/Destroy calls. If there
+// are fewer than n blocked calls pending, it waits for the rest to
+// arrive.
+func (sd *StubDriver) ReleaseCloudOps(n int) {
+       for i := 0; i < n; i++ {
+               <-sd.holdCloudOps
+       }
+}
+
 type StubInstanceSet struct {
        driver  *StubDriver
        servers map[cloud.InstanceID]*StubVM
        mtx     sync.RWMutex
        stopped bool
+
+       allowCreateCall    time.Time
+       allowInstancesCall time.Time
 }
 
 func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+       if sis.driver.HoldCloudOps {
+               sis.driver.holdCloudOps <- true
+       }
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        if sis.stopped {
                return nil, errors.New("StubInstanceSet: Create called after Stop")
        }
+       // Enforce MinTimeBetweenCreateCalls.
+       if sis.allowCreateCall.After(time.Now()) {
+               return nil, RateLimitError{sis.allowCreateCall}
+       }
+       sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
+
        ak := sis.driver.AuthorizedKeys
        if authKey != nil {
                ak = append([]ssh.PublicKey{authKey}, ak...)
@@ -101,6 +133,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
 func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
-       sis.mtx.RLock()
-       defer sis.mtx.RUnlock()
+       // Write lock, not RLock: allowInstancesCall is updated below.
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.allowInstancesCall.After(time.Now()) {
+               return nil, RateLimitError{sis.allowInstancesCall}
+       }
+       sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
        var r []cloud.Instance
        for _, ss := range sis.servers {
                r = append(r, ss.Instance())
@@ -117,6 +154,11 @@ func (sis *StubInstanceSet) Stop() {
        sis.stopped = true
 }
 
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string            { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
 // StubVM is a fake server that runs an SSH service. It represents a
 // VM running in a fake cloud.
 //
@@ -157,7 +199,7 @@ func (svm *StubVM) Instance() stubInstance {
        }
 }
 
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
        queue := svm.sis.driver.Queue
        uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
        if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -173,6 +215,12 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
                return 1
        }
        if strings.HasPrefix(command, "crunch-run --detach ") {
+               for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+                       if env[name] == "" {
+                               fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+                               return 1
+                       }
+               }
                svm.Lock()
                if svm.running == nil {
                        svm.running = map[string]bool{}
@@ -181,7 +229,10 @@ func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Write
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
-               logger := logrus.WithField("ContainerUUID", uuid)
+               logger := logrus.WithFields(logrus.Fields{
+                       "Instance":      svm.id,
+                       "ContainerUUID": uuid,
+               })
                logger.Printf("[test] starting crunch-run stub")
                go func() {
                        crashluck := math_rand.Float64()
@@ -264,11 +315,14 @@ func (si stubInstance) Address() string {
 }
 
 func (si stubInstance) Destroy() error {
+       sis := si.svm.sis
+       if sis.driver.HoldCloudOps {
+               sis.driver.holdCloudOps <- true
+       }
        if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
                return errors.New("instance could not be destroyed")
        }
        si.svm.SSHService.Close()
-       sis := si.svm.sis
        sis.mtx.Lock()
        defer sis.mtx.Unlock()
        delete(sis.servers, si.svm.id)
index fc3301d8636593cfd79aee008061c9cfafab577a..e6b50629892e62c250ebcf05ba5bb226daec1ce1 100644 (file)
@@ -5,6 +5,7 @@
 package worker
 
 import (
+       "errors"
        "io"
        "sort"
        "strings"
@@ -19,24 +20,25 @@ import (
 
 const (
        tagKeyInstanceType = "InstanceType"
-       tagKeyHold         = "Hold"
+       tagKeyIdleBehavior = "IdleBehavior"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
-       Instance             string
-       Price                float64
-       ArvadosInstanceType  string
-       ProviderInstanceType string
-       LastContainerUUID    string
-       LastBusy             time.Time
-       WorkerState          string
+       Instance             cloud.InstanceID `json:"instance"`
+       Price                float64          `json:"price"`
+       ArvadosInstanceType  string           `json:"arvados_instance_type"`
+       ProviderInstanceType string           `json:"provider_instance_type"`
+       LastContainerUUID    string           `json:"last_container_uuid"`
+       LastBusy             time.Time        `json:"last_busy"`
+       WorkerState          string           `json:"worker_state"`
+       IdleBehavior         IdleBehavior     `json:"idle_behavior"`
 }
 
 // An Executor executes shell commands on a remote host.
 type Executor interface {
        // Run cmd on the current target.
-       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+       Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 
        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
@@ -61,6 +63,13 @@ const (
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
+
+       // Time after a quota error to try again anyway, even if no
+       // instances have been shutdown.
+       quotaErrorTTL = time.Minute
+
+       // Time between "X failed because rate limiting" messages
+       logRateLimitErrorInterval = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -75,10 +84,11 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
-               instanceSet:        instanceSet,
+               arvClient:          arvClient,
+               instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
@@ -107,7 +117,8 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
 type Pool struct {
        // configuration
        logger             logrus.FieldLogger
-       instanceSet        cloud.InstanceSet
+       arvClient          *arvados.Client
+       instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
@@ -132,7 +143,11 @@ type Pool struct {
        mtx          sync.RWMutex
        setupOnce    sync.Once
 
+       throttleCreate    throttle
+       throttleInstances throttle
+
        mInstances         prometheus.Gauge
+       mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
        mVCPUs             prometheus.Gauge
        mVCPUsInuse        prometheus.Gauge
@@ -140,15 +155,21 @@ type Pool struct {
        mMemoryInuse       prometheus.Gauge
 }
 
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
 //
 // Example:
 //
 //     ch := wp.Subscribe()
 //     defer wp.Unsubscribe(ch)
 //     for range ch {
-//             // ...try scheduling some work...
+//             tryScheduling(wp)
 //             if done {
 //                     break
 //             }
@@ -171,7 +192,8 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 }
 
 // Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type.  Workers in
+// hold/drain mode are not included.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
@@ -182,7 +204,14 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
-               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+               // Skip workers that are not expected to become
+               // available soon. Note len(wkr.running)>0 is not
+               // redundant here: it can be true even in
+               // StateUnknown.
+               if wkr.state == StateShutdown ||
+                       wkr.state == StateRunning ||
+                       wkr.idleBehavior != IdleBehaviorRun ||
+                       len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
@@ -212,15 +241,23 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
 // Create a new instance with the given type, and add it to the worker
 // pool. The worker is added immediately; instance creation runs in
 // the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
-       if time.Now().Before(wp.atQuotaUntil) {
-               return wp.atQuotaErr
+       if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
+               return false
+       }
+       tags := cloud.InstanceTags{
+               tagKeyInstanceType: it.Name,
+               tagKeyIdleBehavior: string(IdleBehaviorRun),
        }
-       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
@@ -236,17 +273,19 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                                break
                        }
                }
-               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
-                       wp.atQuotaErr = err
-                       wp.atQuotaUntil = time.Now().Add(time.Minute)
-               }
                if err != nil {
+                       if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                               wp.atQuotaErr = err
+                               wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+                               time.AfterFunc(quotaErrorTTL, wp.notify)
+                       }
                        logger.WithError(err).Error("create failed")
+                       wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it, StateBooting)
        }()
-       return nil
+       return true
 }
 
 // AtQuota returns true if Create is not expected to work at the
@@ -257,6 +296,21 @@ func (wp *Pool) AtQuota() bool {
        return time.Now().Before(wp.atQuotaUntil)
 }
 
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("requested instance does not exist")
+       }
+       wkr.idleBehavior = idleBehavior
+       wkr.saveTags()
+       wkr.shutdownIfIdle()
+       return nil
+}
+
 // Add or update worker attached to the given instance. Use
 // initialState if a new worker is created.
 //
@@ -272,32 +326,46 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                if initialState == StateBooting && wkr.state == StateUnknown {
                        wkr.state = StateBooting
                }
+               wkr.saveTags()
                return wkr, false
        }
-       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
-               initialState = StateHold
+
+       // If an instance has a valid IdleBehavior tag when it first
+       // appears, initialize the new worker accordingly (this is how
+       // we restore IdleBehavior that was set by a prior dispatch
+       // process); otherwise, default to "run". After this,
+       // wkr.idleBehavior is the source of truth, and will only be
+       // changed via SetIdleBehavior().
+       idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+       if !validIdleBehavior[idleBehavior] {
+               idleBehavior = IdleBehaviorRun
        }
+
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst,
        })
-       logger.WithField("State", initialState).Infof("instance appeared in cloud")
+       logger.WithFields(logrus.Fields{
+               "State":        initialState,
+               "IdleBehavior": idleBehavior,
+       }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
-               mtx:      &wp.mtx,
-               wp:       wp,
-               logger:   logger,
-               executor: wp.newExecutor(inst),
-               state:    initialState,
-               instance: inst,
-               instType: it,
-               appeared: now,
-               probed:   now,
-               busy:     now,
-               updated:  now,
-               running:  make(map[string]struct{}),
-               starting: make(map[string]struct{}),
-               probing:  make(chan struct{}, 1),
+               mtx:          &wp.mtx,
+               wp:           wp,
+               logger:       logger,
+               executor:     wp.newExecutor(inst),
+               state:        initialState,
+               idleBehavior: idleBehavior,
+               instance:     inst,
+               instType:     it,
+               appeared:     now,
+               probed:       now,
+               busy:         now,
+               updated:      now,
+               running:      make(map[string]struct{}),
+               starting:     make(map[string]struct{}),
+               probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
@@ -320,7 +388,7 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
-                       if wkr.state == tryState && wkr.instType == it {
+                       if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
@@ -343,6 +411,12 @@ func (wp *Pool) CountWorkers() map[State]int {
 }
 
 // Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
 func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
@@ -411,7 +485,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+       stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
@@ -444,6 +518,13 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
        })
        reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price_total",
+               Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
+       })
+       reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -494,8 +575,10 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
+       var price float64
        var alloc, cpu, cpuInuse, mem, memInuse int64
        for _, wkr := range wp.workers {
+               price += wkr.instType.Price
                cpu += int64(wkr.instType.VCPUs)
                mem += int64(wkr.instType.RAM)
                if len(wkr.running)+len(wkr.starting) == 0 {
@@ -506,6 +589,7 @@ func (wp *Pool) updateMetrics() {
                memInuse += int64(wkr.instType.RAM)
        }
        wp.mInstances.Set(float64(len(wp.workers)))
+       wp.mInstancesPrice.Set(price)
        wp.mContainersRunning.Set(float64(alloc))
        wp.mVCPUs.Set(float64(cpu))
        wp.mMemory.Set(float64(mem))
@@ -588,18 +672,19 @@ func (wp *Pool) Instances() []InstanceView {
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
-                       Instance:             w.instance.String(),
+                       Instance:             w.instance.ID(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
+                       IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
-               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+               return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
 }
@@ -624,10 +709,14 @@ func (wp *Pool) notify() {
 
 func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
+       if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+               return err
+       }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
        if err != nil {
+               wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
index 7551caff9547178338e5ba7c900dadfde2c88184..3b66eeb417e7fb310768d950b50b7ad6c7066f0c 100644 (file)
@@ -5,13 +5,14 @@
 package worker
 
 import (
-       "io"
+       "sort"
+       "strings"
        "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -31,29 +32,114 @@ var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtai
 
 type PoolSuite struct{}
 
-func (suite *PoolSuite) SetUpSuite(c *check.C) {
-       logrus.StandardLogger().SetLevel(logrus.DebugLevel)
-}
+func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
+       type1 := test.InstanceType(1)
+       type2 := test.InstanceType(2)
+       type3 := test.InstanceType(3)
+       waitForIdle := func(pool *Pool, notify <-chan struct{}) {
+               timeout := time.NewTimer(time.Second)
+               for {
+                       instances := pool.Instances()
+                       sort.Slice(instances, func(i, j int) bool {
+                               return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
+                       })
+                       if len(instances) == 3 &&
+                               instances[0].ArvadosInstanceType == type1.Name &&
+                               instances[0].WorkerState == StateIdle.String() &&
+                               instances[1].ArvadosInstanceType == type1.Name &&
+                               instances[1].WorkerState == StateIdle.String() &&
+                               instances[2].ArvadosInstanceType == type2.Name &&
+                               instances[2].WorkerState == StateIdle.String() {
+                               return
+                       }
+                       select {
+                       case <-timeout.C:
+                               c.Logf("pool.Instances() == %#v", instances)
+                               c.Error("timed out")
+                               return
+                       case <-notify:
+                       }
+               }
+       }
 
-func (suite *PoolSuite) TestStartContainer(c *check.C) {
-       // TODO: use an instanceSet stub with an SSH server
-}
+       logger := test.Logger()
+       driver := &test.StubDriver{}
+       is, err := driver.InstanceSet(nil, "", logger)
+       c.Assert(err, check.IsNil)
+
+       newExecutor := func(cloud.Instance) Executor {
+               return stubExecutor{
+                       "crunch-run --list": stubResp{},
+                       "true":              stubResp{},
+               }
+       }
+
+       cluster := &arvados.Cluster{
+               Dispatch: arvados.Dispatch{
+                       MaxProbesPerSecond: 1000,
+                       ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+               },
+               CloudVMs: arvados.CloudVMs{
+                       BootProbeCommand: "true",
+                       SyncInterval:     arvados.Duration(time.Millisecond * 10),
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+                       type2.Name: type2,
+                       type3.Name: type3,
+               },
+       }
 
-func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
-       // TODO: use an instanceSet stub with an SSH server
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+       pool.Create(type1)
+       pool.Create(type1)
+       pool.Create(type2)
+       waitForIdle(pool, notify)
+       var heldInstanceID cloud.InstanceID
+       for _, inst := range pool.Instances() {
+               if inst.ArvadosInstanceType == type2.Name {
+                       heldInstanceID = cloud.InstanceID(inst.Instance)
+                       pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
+               }
+       }
+       pool.Stop()
+
+       c.Log("------- starting new pool, waiting to recover state")
+
+       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       notify2 := pool2.Subscribe()
+       defer pool2.Unsubscribe(notify2)
+       waitForIdle(pool2, notify2)
+       for _, inst := range pool2.Instances() {
+               if inst.ArvadosInstanceType == type2.Name {
+                       c.Check(inst.Instance, check.Equals, heldInstanceID)
+                       c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
+               } else {
+                       c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
+               }
+       }
+       pool2.Stop()
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
+       logger := test.Logger()
+       driver := test.StubDriver{HoldCloudOps: true}
+       instanceSet, err := driver.InstanceSet(nil, "", logger)
+       c.Assert(err, check.IsNil)
+
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+       type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
-               logger:      logrus.StandardLogger(),
-               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
-               instanceSet: lameInstanceSet,
+               logger:      logger,
+               newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+               instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
                        type2.Name: type2,
+                       type3.Name: type3,
                },
        }
        notify := pool.Subscribe()
@@ -63,23 +149,42 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
 
        c.Check(pool.Unallocated()[type1], check.Equals, 0)
        c.Check(pool.Unallocated()[type2], check.Equals, 0)
+       c.Check(pool.Unallocated()[type3], check.Equals, 0)
        pool.Create(type2)
        pool.Create(type1)
        pool.Create(type2)
+       pool.Create(type3)
        c.Check(pool.Unallocated()[type1], check.Equals, 1)
        c.Check(pool.Unallocated()[type2], check.Equals, 2)
+       c.Check(pool.Unallocated()[type3], check.Equals, 1)
 
        // Unblock the pending Create calls.
-       go lameInstanceSet.Release(3)
+       go driver.ReleaseCloudOps(4)
 
        // Wait for each instance to either return from its Create
        // call, or show up in a poll.
        suite.wait(c, pool, notify, func() bool {
                pool.mtx.RLock()
                defer pool.mtx.RUnlock()
-               return len(pool.workers) == 3
+               return len(pool.workers) == 4
        })
 
+       // Place type3 node on admin-hold
+       ivs := suite.instancesByType(pool, type3)
+       c.Assert(ivs, check.HasLen, 1)
+       type3instanceID := ivs[0].Instance
+       err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+       c.Check(err, check.IsNil)
+
+       // Check admin-hold behavior: refuse to shutdown, and don't
+       // report as Unallocated ("available now or soon").
+       c.Check(pool.Shutdown(type3), check.Equals, false)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type3] == 0
+       })
+       c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
+
+       // Shutdown both type2 nodes
        c.Check(pool.Shutdown(type2), check.Equals, true)
        suite.wait(c, pool, notify, func() bool {
                return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
@@ -99,16 +204,58 @@ func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
                }
                break
        }
+
+       // Shutdown type1 node
        c.Check(pool.Shutdown(type1), check.Equals, true)
        suite.wait(c, pool, notify, func() bool {
-               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
        })
        select {
        case <-notify2:
        case <-time.After(time.Second):
                c.Error("notify did not receive")
        }
-       go lameInstanceSet.Release(3) // unblock Destroy calls
+
+       // Put type3 node back in service.
+       err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
+       c.Check(err, check.IsNil)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type3] == 1
+       })
+
+       // Check admin-drain behavior: shut down right away, and don't
+       // report as Unallocated.
+       err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
+       c.Check(err, check.IsNil)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type3] == 0
+       })
+       suite.wait(c, pool, notify, func() bool {
+               ivs := suite.instancesByType(pool, type3)
+               return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+       })
+
+       // Unblock all pending Destroy calls. Pool calls Destroy again
+       // if a node still appears in the provider list after a
+       // previous attempt, so there might be more than 4 Destroy
+       // calls to unblock.
+       go driver.ReleaseCloudOps(4444)
+
+       // Sync until all instances disappear from the provider list.
+       suite.wait(c, pool, notify, func() bool {
+               pool.getInstancesAndSync()
+               return len(pool.Instances()) == 0
+       })
+}
+
+// instancesByType returns, in pool.Instances() order, the views of
+// the instances whose ArvadosInstanceType equals it.Name (nil if
+// there are none).
+func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
+       var matches []InstanceView
+       for _, view := range pool.Instances() {
+               if view.ArvadosInstanceType != it.Name {
+                       continue
+               }
+               matches = append(matches, view)
+       }
+       return matches
 }
 
 func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
@@ -123,13 +270,3 @@ func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, rea
        }
        c.Check(ready(), check.Equals, true)
 }
-
-type stubExecutor struct{}
-
-func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
-       return nil, nil, nil
-}
-
-func (*stubExecutor) Close() {}
diff --git a/lib/dispatchcloud/worker/throttle.go b/lib/dispatchcloud/worker/throttle.go
new file mode 100644 (file)
index 0000000..c5ea793
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "fmt"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "github.com/sirupsen/logrus"
+)
+
+// throttle holds an error that Error() keeps returning until a
+// deadline passes (set via ErrorUntil or CheckRateLimitError). The
+// zero value is ready to use and reports no error. err and until are
+// only accessed while holding mtx.
+type throttle struct {
+       err   error
+       until time.Time
+       mtx   sync.Mutex
+}
+
+// CheckRateLimitError checks whether the given error is a
+// cloud.RateLimitError, and if so, ensures Error() returns a non-nil
+// error until the rate limiting holdoff period expires.
+//
+// Nothing changes if err is not a cloud.RateLimitError, or if its
+// EarliestRetry time is not in the future. If a notify func is given,
+// it will be called after the holdoff period expires.
+func (thr *throttle) CheckRateLimitError(err error, logger logrus.FieldLogger, callType string, notify func()) {
+       rle, ok := err.(cloud.RateLimitError)
+       if !ok {
+               return
+       }
+       until := rle.EarliestRetry()
+       // Sample the clock once, so the "still in the future" check
+       // and the duration we log/report agree. (Calling time.Now()
+       // twice could yield a zero or negative duration after the
+       // check had already passed.)
+       dur := time.Until(until)
+       if dur <= 0 {
+               return
+       }
+       logger.WithFields(logrus.Fields{
+               "CallType": callType,
+               "Duration": dur,
+               "ResumeAt": until,
+       }).Info("suspending remote calls due to rate-limit error")
+       thr.ErrorUntil(fmt.Errorf("remote calls are suspended for %s, until %s", dur, until), until, notify)
+}
+
+// ErrorUntil makes Error() return err until the given time has
+// passed, replacing any previously set error. If notify is non-nil,
+// it is scheduled with time.AfterFunc to run when that time arrives.
+func (thr *throttle) ErrorUntil(err error, until time.Time, notify func()) {
+       thr.mtx.Lock()
+       defer thr.mtx.Unlock()
+       thr.err = err
+       thr.until = until
+       if notify == nil {
+               return
+       }
+       time.AfterFunc(time.Until(until), notify)
+}
+
+// Error returns the error most recently set by ErrorUntil, or nil if
+// no error is set or its deadline has passed. An expired error is
+// discarded, so subsequent calls return nil until ErrorUntil is
+// called again.
+func (thr *throttle) Error() error {
+       thr.mtx.Lock()
+       defer thr.mtx.Unlock()
+       if thr.err == nil || !time.Now().After(thr.until) {
+               return thr.err
+       }
+       thr.err = nil
+       return nil
+}
+
+// throttledInstanceSet wraps a cloud.InstanceSet with two independent
+// throttles: throttleCreate for instance-creation calls and
+// throttleInstances for instance-listing calls. A rate-limit error
+// from one kind of call is therefore recorded separately from the
+// other.
+type throttledInstanceSet struct {
+       cloud.InstanceSet
+       throttleCreate    throttle
+       throttleInstances throttle
+}
diff --git a/lib/dispatchcloud/worker/throttle_test.go b/lib/dispatchcloud/worker/throttle_test.go
new file mode 100644 (file)
index 0000000..045b617
--- /dev/null
@@ -0,0 +1,32 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "errors"
+       "time"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ThrottleSuite{})
+
+// ThrottleSuite tests the throttle type.
+type ThrottleSuite struct{}
+
+// TestRateLimitError checks that ErrorUntil makes Error() return a
+// non-nil error until the given deadline, that an error can be cleared
+// explicitly, and that the notify func runs once the deadline passes.
+func (s *ThrottleSuite) TestRateLimitError(c *check.C) {
+       var t throttle
+       c.Check(t.Error(), check.IsNil)
+       t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Second), nil)
+       c.Check(t.Error(), check.NotNil)
+       t.ErrorUntil(nil, time.Now(), nil)
+       c.Check(t.Error(), check.IsNil)
+
+       // notify runs in a timer goroutine, so signal through a
+       // channel (a shared bool would be a data race) and wait for it
+       // instead of sleeping for a fixed interval. The holdoff is
+       // long enough that the immediate NotNil check isn't flaky.
+       notified := make(chan struct{})
+       t.ErrorUntil(errors.New("wait"), time.Now().Add(20*time.Millisecond), func() { close(notified) })
+       c.Check(t.Error(), check.NotNil)
+       select {
+       case <-notified:
+       case <-time.After(time.Second):
+               c.Fatal("notify func was not called")
+       }
+       c.Check(t.Error(), check.IsNil)
+}
index db6bc185b1af2895117046c6bbd9254f25b9ea0a..a24747267615b9b0d0d0c8851271e92bdb85087c 100644 (file)
@@ -6,6 +6,7 @@ package worker
 
 import (
        "bytes"
+       "fmt"
        "strings"
        "sync"
        "time"
@@ -15,6 +16,11 @@ import (
        "github.com/sirupsen/logrus"
 )
 
+const (
+       // maxPingFailTime is a fixed time limit. Judging by its name it
+       // is presumably how long a worker may keep failing probes
+       // before being treated as broken. NOTE(review): its uses are
+       // not visible in this diff; confirm.
+       //
+       // TODO: configurable
+       maxPingFailTime = 10 * time.Minute
+)
+
 // State indicates whether a worker is available to do work, and (if
 // not) whether/when it is expected to become ready.
 type State int
@@ -25,12 +31,6 @@ const (
        StateIdle                  // instance booted, no containers are running
        StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
-       StateHold                  // running, but not available to run new containers
-)
-
-const (
-       // TODO: configurable
-       maxPingFailTime = 10 * time.Minute
 )
 
 var stateString = map[State]string{
@@ -39,7 +39,6 @@ var stateString = map[State]string{
        StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
-       StateHold:     "hold",
 }
 
 // String implements fmt.Stringer.
@@ -53,26 +52,42 @@ func (s State) MarshalText() ([]byte, error) {
        return []byte(stateString[s]), nil
 }
 
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+//
+// When a worker is first created for an instance, its IdleBehavior is
+// taken from the instance's tagKeyIdleBehavior tag if that holds a
+// recognized value, and defaults to IdleBehaviorRun otherwise; after
+// that it is changed only via Pool.SetIdleBehavior.
+type IdleBehavior string
+
+// Recognized IdleBehavior values.
+const (
+       IdleBehaviorRun   IdleBehavior = "run"   // run containers, or shutdown on idle timeout
+       IdleBehaviorHold  IdleBehavior = "hold"  // don't shutdown or run more containers
+       IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
+)
+
+// validIdleBehavior is the set of recognized IdleBehavior values. It is
+// used (e.g. in updateWorker) to ignore unrecognized values found in
+// instance tags.
+var validIdleBehavior = map[IdleBehavior]bool{
+       IdleBehaviorRun:   true,
+       IdleBehaviorHold:  true,
+       IdleBehaviorDrain: true,
+}
+
+// worker tracks one cloud instance managed by a Pool. Its mutable
+// fields are expected to be accessed while holding mtx, which is the
+// Pool's lock (e.g. startContainer requires the caller to hold it).
 type worker struct {
         logger   logrus.FieldLogger
         executor Executor
         wp       *Pool
 
-       mtx       sync.Locker // must be wp's Locker.
-       state     State
-       instance  cloud.Instance
-       instType  arvados.InstanceType
-       vcpus     int64
-       memory    int64
-       appeared  time.Time
-       probed    time.Time
-       updated   time.Time
-       busy      time.Time
-       destroyed time.Time
-       lastUUID  string
-       running   map[string]struct{} // remember to update state idle<->running when this changes
-       starting  map[string]struct{} // remember to update state idle<->running when this changes
-       probing   chan struct{}
+       mtx          sync.Locker // must be wp's Locker.
+       state        State
+       // idleBehavior is initialized from the instance's tags when the
+       // worker is created, and afterwards changed only via
+       // Pool.SetIdleBehavior.
+       idleBehavior IdleBehavior
+       instance     cloud.Instance
+       instType     arvados.InstanceType
+       vcpus        int64
+       memory       int64
+       appeared     time.Time
+       probed       time.Time
+       updated      time.Time
+       busy         time.Time
+       destroyed    time.Time
+       lastUUID     string
+       running      map[string]struct{} // remember to update state idle<->running when this changes
+       starting     map[string]struct{} // remember to update state idle<->running when this changes
+       probing      chan struct{}
 }
 
 // caller must have lock.
@@ -86,7 +101,11 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
        go func() {
-               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+               env := map[string]string{
+                       "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
+                       "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+               }
+               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
@@ -126,57 +145,94 @@ func (wkr *worker) ProbeAndUpdate() {
        }
 }
 
-// should be called in a new goroutine
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
 func (wkr *worker) probeAndUpdate() {
        wkr.mtx.Lock()
        updated := wkr.updated
-       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
-       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+       initialState := wkr.state
        wkr.mtx.Unlock()
-       if !needProbeBooted && !needProbeRunning {
-               return
-       }
 
        var (
+               booted   bool
                ctrUUIDs []string
                ok       bool
-               stderr   []byte
+               stderr   []byte // from probeBooted
        )
-       if needProbeBooted {
-               ok, stderr = wkr.probeBooted()
-               wkr.mtx.Lock()
-               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
-                       wkr.logger.Info("instance booted; will try probeRunning")
-                       needProbeRunning = true
+
+       switch initialState {
+       case StateShutdown:
+               return
+       case StateIdle, StateRunning:
+               booted = true
+       case StateUnknown, StateBooting:
+       default:
+               panic(fmt.Sprintf("unknown state %s", initialState))
+       }
+
+       probeStart := time.Now()
+       logger := wkr.logger.WithField("ProbeStart", probeStart)
+
+       if !booted {
+               booted, stderr = wkr.probeBooted()
+               if !booted {
+                       // Pretend this probe succeeded if another
+                       // concurrent attempt succeeded.
+                       wkr.mtx.Lock()
+                       booted = wkr.state == StateRunning || wkr.state == StateIdle
+                       wkr.mtx.Unlock()
+               }
+               if booted {
+                       logger.Info("instance booted; will try probeRunning")
                }
-               wkr.mtx.Unlock()
        }
-       if needProbeRunning {
-               ctrUUIDs, ok, stderr = wkr.probeRunning()
+       if booted || wkr.state == StateUnknown {
+               ctrUUIDs, ok = wkr.probeRunning()
        }
-       logger := wkr.logger.WithField("stderr", string(stderr))
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
-       if !ok {
+       if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
                }
-               dur := time.Since(wkr.probed)
-               logger := logger.WithFields(logrus.Fields{
-                       "Duration": dur,
-                       "State":    wkr.state,
-               })
-               if wkr.state == StateBooting && !needProbeRunning {
-                       // If we know the instance has never passed a
-                       // boot probe, it's not noteworthy that it
-                       // hasn't passed this probe.
-                       logger.Debug("new instance not responding")
-               } else {
-                       logger.Info("instance not responding")
+               // Using the start time of the probe as the timeout
+               // threshold ensures we always initiate at least one
+               // probe attempt after the boot/probe timeout expires
+               // (otherwise, a slow probe failure could cause us to
+               // shutdown an instance even though it did in fact
+               // boot/recover before the timeout expired).
+               dur := probeStart.Sub(wkr.probed)
+               if wkr.shutdownIfBroken(dur) {
+                       // stderr from failed run-probes will have
+                       // been logged already, but boot-probe
+                       // failures are normal so they are logged only
+                       // at Debug level. This is our chance to log
+                       // some evidence about why the node never
+                       // booted, even in non-debug mode.
+                       if !booted {
+                               logger.WithFields(logrus.Fields{
+                                       "Duration": dur,
+                                       "stderr":   string(stderr),
+                               }).Info("boot failed")
+                       }
                }
-               wkr.shutdownIfBroken(dur)
                return
        }
 
@@ -201,11 +257,21 @@ func (wkr *worker) probeAndUpdate() {
                // advantage of the non-busy state, though.
                wkr.busy = updateTime
        }
-       running := map[string]struct{}{}
        changed := false
+
+       // Build a new "running" map. Set changed=true if it differs
+       // from the existing map (wkr.running) to ensure the scheduler
+       // gets notified below.
+       running := map[string]struct{}{}
        for _, uuid := range ctrUUIDs {
                running[uuid] = struct{}{}
                if _, ok := wkr.running[uuid]; !ok {
+                       if _, ok := wkr.starting[uuid]; !ok {
+                               // We didn't start it -- it must have
+                               // been started by a previous
+                               // dispatcher process.
+                               logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+                       }
                        changed = true
                }
        }
@@ -216,38 +282,63 @@ func (wkr *worker) probeAndUpdate() {
                        changed = true
                }
        }
-       if wkr.state == StateUnknown || wkr.state == StateBooting {
+
+       // Update state if this was the first successful boot-probe.
+       if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+               // Note: this will change again below if
+               // len(wkr.starting)+len(wkr.running) > 0.
                wkr.state = StateIdle
                changed = true
        }
-       if changed {
-               wkr.running = running
-               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
-                       wkr.state = StateRunning
-               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
-                       wkr.state = StateIdle
-               }
-               wkr.updated = updateTime
-               go wkr.wp.notify()
+
+       // If wkr.state and wkr.running aren't changing then there's
+       // no need to log anything, notify the scheduler, move state
+       // back and forth between idle/running, etc.
+       if !changed {
+               return
+       }
+
+       // Log whenever a run-probe reveals crunch-run processes
+       // appearing/disappearing before boot-probe succeeds.
+       if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(running),
+                       "State":             wkr.state,
+               }).Info("crunch-run probe succeeded, but boot probe is still failing")
+       }
+
+       wkr.running = running
+       if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+               wkr.state = StateRunning
+       } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+               wkr.state = StateIdle
        }
+       wkr.updated = updateTime
+       if booted && (initialState == StateUnknown || initialState == StateBooting) {
+               logger.WithFields(logrus.Fields{
+                       "RunningContainers": len(running),
+                       "State":             wkr.state,
+               }).Info("probes succeeded, instance is in service")
+       }
+       go wkr.wp.notify()
 }
 
+// probeRunning runs "crunch-run --list" on the instance via the
+// worker's executor and returns the command's stdout split into
+// lines. If the command fails, the failure is logged (with stdout
+// and stderr) at Warn level and ok is false. Empty output yields a
+// nil slice with ok true.
-func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+func (wkr *worker) probeRunning() (running []string, ok bool) {
        cmd := "crunch-run --list"
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
                        "Command": cmd,
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false, stderr
+               return nil, false
        }
+       // Drop trailing newlines so Split doesn't produce an empty
+       // final element.
        stdout = bytes.TrimRight(stdout, "\n")
        if len(stdout) == 0 {
-               return nil, true, stderr
+               return nil, true
        }
-       return strings.Split(string(stdout), "\n"), true, stderr
+       return strings.Split(string(stdout), "\n"), true
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -255,7 +346,7 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
        if cmd == "" {
                cmd = "true"
        }
-       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        logger := wkr.logger.WithFields(logrus.Fields{
                "Command": cmd,
                "stdout":  string(stdout),
@@ -270,16 +361,17 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 }
 
 // caller must have lock.
-func (wkr *worker) shutdownIfBroken(dur time.Duration) {
-       if wkr.state == StateHold {
-               return
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
+       if wkr.idleBehavior == IdleBehaviorHold {
+               // Never shut down.
+               return false
        }
        label, threshold := "", wkr.wp.timeoutProbe
-       if wkr.state == StateBooting {
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
                label, threshold = "new ", wkr.wp.timeoutBooting
        }
        if dur < threshold {
-               return
+               return false
        }
        wkr.logger.WithFields(logrus.Fields{
                "Duration": dur,
@@ -287,23 +379,35 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) {
                "State":    wkr.state,
        }).Warnf("%sinstance unresponsive, shutting down", label)
        wkr.shutdown()
+       return true
 }
 
+// shutdownIfIdle shuts down the worker and returns true if it is
+// eligible for shutdown, otherwise returns false. A worker whose
+// IdleBehavior is hold is never shut down. An idle worker is shut
+// down once timeoutIdle has elapsed since it was last busy. A
+// draining worker is shut down as soon as it is idle or still
+// booting, regardless of age.
+//
 // caller must have lock.
 func (wkr *worker) shutdownIfIdle() bool {
-       if wkr.state != StateIdle {
+       if wkr.idleBehavior == IdleBehaviorHold {
+               // Never shut down.
                return false
        }
        age := time.Since(wkr.busy)
-       if age < wkr.wp.timeoutIdle {
+
+       // StateRunning (and any other state) is never eligible here.
+       old := age >= wkr.wp.timeoutIdle
+       draining := wkr.idleBehavior == IdleBehaviorDrain
+       shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
+               (draining && wkr.state == StateBooting)
+       if !shouldShutdown {
                return false
        }
-       wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+
+       wkr.logger.WithFields(logrus.Fields{
+               "State":        wkr.state,
+               "Age":          age,
+               "IdleBehavior": wkr.idleBehavior,
+       }).Info("shutdown idle worker")
        wkr.shutdown()
        return true
 }
 
-// caller must have lock
+// caller must have lock.
 func (wkr *worker) shutdown() {
        now := time.Now()
        wkr.updated = now
@@ -318,3 +422,27 @@ func (wkr *worker) shutdown() {
                }
        }()
 }
+
+// saveTags saves the worker's instance type and idle behavior to the
+// cloud provider's instance tags, if they don't already match. Tags
+// not managed here are preserved. The cloud API call runs in a
+// separate goroutine; errors are logged, not returned. Caller must
+// have lock.
+func (wkr *worker) saveTags() {
+       instance := wkr.instance
+       have := instance.Tags()
+       want := cloud.InstanceTags{
+               tagKeyInstanceType: wkr.instType.Name,
+               tagKeyIdleBehavior: string(wkr.idleBehavior),
+       }
+       // SetTags is passed a complete tag set (assumed to replace
+       // the instance's existing tags -- confirm against the cloud
+       // driver), so start from a copy of the current tags rather
+       // than sending only our keys, which would drop the others.
+       // Copy instead of mutating the map returned by Tags(), which
+       // the instance may share.
+       update := cloud.InstanceTags{}
+       for k, v := range have {
+               update[k] = v
+       }
+       changed := false
+       for k, v := range want {
+               if have[k] != v {
+                       update[k] = v
+                       changed = true
+               }
+       }
+       if !changed {
+               return
+       }
+       go func() {
+               if err := instance.SetTags(update); err != nil {
+                       wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+               }
+       }()
+}
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
new file mode 100644 (file)
index 0000000..2eb5255
--- /dev/null
@@ -0,0 +1,239 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "errors"
+       "io"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+// WorkerSuite groups the gocheck tests for this package's worker
+// logic.
+type WorkerSuite struct{}
+
+// Register the suite so gocheck runs its Test* methods.
+var _ = check.Suite(&WorkerSuite{})
+
+// TestProbeAndUpdate drives worker.probeAndUpdate through a table of
+// starting states, instance ages, and stubbed boot/run probe results,
+// and checks the resulting state and number of running containers.
+func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
+       logger := test.Logger()
+       bootTimeout := time.Minute
+       probeTimeout := time.Second
+
+       // Container UUID reported by the stub "crunch-run --list" and
+       // used to pre-populate wkr.running / wkr.starting.
+       const ctrUUID = "zzzzz-dz642-abcdefghijklmno"
+
+       is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
+       c.Assert(err, check.IsNil)
+       inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+       c.Assert(err, check.IsNil)
+
+       type trialT struct {
+               testCaseComment string // attached to each check to help identify failure case
+               age             time.Duration
+               state           State
+               running         int // >0 pre-populates wkr.running with ctrUUID
+               starting        int // >0 pre-populates wkr.starting with ctrUUID
+               respBoot        stubResp // zero value is success
+               respRun         stubResp // zero value is success + nothing running
+               expectState     State
+               expectRunning   int
+       }
+
+       errFail := errors.New("failed")
+       respFail := stubResp{"", "command failed\n", errFail}
+       respContainerRunning := stubResp{ctrUUID + "\n", "", nil}
+       for _, trial := range []trialT{
+               {
+                       testCaseComment: "Unknown, probes fail",
+                       state:           StateUnknown,
+                       respBoot:        respFail,
+                       respRun:         respFail,
+                       expectState:     StateUnknown,
+               },
+               {
+                       testCaseComment: "Unknown, boot probe fails, but one container is running",
+                       state:           StateUnknown,
+                       respBoot:        respFail,
+                       respRun:         respContainerRunning,
+                       expectState:     StateUnknown,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Unknown, boot probe fails, previously running container has exited",
+                       state:           StateUnknown,
+                       running:         1,
+                       respBoot:        respFail,
+                       expectState:     StateUnknown,
+                       expectRunning:   0,
+               },
+               {
+                       testCaseComment: "Unknown, boot timeout exceeded, boot probe fails",
+                       state:           StateUnknown,
+                       age:             bootTimeout + time.Second,
+                       respBoot:        respFail,
+                       respRun:         respFail,
+                       expectState:     StateShutdown,
+               },
+               {
+                       testCaseComment: "Unknown, boot timeout exceeded, boot probe succeeds but crunch-run fails",
+                       state:           StateUnknown,
+                       age:             bootTimeout * 2,
+                       respRun:         respFail,
+                       expectState:     StateShutdown,
+               },
+               {
+                       testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but crunch-run succeeds",
+                       state:           StateUnknown,
+                       age:             bootTimeout * 2,
+                       respBoot:        respFail,
+                       expectState:     StateShutdown,
+               },
+               {
+                       testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but container is running",
+                       state:           StateUnknown,
+                       age:             bootTimeout * 2,
+                       respBoot:        respFail,
+                       respRun:         respContainerRunning,
+                       expectState:     StateUnknown,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Booting, boot probe fails, run probe fails",
+                       state:           StateBooting,
+                       respBoot:        respFail,
+                       respRun:         respFail,
+                       expectState:     StateBooting,
+               },
+               {
+                       testCaseComment: "Booting, boot probe fails, run probe succeeds (but isn't expected to be called)",
+                       state:           StateBooting,
+                       respBoot:        respFail,
+                       expectState:     StateBooting,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, run probe fails",
+                       state:           StateBooting,
+                       respRun:         respFail,
+                       expectState:     StateBooting,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, run probe succeeds",
+                       state:           StateBooting,
+                       expectState:     StateIdle,
+               },
+               {
+                       testCaseComment: "Booting, boot probe succeeds, run probe succeeds, container is running",
+                       state:           StateBooting,
+                       respRun:         respContainerRunning,
+                       expectState:     StateRunning,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Booting, boot timeout exceeded",
+                       state:           StateBooting,
+                       age:             bootTimeout * 2,
+                       respRun:         respFail,
+                       expectState:     StateShutdown,
+               },
+               {
+                       testCaseComment: "Idle, probe timeout exceeded, one container running",
+                       state:           StateIdle,
+                       age:             probeTimeout * 2,
+                       respRun:         respContainerRunning,
+                       expectState:     StateRunning,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Idle, probe timeout exceeded, one container running, probe fails",
+                       state:           StateIdle,
+                       age:             probeTimeout * 2,
+                       running:         1,
+                       respRun:         respFail,
+                       expectState:     StateShutdown,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Idle, probe timeout exceeded, nothing running, probe fails",
+                       state:           StateIdle,
+                       age:             probeTimeout * 2,
+                       respRun:         respFail,
+                       expectState:     StateShutdown,
+               },
+               {
+                       testCaseComment: "Running, one container still running",
+                       state:           StateRunning,
+                       running:         1,
+                       respRun:         respContainerRunning,
+                       expectState:     StateRunning,
+                       expectRunning:   1,
+               },
+               {
+                       testCaseComment: "Running, container has exited",
+                       state:           StateRunning,
+                       running:         1,
+                       expectState:     StateIdle,
+                       expectRunning:   0,
+               },
+               {
+                       testCaseComment: "Running, probe timeout exceeded, nothing running, new container being started",
+                       state:           StateRunning,
+                       age:             probeTimeout * 2,
+                       starting:        1,
+                       expectState:     StateRunning,
+               },
+       } {
+               c.Logf("------- %#v", trial)
+               // Attach the case description to every check so a
+               // failure report names the failing trial directly.
+               comment := check.Commentf("%s", trial.testCaseComment)
+               ctime := time.Now().Add(-trial.age)
+               exr := stubExecutor{
+                       "bootprobe":         trial.respBoot,
+                       "crunch-run --list": trial.respRun,
+               }
+               wp := &Pool{
+                       newExecutor:      func(cloud.Instance) Executor { return exr },
+                       bootProbeCommand: "bootprobe",
+                       timeoutBooting:   bootTimeout,
+                       timeoutProbe:     probeTimeout,
+                       exited:           map[string]time.Time{},
+               }
+               wkr := &worker{
+                       logger:   logger,
+                       executor: exr,
+                       wp:       wp,
+                       mtx:      &wp.mtx,
+                       state:    trial.state,
+                       instance: inst,
+                       appeared: ctime,
+                       busy:     ctime,
+                       probed:   ctime,
+                       updated:  ctime,
+               }
+               if trial.running > 0 {
+                       wkr.running = map[string]struct{}{ctrUUID: {}}
+               }
+               if trial.starting > 0 {
+                       wkr.starting = map[string]struct{}{ctrUUID: {}}
+               }
+               wkr.probeAndUpdate()
+               c.Check(wkr.state, check.Equals, trial.expectState, comment)
+               c.Check(len(wkr.running), check.Equals, trial.expectRunning, comment)
+       }
+}
+
+// stubResp is a canned result for one command run via stubExecutor.
+type stubResp struct {
+       stdout string
+       stderr string
+       err    error
+}
+
+// stubExecutor is a fake Executor. Instead of running commands, it
+// returns the stubResp registered for the exact command string, or a
+// "command not found" failure for anything unregistered.
+type stubExecutor map[string]stubResp
+
+// SetTarget and Close are no-ops; the stub has no connection to manage.
+func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se stubExecutor) Close()                         {}
+
+// Execute ignores env and stdin and looks up cmd in the response map.
+func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+       if resp, found := se[cmd]; found {
+               return []byte(resp.stdout), []byte(resp.stderr), resp.err
+       }
+       return nil, []byte("command not found\n"), errors.New("command not found")
+}
index cca9f9bf1be8e946b7b9594f1ed839e92aa73485..787e01ab8f7dc8be892e7c754bca4a29cba84b13 100644 (file)
@@ -210,14 +210,19 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
        if err != nil {
                return err
        }
-       if (method == "GET" || body != nil) && urlValues != nil {
-               // FIXME: what if params don't fit in URL
+       if urlValues == nil {
+               // Nothing to send
+       } else if method == "GET" || method == "HEAD" || body != nil {
+               // Must send params in query part of URL (FIXME: what
+               // if resulting URL is too long?)
                u, err := url.Parse(urlString)
                if err != nil {
                        return err
                }
                u.RawQuery = urlValues.Encode()
                urlString = u.String()
+       } else {
+               body = strings.NewReader(urlValues.Encode())
        }
        req, err := http.NewRequest(method, urlString, body)
        if err != nil {
index bfa86abf6a48a1fcf30eda2618bdfaf28b0a2efb..1154f922ba45f0b76484e08ac182276c147641db 100644 (file)
@@ -121,7 +121,12 @@ type CloudVMs struct {
        // and ready to run containers, e.g., "mount | grep
        // /encrypted-tmp"
        BootProbeCommand string
-       SyncInterval     Duration
+
+       // Listening port (name or number) of SSH servers on worker
+       // VMs
+       SSHPort string
+
+       SyncInterval Duration
 
        // Maximum idle time before automatic shutdown
        TimeoutIdle Duration
index 02a0d76decbad272baee737282b5087a72a33c60..fb095481bb07b2aa97489a17347e56c65166b356 100644 (file)
@@ -8,21 +8,22 @@ import "time"
 
 // Container is an arvados#container resource.
+//
+// Each field maps to the API attribute named in its json struct tag.
+// RuntimeStatus is decoded as a free-form JSON object; this type does
+// not constrain its keys.
 type Container struct {
-       UUID                 string               `json:"uuid"`
-       CreatedAt            time.Time            `json:"created_at"`
-       Command              []string             `json:"command"`
-       ContainerImage       string               `json:"container_image"`
-       Cwd                  string               `json:"cwd"`
-       Environment          map[string]string    `json:"environment"`
-       LockedByUUID         string               `json:"locked_by_uuid"`
-       Mounts               map[string]Mount     `json:"mounts"`
-       Output               string               `json:"output"`
-       OutputPath           string               `json:"output_path"`
-       Priority             int64                `json:"priority"`
-       RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
-       State                ContainerState       `json:"state"`
-       SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
-       ExitCode             int                  `json:"exit_code"`
+       UUID                 string                 `json:"uuid"`
+       CreatedAt            time.Time              `json:"created_at"`
+       Command              []string               `json:"command"`
+       ContainerImage       string                 `json:"container_image"`
+       Cwd                  string                 `json:"cwd"`
+       Environment          map[string]string      `json:"environment"`
+       LockedByUUID         string                 `json:"locked_by_uuid"`
+       Mounts               map[string]Mount       `json:"mounts"`
+       Output               string                 `json:"output"`
+       OutputPath           string                 `json:"output_path"`
+       Priority             int64                  `json:"priority"`
+       RuntimeConstraints   RuntimeConstraints     `json:"runtime_constraints"`
+       State                ContainerState         `json:"state"`
+       SchedulingParameters SchedulingParameters   `json:"scheduling_parameters"`
+       ExitCode             int                    `json:"exit_code"`
+       RuntimeStatus        map[string]interface{} `json:"runtime_status"`
 }
 
 // Container is an arvados#container resource.
index e0f2483131a98a64856116bda8c14b4de7bd7051..4f648e9b437e7b5eead7abf4b0db302011725cb7 100644 (file)
@@ -41,6 +41,10 @@ const (
        QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
        QueuedContainerUUID        = "zzzzz-dz642-queuedcontainer"
 
+       RunningContainerUUID = "zzzzz-dz642-runningcontainr"
+
+       CompletedContainerUUID = "zzzzz-dz642-compltcontainer"
+
        ArvadosRepoUUID = "zzzzz-s0uqq-arvadosrepo0123"
        ArvadosRepoName = "arvados"
        FooRepoUUID     = "zzzzz-s0uqq-382brsig8rp3666"
index ec5fa21e2a61b6e5d462246e898d7257a22f9d66..1b24804b056e2cb2caef666ae695649d935613cd 100644 (file)
                        "revision": "2bb1b664bcff821e02b2a0644cd29c7e824d54f8",
                        "revisionTime": "2015-08-17T12:26:01Z"
                },
+               {
+                       "checksumSHA1": "X7g98YfLr+zM7aN76AZvAfpZyfk=",
+                       "path": "github.com/julienschmidt/httprouter",
+                       "revision": "adbc77eec0d91467376ca515bc3a14b8434d0f18",
+                       "revisionTime": "2018-04-11T15:45:01Z"
+               },
                {
                        "checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
                        "path": "github.com/kevinburke/ssh_config",