"Arvados server daemons"
package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
-# No package until #14325
-#package_go_binary cmd/arvados-server crunch-dispatch-cloud \
-# "Arvados cluster cloud dispatch"
+package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+ "Arvados cluster cloud dispatch"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
package_go_binary services/arv-git-httpd arvados-git-httpd \
type QueueEnt struct {
// The container to run. Only the UUID, State, Priority, and
// RuntimeConstraints fields are populated.
- Container arvados.Container
- InstanceType arvados.InstanceType
+ Container arvados.Container `json:"container"`
+ InstanceType arvados.InstanceType `json:"instance_type"`
}
// String implements fmt.Stringer by returning the queued container's
cq.mtx.Lock()
defer cq.mtx.Unlock()
for uuid, ctr := range next {
- if _, keep := cq.dontupdate[uuid]; keep {
+ if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+ // Don't clobber a local update that happened
+ // after we started polling.
continue
}
if cur, ok := cq.current[uuid]; !ok {
}
}
for uuid := range cq.current {
- if _, keep := cq.dontupdate[uuid]; keep {
+ if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+ // Don't expunge an entry that was
+ // added/updated locally after we started
+ // polling.
continue
- } else if _, keep = next[uuid]; keep {
- continue
- } else {
+ } else if _, stillpresent := next[uuid]; !stillpresent {
+ // Expunge an entry that no longer appears in
+ // the poll response (evidently it's
+ // cancelled, completed, deleted, or taken by
+ // a different dispatcher).
delete(cq.current, uuid)
}
}
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
- if err != nil {
- // FIXME: throttle warnings, cancel after timeout
- cq.logger.Warnf("cannot run %s", &ctr)
+ if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+ // We assume here that any chooseType error is a hard
+ // error: it wouldn't help to try again, or to leave
+ // it for a different dispatcher process to attempt.
+ errorString := err.Error()
+ cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+ go func() {
+ var err error
+ defer func() {
+ if err == nil {
+ return
+ }
+ // On failure, check current container
+ // state, and don't log the error if
+ // the failure came from losing a
+ // race.
+ var latest arvados.Container
+ cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+ if latest.State == arvados.ContainerStateCancelled {
+ return
+ }
+ cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+ }()
+ if ctr.State == arvados.ContainerStateQueued {
+ err = cq.Lock(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }
+ err = cq.setRuntimeError(ctr.UUID, errorString)
+ if err != nil {
+ return
+ }
+ err = cq.Cancel(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }()
return
}
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
return cq.apiUpdate(uuid, "unlock")
}
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
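+//
+// For example, setRuntimeError(uuid, "no suitable instance type")
+// updates the container record with
+// runtime_status = {"error": "no suitable instance type"}.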
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+ return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+ "container": {
+ "runtime_status": {
+ "error": errorString,
+ },
+ },
+ })
+}
+
// Cancel cancels the given container.
func (cq *Queue) Cancel(uuid string) error {
err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package container
+
+import (
+ "errors"
+ "os"
+ "sync"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&IntegrationSuite{})
+
+func logger() logrus.FieldLogger {
+ logger := logrus.StandardLogger()
+ if os.Getenv("ARVADOS_DEBUG") != "" {
+ logger.SetLevel(logrus.DebugLevel)
+ }
+ return logger
+}
+
+type IntegrationSuite struct{}
+
+func (suite *IntegrationSuite) TearDownTest(c *check.C) {
+ err := arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil)
+ c.Check(err, check.IsNil)
+}
+
+func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
+ typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return arvados.InstanceType{Name: "testType"}, nil
+ }
+
+ client := arvados.NewClientFromEnv()
+ cq := NewQueue(logger(), nil, typeChooser, client)
+
+ err := cq.Update()
+ c.Check(err, check.IsNil)
+
+ ents, threshold := cq.Entries()
+ c.Check(len(ents), check.Not(check.Equals), 0)
+ c.Check(time.Since(threshold) < time.Minute, check.Equals, true)
+ c.Check(time.Since(threshold) > 0, check.Equals, true)
+
+ _, ok := ents[arvadostest.QueuedContainerUUID]
+ c.Check(ok, check.Equals, true)
+
+ var wg sync.WaitGroup
+ for uuid, ent := range ents {
+ c.Check(ent.Container.UUID, check.Equals, uuid)
+ c.Check(ent.InstanceType.Name, check.Equals, "testType")
+ c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
+ c.Check(ent.Container.Priority > 0, check.Equals, true)
+
+ ctr, ok := cq.Get(uuid)
+ c.Check(ok, check.Equals, true)
+ c.Check(ctr.UUID, check.Equals, uuid)
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ err := cq.Unlock(uuid)
+ c.Check(err, check.NotNil)
+ err = cq.Lock(uuid)
+ c.Check(err, check.IsNil)
+ ctr, ok := cq.Get(uuid)
+ c.Check(ok, check.Equals, true)
+ c.Check(ctr.State, check.Equals, arvados.ContainerStateLocked)
+ err = cq.Lock(uuid)
+ c.Check(err, check.NotNil)
+ err = cq.Unlock(uuid)
+ c.Check(err, check.IsNil)
+ ctr, ok = cq.Get(uuid)
+ c.Check(ok, check.Equals, true)
+ c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
+ err = cq.Unlock(uuid)
+ c.Check(err, check.NotNil)
+ }()
+ }
+ wg.Wait()
+
+ err = cq.Cancel(arvadostest.CompletedContainerUUID)
+ c.Check(err, check.ErrorMatches, `.*State cannot change from Complete to Cancelled.*`)
+}
+
+func (suite *IntegrationSuite) TestCancelIfNoInstanceType(c *check.C) {
+ errorTypeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return arvados.InstanceType{}, errors.New("no suitable instance type")
+ }
+
+ client := arvados.NewClientFromEnv()
+ cq := NewQueue(logger(), nil, errorTypeChooser, client)
+
+ var ctr arvados.Container
+ err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
+ c.Check(err, check.IsNil)
+ c.Check(ctr.State, check.Equals, arvados.ContainerStateQueued)
+
+ cq.Update()
+
+ // Wait for the cancel operation to take effect. Container
+ // will have state=Cancelled or just disappear from the queue.
+ suite.waitfor(c, time.Second, func() bool {
+ err := client.RequestAndDecode(&ctr, "GET", "arvados/v1/containers/"+arvadostest.QueuedContainerUUID, nil, nil)
+ return err == nil && ctr.State == arvados.ContainerStateCancelled
+ })
+ c.Check(ctr.RuntimeStatus["error"], check.Equals, `no suitable instance type`)
+}
+
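+// waitfor calls fn repeatedly until it returns true or the timeout
+// expires, then checks that the last call returned true.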
+func (suite *IntegrationSuite) waitfor(c *check.C, timeout time.Duration, fn func() bool) {
+ defer func() {
+ c.Check(fn(), check.Equals, true)
+ }()
+ deadline := time.Now().Add(timeout)
+ for !fn() && time.Now().Before(deadline) {
+ time.Sleep(timeout / 1000)
+ }
+}
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
type pool interface {
scheduler.WorkerPool
Instances() []worker.InstanceView
+ SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
Stop()
}
// Make a worker.Executor for the given instance.
func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
exr := ssh_executor.New(inst)
+ exr.SetTargetPort(disp.Cluster.CloudVMs.SSHPort)
exr.SetSigners(disp.sshKey)
return exr
}
if err != nil {
disp.logger.Fatalf("error initializing driver: %s", err)
}
- disp.instanceSet = &instanceSetProxy{instanceSet}
+ disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
})
} else {
- mux := http.NewServeMux()
- mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
- mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+ mux := httprouter.New()
+ mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+ mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
- mux.Handle("/metrics", metricsH)
- mux.Handle("/metrics.json", metricsH)
+ mux.Handler("GET", "/metrics", metricsH)
+ mux.Handler("GET", "/metrics.json", metricsH)
disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
}
}
// Management API: all active and queued containers.
func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
- httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
var resp struct {
- Items []container.QueueEnt
+ Items []container.QueueEnt `json:"items"`
}
qEntries, _ := disp.queue.Entries()
for _, ent := range qEntries {
// Management API: all active instances (cloud VMs).
func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
- if r.Method != "GET" {
- httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
- return
- }
var resp struct {
- Items []worker.InstanceView
+ Items []worker.InstanceView `json:"items"`
}
resp.Items = disp.pool.Instances()
json.NewEncoder(w).Encode(resp)
}
+
+// Management API: set idle behavior to "hold" for specified instance.
+func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
+ disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
+}
+
+// Management API: set idle behavior to "drain" for specified instance.
+func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
+ disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
+}
+
+// Management API: set idle behavior to "run" for specified instance.
+func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
+ disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
+}
+
+func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
+ params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
+ id := cloud.InstanceID(params.ByName("instance_id"))
+ err := disp.pool.SetIdleBehavior(id, want)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
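+
+// These endpoints can be driven with plain HTTP requests; a
+// hypothetical example (host and instance ID invented here, with the
+// cluster's ManagementToken in $token):
+//
+//   curl -X POST -H "Authorization: Bearer $token" \
+//        "http://dispatch-host/arvados/v1/dispatch/instances/ix-abcde/hold"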
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
- cluster *arvados.Cluster
- instanceSet *test.LameInstanceSet
- stubDriver *test.StubDriver
- disp *dispatcher
-}
-
-func (s *DispatcherSuite) SetUpSuite(c *check.C) {
- if os.Getenv("ARVADOS_DEBUG") != "" {
- logrus.StandardLogger().SetLevel(logrus.DebugLevel)
- }
+ cluster *arvados.Cluster
+ stubDriver *test.StubDriver
+ disp *dispatcher
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
_, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
s.stubDriver = &test.StubDriver{
- HostKey: hostpriv,
- AuthorizedKeys: []ssh.PublicKey{dispatchpub},
- ErrorRateDestroy: 0.1,
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+ ErrorRateDestroy: 0.1,
+ MinTimeBetweenCreateCalls: time.Millisecond,
}
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
Driver: "test",
SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(30 * time.Millisecond),
- TimeoutBooting: arvados.Duration(30 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
TimeoutProbe: arvados.Duration(15 * time.Millisecond),
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
},
type instance struct {
Instance string
- WorkerState string
+ WorkerState string `json:"worker_state"`
Price float64
- LastContainerUUID string
- ArvadosInstanceType string
- ProviderInstanceType string
+ LastContainerUUID string `json:"last_container_uuid"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
}
type instancesResponse struct {
Items []instance
ch := s.disp.pool.Subscribe()
defer s.disp.pool.Unsubscribe(ch)
- err := s.disp.pool.Create(test.InstanceType(1))
- c.Check(err, check.IsNil)
+ ok := s.disp.pool.Create(test.InstanceType(1))
+ c.Check(ok, check.Equals, true)
<-ch
sr = getInstances()
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package dispatchcloud
-
-import (
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "golang.org/x/crypto/ssh"
-)
-
-type instanceSetProxy struct {
- cloud.InstanceSet
-}
-
-func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
- // TODO: return if Create failed recently with a RateLimitError or QuotaError
- return is.InstanceSet.Create(it, id, tags, pk)
-}
-
-func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
- // TODO: return if Instances failed recently with a RateLimitError
- return is.InstanceSet.Instances(tags)
-}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# cpan -I -T install Graph::Easy
+# (eval `perl -I ~/perl5/lib/perl5 -Mlocal::lib`; cpan -T install Graph::Easy)
+# graph-easy --as=svg < readme_states.txt
+
+[Nonexistent] - appears in cloud list -> [Unknown]
+[Nonexistent] - create() returns ID -> [Booting]
+[Unknown] - create() returns ID -> [Booting]
+[Unknown] - boot timeout -> [Shutdown]
+[Booting] - boot+run probes succeed -> [Idle]
+[Idle] - idle timeout -> [Shutdown]
+[Idle] - probe timeout -> [Shutdown]
+[Idle] - want=drain -> [Shutdown]
+[Idle] - container starts -> [Running]
+[Running] - container ends -> [Idle]
+[Running] - container ends, want=drain -> [Shutdown]
+[Shutdown] - instance disappears from cloud -> [Gone]
+
+# Layouter fails if we add these
+#[Hold] - want=run -> [Booting]
+#[Hold] - want=drain -> [Shutdown]
+#[Running] - container ends, want=hold -> [Hold]
+#[Unknown] - want=hold -> [Hold]
+#[Booting] - want=hold -> [Hold]
+#[Idle] - want=hold -> [Hold]
+
+# Not worth saying?
+#[Booting] - boot probe succeeds, run probe fails -> [Booting]
)
// A ContainerQueue is a set of containers that need to be started or
-// stopped. Implemented by container.Queue and test stubs.
+// stopped. Implemented by container.Queue and test stubs. See
+// container.Queue method documentation for details.
type ContainerQueue interface {
Entries() (entries map[string]container.QueueEnt, updated time.Time)
Lock(uuid string) error
// A WorkerPool asynchronously starts and stops worker VMs, and starts
// and stops containers on them. Implemented by worker.Pool and test
-// stubs.
+// stubs. See worker.Pool method documentation for details.
type WorkerPool interface {
Running() map[string]time.Time
Unallocated() map[arvados.InstanceType]int
CountWorkers() map[worker.State]int
AtQuota() bool
- Create(arvados.InstanceType) error
+ Create(arvados.InstanceType) bool
Shutdown(arvados.InstanceType) bool
StartContainer(arvados.InstanceType, arvados.Container) bool
KillContainer(uuid string)
import (
"sort"
- "git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
break tryrun
} else {
logger.Info("creating new instance")
- err := sch.pool.Create(it)
- if err != nil {
- if _, ok := err.(cloud.QuotaError); !ok {
- logger.WithError(err).Warn("error creating worker")
- }
+ if !sch.pool.Create(it) {
+ // (Note pool.Create works
+ // asynchronously and logs its
+ // own failures, so we don't
+ // need to log this as a
+ // failure.)
+
sch.queue.Unlock(ctr.UUID)
// Don't let lower-priority
// containers starve this one
package scheduler
import (
- "errors"
+ "sync"
"time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var (
- logger = logrus.StandardLogger()
-
// arbitrary example container UUIDs
uuids = func() (r []string) {
for i := 0; i < 16; i++ {
creates []arvados.InstanceType
starts []string
shutdowns int
+ sync.Mutex
}
-func (p *stubPool) AtQuota() bool { return p.atQuota }
-func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
-func (p *stubPool) Unsubscribe(<-chan struct{}) {}
-func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) AtQuota() bool { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time {
+ p.Lock()
+ defer p.Unlock()
+ r := map[string]time.Time{}
+ for k, v := range p.running {
+ r[k] = v
+ }
+ return r
+}
func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+ p.Lock()
+ defer p.Unlock()
r := map[arvados.InstanceType]int{}
for it, n := range p.unalloc {
r[it] = n
}
return r
}
-func (p *stubPool) Create(it arvados.InstanceType) error {
+func (p *stubPool) Create(it arvados.InstanceType) bool {
+ p.Lock()
+ defer p.Unlock()
p.creates = append(p.creates, it)
if p.canCreate < 1 {
- return stubQuotaError{errors.New("quota")}
+ return false
}
p.canCreate--
p.unalloc[it]++
- return nil
+ return true
}
func (p *stubPool) KillContainer(uuid string) {
- p.running[uuid] = time.Now()
+ p.Lock()
+ defer p.Unlock()
+ delete(p.running, uuid)
}
func (p *stubPool) Shutdown(arvados.InstanceType) bool {
p.shutdowns++
return false
}
func (p *stubPool) CountWorkers() map[worker.State]int {
+ p.Lock()
+ defer p.Unlock()
return map[worker.State]int{
worker.StateBooting: len(p.unalloc) - len(p.idle),
worker.StateIdle: len(p.idle),
}
}
func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+ p.Lock()
+ defer p.Unlock()
p.starts = append(p.starts, ctr.UUID)
if p.idle[it] == 0 {
return false
return true
}
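+// chooseType is the typeChooser used by the tests below: it maps a
+// container with n VCPUs to test.InstanceType(n).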
+func chooseType(ctr *arvados.Container) (arvados.InstanceType, error) {
+ return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+}
+
var _ = check.Suite(&SchedulerSuite{})
type SchedulerSuite struct{}
// create.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
queue := test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
- },
+ ChooseType: chooseType,
Containers: []arvados.Container{
{
UUID: test.ContainerUUID(1),
running: map[string]time.Time{},
canCreate: 0,
}
- New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
shouldCreate = append(shouldCreate, test.InstanceType(3))
}
queue := test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
- },
+ ChooseType: chooseType,
Containers: []arvados.Container{
{
UUID: test.ContainerUUID(2),
starts: []string{},
canCreate: 0,
}
- New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
c.Check(pool.starts, check.DeepEquals, []string{})
c.Check(pool.shutdowns, check.Not(check.Equals), 0)
canCreate: 4,
}
queue := test.Queue{
- ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
- return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
- },
+ ChooseType: chooseType,
Containers: []arvados.Container{
{
// create a new worker
},
}
queue.Update()
- New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
//
// An Executor must not be copied.
type Executor struct {
- target cloud.ExecutorTarget
- signers []ssh.Signer
- mtx sync.RWMutex // controls access to instance after creation
+ target cloud.ExecutorTarget
+ targetPort string
+ signers []ssh.Signer
+ mtx sync.RWMutex // controls access to instance after creation
client *ssh.Client
clientErr error
exr.target = t
}
+// SetTargetPort sets the default port (name or number) to connect
+// to. This is used only when the address returned by the target's
+// Address() method does not specify a port. If the given port is
+// empty (or SetTargetPort is not called at all), the default port is
+// "ssh".
+func (exr *Executor) SetTargetPort(port string) {
+ exr.mtx.Lock()
+ defer exr.mtx.Unlock()
+ exr.targetPort = port
+}
+
// Target returns the current target.
func (exr *Executor) Target() cloud.ExecutorTarget {
exr.mtx.RLock()
// Execute runs cmd on the target. If an existing connection is not
// usable, it sets up a new connection to the current target.
-func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+func (exr *Executor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
session, err := exr.newSession()
if err != nil {
return nil, nil, err
}
defer session.Close()
+ for k, v := range env {
+ err = session.Setenv(k, v)
+ if err != nil {
+ return nil, nil, err
+ }
+ }
var stdout, stderr bytes.Buffer
session.Stdin = stdin
session.Stdout = &stdout
if addr == "" {
return nil, errors.New("instance has no address")
}
+	if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
+		// Target address does not specify a port. Use
+		// targetPort, or "ssh".
+		if h == "" {
+			// SplitHostPort returns h=="" when addr has
+			// no port at all; the whole addr is the host.
+			h = addr
+		}
+		if p = exr.targetPort; p == "" {
+			p = "ssh"
+		}
+		addr = net.JoinHostPort(h, p)
+	}
var receivedKey ssh.PublicKey
client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
User: "root",
import (
"bytes"
+ "fmt"
"io"
"io/ioutil"
+ "net"
"sync"
"testing"
"time"
return nil
}
+// Address returns the wrapped SSHService's host, with the port
+// stripped. This ensures the executor won't work until
+// SetTargetPort() is called -- see (*testTarget)Port().
+func (tt *testTarget) Address() string {
+ h, _, err := net.SplitHostPort(tt.SSHService.Address())
+ if err != nil {
+ panic(err)
+ }
+ return h
+}
+
+func (tt *testTarget) Port() string {
+ _, p, err := net.SplitHostPort(tt.SSHService.Address())
+ if err != nil {
+ panic(err)
+ }
+ return p
+}
+
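+// mitmTarget wraps an SSHService but rejects every host key,
+// simulating a man-in-the-middle attack.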
+type mitmTarget struct {
+ test.SSHService
+}
+
+func (*mitmTarget) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+ return fmt.Errorf("host key failed verification: %#v", key)
+}
+
type ExecutorSuite struct{}
+func (s *ExecutorSuite) TestBadHostKey(c *check.C) {
+ _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
+ clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
+ target := &mitmTarget{
+ SSHService: test.SSHService{
+ Exec: func(map[string]string, string, io.Reader, io.Writer, io.Writer) uint32 {
+ c.Error("Target Exec func called even though host key verification failed")
+ return 0
+ },
+ HostKey: hostpriv,
+ AuthorizedKeys: []ssh.PublicKey{clientpub},
+ },
+ }
+
+ err := target.Start()
+ c.Check(err, check.IsNil)
+ c.Logf("target address %q", target.Address())
+ defer target.Close()
+
+ exr := New(target)
+ exr.SetSigners(clientpriv)
+
+ _, _, err = exr.Execute(nil, "true", nil)
+ c.Check(err, check.ErrorMatches, "host key failed verification: .*")
+}
+
func (s *ExecutorSuite) TestExecute(c *check.C) {
command := `foo 'bar' "baz"`
stdinData := "foobar\nbaz\n"
_, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
for _, exitcode := range []int{0, 1, 2} {
- srv := &testTarget{
+ target := &testTarget{
SSHService: test.SSHService{
- Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ Exec: func(env map[string]string, cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ c.Check(env["TESTVAR"], check.Equals, "test value")
c.Check(cmd, check.Equals, command)
var wg sync.WaitGroup
wg.Add(2)
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
- err := srv.Start()
+ err := target.Start()
c.Check(err, check.IsNil)
- c.Logf("srv address %q", srv.Address())
- defer srv.Close()
+ c.Logf("target address %q", target.Address())
+ defer target.Close()
- exr := New(srv)
+ exr := New(target)
exr.SetSigners(clientpriv)
+ // Use the default target port (ssh). Execute will
+ // return a connection error or an authentication
+ // error, depending on whether the test host is
+ // running an SSH server.
+ _, _, err = exr.Execute(nil, command, nil)
+ c.Check(err, check.ErrorMatches, `.*(unable to authenticate|connection refused).*`)
+
+ // Use a bogus target port. Execute will return a
+ // connection error.
+ exr.SetTargetPort("0")
+ _, _, err = exr.Execute(nil, command, nil)
+ c.Check(err, check.ErrorMatches, `.*connection refused.*`)
+
+ // Use the test server's listening port.
+ exr.SetTargetPort(target.Port())
+
done := make(chan bool)
go func() {
- stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+ stdout, stderr, err := exr.Execute(map[string]string{"TESTVAR": "test value"}, command, bytes.NewBufferString(stdinData))
if exitcode == 0 {
c.Check(err, check.IsNil)
} else {
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
- "fmt"
- "math/rand"
- "sync"
-
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "golang.org/x/crypto/ssh"
-)
-
-// LameInstanceSet creates instances that boot but can't run
-// containers.
-type LameInstanceSet struct {
- Hold chan bool // set to make(chan bool) to hold operations until Release is called
-
- mtx sync.Mutex
- instances map[*lameInstance]bool
-}
-
-// Create returns a new instance.
-func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
- inst := &lameInstance{
- p: p,
- id: cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
- providerType: instType.ProviderType,
- }
- inst.SetTags(tags)
- if p.Hold != nil {
- p.Hold <- true
- }
- p.mtx.Lock()
- defer p.mtx.Unlock()
- if p.instances == nil {
- p.instances = map[*lameInstance]bool{}
- }
- p.instances[inst] = true
- return inst, nil
-}
-
-// Instances returns the instances that haven't been destroyed.
-func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
- var instances []cloud.Instance
- for i := range p.instances {
- instances = append(instances, i)
- }
- return instances, nil
-}
-
-// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
-func (p *LameInstanceSet) Stop() {
-}
-
-// Release n held calls. Blocks if n calls aren't already
-// waiting. Blocks forever if Hold is nil.
-func (p *LameInstanceSet) Release(n int) {
- for i := 0; i < n; i++ {
- <-p.Hold
- }
-}
-
-type lameInstance struct {
- p *LameInstanceSet
- id cloud.InstanceID
- providerType string
- tags cloud.InstanceTags
-}
-
-func (inst *lameInstance) ID() cloud.InstanceID {
- return inst.id
-}
-
-func (inst *lameInstance) String() string {
- return fmt.Sprint(inst.id)
-}
-
-func (inst *lameInstance) ProviderType() string {
- return inst.providerType
-}
-
-func (inst *lameInstance) Address() string {
- return "0.0.0.0:1234"
-}
-
-func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
- inst.p.mtx.Lock()
- defer inst.p.mtx.Unlock()
- inst.tags = cloud.InstanceTags{}
- for k, v := range tags {
- inst.tags[k] = v
- }
- return nil
-}
-
-func (inst *lameInstance) Destroy() error {
- if inst.p.Hold != nil {
- inst.p.Hold <- true
- }
- inst.p.mtx.Lock()
- defer inst.p.mtx.Unlock()
- delete(inst.p.instances, inst)
- return nil
-}
-
-func (inst *lameInstance) Tags() cloud.InstanceTags {
- return inst.tags
-}
-
-func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
- return nil
-}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+ "os"
+
+ "github.com/sirupsen/logrus"
+)
+
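+// Logger returns the standard logger, with the debug level enabled
+// if the ARVADOS_DEBUG environment variable is set.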
+func Logger() logrus.FieldLogger {
+ logger := logrus.StandardLogger()
+ if os.Getenv("ARVADOS_DEBUG") != "" {
+ logger.SetLevel(logrus.DebugLevel)
+ }
+ return logger
+}
// An SSHExecFunc handles an "exec" session on a multiplexed SSH
// connection.
-type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
// An SSHService accepts SSH connections on an available TCP port and
// passes clients' "exec" sessions to the provided SSHExecFunc.
log.Printf("accept channel: %s", err)
return
}
- var execReq struct {
- Command string
- }
+ didExec := false
+ sessionEnv := map[string]string{}
go func() {
for req := range reqs {
- if req.Type == "exec" && execReq.Command == "" {
+ switch {
+ case didExec:
+ // Reject anything after exec
+ req.Reply(false, nil)
+ case req.Type == "exec":
+ var execReq struct {
+ Command string
+ }
req.Reply(true, nil)
ssh.Unmarshal(req.Payload, &execReq)
go func() {
var resp struct {
Status uint32
}
- resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+ resp.Status = ss.Exec(sessionEnv, execReq.Command, ch, ch, ch.Stderr())
ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
ch.Close()
}()
+ didExec = true
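+				// Collect "env" requests (sent by
+				// ssh.Session.Setenv on the client
+				// side) to pass to the exec handler.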
+ case req.Type == "env":
+ var envReq struct {
+ Name string
+ Value string
+ }
+ req.Reply(true, nil)
+ ssh.Unmarshal(req.Payload, &envReq)
+ sessionEnv[envReq.Name] = envReq.Value
}
}
}()
// Destroy. 0=always succeed, 1=always fail.
ErrorRateDestroy float64
+ // If Create() or Instances() is called too frequently, return
+ // rate-limiting errors.
+ MinTimeBetweenCreateCalls time.Duration
+ MinTimeBetweenInstancesCalls time.Duration
+
+ // If true, Create and Destroy calls block until Release() is
+ // called.
+ HoldCloudOps bool
+
instanceSets []*StubInstanceSet
+ holdCloudOps chan bool
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID,
- logger logrus.FieldLogger) (cloud.InstanceSet, error) {
-
+func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+ if sd.holdCloudOps == nil {
+ sd.holdCloudOps = make(chan bool)
+ }
sis := StubInstanceSet{
driver: sd,
servers: map[cloud.InstanceID]*StubVM{},
return sd.instanceSets
}
+// ReleaseCloudOps releases n pending Create/Destroy calls. If there
+// are fewer than n blocked calls pending, it waits for the rest to
+// arrive.
+func (sd *StubDriver) ReleaseCloudOps(n int) {
+ for i := 0; i < n; i++ {
+ <-sd.holdCloudOps
+ }
+}
+
type StubInstanceSet struct {
driver *StubDriver
servers map[cloud.InstanceID]*StubVM
mtx sync.RWMutex
stopped bool
+
+ allowCreateCall time.Time
+ allowInstancesCall time.Time
}
func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+ if sis.driver.HoldCloudOps {
+ sis.driver.holdCloudOps <- true
+ }
sis.mtx.Lock()
defer sis.mtx.Unlock()
if sis.stopped {
return nil, errors.New("StubInstanceSet: Create called after Stop")
}
+	if sis.allowCreateCall.After(time.Now()) {
+		return nil, RateLimitError{sis.allowCreateCall}
+	}
+	sis.allowCreateCall = time.Now().Add(sis.driver.MinTimeBetweenCreateCalls)
+
ak := sis.driver.AuthorizedKeys
if authKey != nil {
ak = append([]ssh.PublicKey{authKey}, ak...)
func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
-	sis.mtx.RLock()
-	defer sis.mtx.RUnlock()
+	// Use the write lock: allowInstancesCall is updated below.
+	sis.mtx.Lock()
+	defer sis.mtx.Unlock()
+	if sis.allowInstancesCall.After(time.Now()) {
+		return nil, RateLimitError{sis.allowInstancesCall}
+	}
+	sis.allowInstancesCall = time.Now().Add(sis.driver.MinTimeBetweenInstancesCalls)
var r []cloud.Instance
for _, ss := range sis.servers {
r = append(r, ss.Instance())
sis.stopped = true
}
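+// RateLimitError is the error returned by the stub Create and
+// Instances calls when they arrive before the configured minimum
+// interval has elapsed. It satisfies the cloud.RateLimitError
+// interface checked by throttle.CheckRateLimitError.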
+type RateLimitError struct{ Retry time.Time }
+
+func (e RateLimitError) Error() string { return fmt.Sprintf("rate limited until %s", e.Retry) }
+func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
+
// StubVM is a fake server that runs an SSH service. It represents a
// VM running in a fake cloud.
//
}
}
-func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
queue := svm.sis.driver.Queue
uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
if eta := svm.Boot.Sub(time.Now()); eta > 0 {
return 1
}
if strings.HasPrefix(command, "crunch-run --detach ") {
+ for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
+ if env[name] == "" {
+ fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+ return 1
+ }
+ }
svm.Lock()
if svm.running == nil {
svm.running = map[string]bool{}
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithField("ContainerUUID", uuid)
+ logger := logrus.WithFields(logrus.Fields{
+ "Instance": svm.id,
+ "ContainerUUID": uuid,
+ })
logger.Printf("[test] starting crunch-run stub")
go func() {
crashluck := math_rand.Float64()
}
func (si stubInstance) Destroy() error {
+ sis := si.svm.sis
+ if sis.driver.HoldCloudOps {
+ sis.driver.holdCloudOps <- true
+ }
if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
return errors.New("instance could not be destroyed")
}
si.svm.SSHService.Close()
- sis := si.svm.sis
sis.mtx.Lock()
defer sis.mtx.Unlock()
delete(sis.servers, si.svm.id)
package worker
import (
+ "errors"
"io"
"sort"
"strings"
const (
tagKeyInstanceType = "InstanceType"
- tagKeyHold = "Hold"
+ tagKeyIdleBehavior = "IdleBehavior"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
- Instance string
- Price float64
- ArvadosInstanceType string
- ProviderInstanceType string
- LastContainerUUID string
- LastBusy time.Time
- WorkerState string
+ Instance cloud.InstanceID `json:"instance"`
+ Price float64 `json:"price"`
+ ArvadosInstanceType string `json:"arvados_instance_type"`
+ ProviderInstanceType string `json:"provider_instance_type"`
+ LastContainerUUID string `json:"last_container_uuid"`
+ LastBusy time.Time `json:"last_busy"`
+ WorkerState string `json:"worker_state"`
+ IdleBehavior IdleBehavior `json:"idle_behavior"`
}
// An Executor executes shell commands on a remote host.
type Executor interface {
// Run cmd on the current target.
- Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+ Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
// Use the given target for subsequent operations. The new
// target is the same host as the previous target, but it
defaultTimeoutBooting = time.Minute * 10
defaultTimeoutProbe = time.Minute * 10
defaultTimeoutShutdown = time.Second * 10
+
+	// Time after a quota error to try again anyway, even if no
+	// instances have been shut down.
+ quotaErrorTTL = time.Minute
+
+	// Time between "X failed because of rate limiting" messages
+ logRateLimitErrorInterval = time.Second * 10
)
func duration(conf arvados.Duration, def time.Duration) time.Duration {
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
- instanceSet: instanceSet,
+ arvClient: arvClient,
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
type Pool struct {
// configuration
logger logrus.FieldLogger
- instanceSet cloud.InstanceSet
+ arvClient *arvados.Client
+ instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
imageID cloud.ImageID
mtx sync.RWMutex
setupOnce sync.Once
mInstances prometheus.Gauge
+ mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
mVCPUs prometheus.Gauge
mVCPUsInuse prometheus.Gauge
mMemoryInuse prometheus.Gauge
}
-// Subscribe returns a channel that becomes ready whenever a worker's
-// state changes.
+// Subscribe returns a buffered channel that becomes ready after any
+// change to the pool's state that could have scheduling implications:
+// a worker's state changes, a new worker appears, the cloud
+// provider's API rate limiting period ends, etc.
+//
+// Additional events that occur while the channel is already ready
+// will be dropped, so it is OK if the caller services the channel
+// slowly.
//
// Example:
//
// ch := wp.Subscribe()
// defer wp.Unsubscribe(ch)
// for range ch {
-// // ...try scheduling some work...
+// tryScheduling(wp)
// if done {
// break
// }
}
// Unallocated returns the number of unallocated (creating + booting +
-// idle + unknown) workers for each instance type.
+// idle + unknown) workers for each instance type. Workers in
+// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
wp.setupOnce.Do(wp.setup)
wp.mtx.RLock()
creating[it] = len(times)
}
for _, wkr := range wp.workers {
- if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+ // Skip workers that are not expected to become
+ // available soon. Note len(wkr.running)>0 is not
+ // redundant here: it can be true even in
+ // StateUnknown.
+ if wkr.state == StateShutdown ||
+ wkr.state == StateRunning ||
+ wkr.idleBehavior != IdleBehaviorRun ||
+ len(wkr.running) > 0 {
continue
}
it := wkr.instType
// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
-func (wp *Pool) Create(it arvados.InstanceType) error {
+//
+// Create returns false if a pre-existing error state prevents it from
+// even attempting to create a new instance. Those errors are logged
+// by the Pool, so the caller does not need to log anything in such
+// cases.
+func (wp *Pool) Create(it arvados.InstanceType) bool {
logger := wp.logger.WithField("InstanceType", it.Name)
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
defer wp.mtx.Unlock()
- if time.Now().Before(wp.atQuotaUntil) {
- return wp.atQuotaErr
+	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
+ return false
+ }
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
}
- tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
break
}
}
- if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
- wp.atQuotaErr = err
- wp.atQuotaUntil = time.Now().Add(time.Minute)
- }
if err != nil {
+ if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+ wp.atQuotaErr = err
+ wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
+ time.AfterFunc(quotaErrorTTL, wp.notify)
+ }
logger.WithError(err).Error("create failed")
+ wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
return
}
wp.updateWorker(inst, it, StateBooting)
}()
- return nil
+ return true
}
// AtQuota returns true if Create is not expected to work at the
return time.Now().Before(wp.atQuotaUntil)
}
+// SetIdleBehavior determines how the indicated instance will behave
+// when it has no containers running.
+func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+ wp.mtx.Lock()
+ defer wp.mtx.Unlock()
+ wkr, ok := wp.workers[id]
+ if !ok {
+ return errors.New("requested instance does not exist")
+ }
+ wkr.idleBehavior = idleBehavior
+ wkr.saveTags()
+ wkr.shutdownIfIdle()
+ return nil
+}
+
// Add or update worker attached to the given instance. Use
// initialState if a new worker is created.
//
if initialState == StateBooting && wkr.state == StateUnknown {
wkr.state = StateBooting
}
+ wkr.saveTags()
return wkr, false
}
- if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
- initialState = StateHold
+
+ // If an instance has a valid IdleBehavior tag when it first
+ // appears, initialize the new worker accordingly (this is how
+ // we restore IdleBehavior that was set by a prior dispatch
+ // process); otherwise, default to "run". After this,
+ // wkr.idleBehavior is the source of truth, and will only be
+ // changed via SetIdleBehavior().
+ idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ if !validIdleBehavior[idleBehavior] {
+ idleBehavior = IdleBehaviorRun
}
+
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
"Instance": inst,
})
- logger.WithField("State", initialState).Infof("instance appeared in cloud")
+ logger.WithFields(logrus.Fields{
+ "State": initialState,
+ "IdleBehavior": idleBehavior,
+ }).Infof("instance appeared in cloud")
now := time.Now()
wkr := &worker{
- mtx: &wp.mtx,
- wp: wp,
- logger: logger,
- executor: wp.newExecutor(inst),
- state: initialState,
- instance: inst,
- instType: it,
- appeared: now,
- probed: now,
- busy: now,
- updated: now,
- running: make(map[string]struct{}),
- starting: make(map[string]struct{}),
- probing: make(chan struct{}, 1),
+ mtx: &wp.mtx,
+ wp: wp,
+ logger: logger,
+ executor: wp.newExecutor(inst),
+ state: initialState,
+ idleBehavior: idleBehavior,
+ instance: inst,
+ instType: it,
+ appeared: now,
+ probed: now,
+ busy: now,
+ updated: now,
+ running: make(map[string]struct{}),
+ starting: make(map[string]struct{}),
+ probing: make(chan struct{}, 1),
}
wp.workers[id] = wkr
return wkr, true
// TODO: shutdown the worker with the longest idle
// time (Idle) or the earliest create time (Booting)
for _, wkr := range wp.workers {
- if wkr.state == tryState && wkr.instType == it {
+ if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
logger.WithField("Instance", wkr.instance).Info("shutting down")
wkr.shutdown()
return true
}
// Running returns the container UUIDs being prepared/run on workers.
+//
+// In the returned map, the time value indicates when the Pool
+// observed that the container process had exited. A container that
+// has not yet exited has a zero time value. The caller should use
+// KillContainer() to garbage-collect the entries for exited
+// containers.
func (wp *Pool) Running() map[string]time.Time {
wp.setupOnce.Do(wp.setup)
wp.mtx.Lock()
"Instance": wkr.instance,
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
})
reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price_total",
+ Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
+ })
+ reg.MustRegister(wp.mInstancesPrice)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
wp.mtx.RLock()
defer wp.mtx.RUnlock()
+ var price float64
var alloc, cpu, cpuInuse, mem, memInuse int64
for _, wkr := range wp.workers {
+ price += wkr.instType.Price
cpu += int64(wkr.instType.VCPUs)
mem += int64(wkr.instType.RAM)
if len(wkr.running)+len(wkr.starting) == 0 {
memInuse += int64(wkr.instType.RAM)
}
wp.mInstances.Set(float64(len(wp.workers)))
+ wp.mInstancesPrice.Set(price)
wp.mContainersRunning.Set(float64(alloc))
wp.mVCPUs.Set(float64(cpu))
wp.mMemory.Set(float64(mem))
wp.mtx.Lock()
for _, w := range wp.workers {
r = append(r, InstanceView{
- Instance: w.instance.String(),
+ Instance: w.instance.ID(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
LastContainerUUID: w.lastUUID,
LastBusy: w.busy,
WorkerState: w.state.String(),
+ IdleBehavior: w.idleBehavior,
})
}
wp.mtx.Unlock()
sort.Slice(r, func(i, j int) bool {
- return strings.Compare(r[i].Instance, r[j].Instance) < 0
+ return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
})
return r
}
func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
+ if err := wp.instanceSet.throttleInstances.Error(); err != nil {
+ return err
+ }
wp.logger.Debug("getting instance list")
threshold := time.Now()
instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
+ wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
}
wp.sync(threshold, instances)
package worker
import (
- "io"
+ "sort"
+ "strings"
"time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/sirupsen/logrus"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
type PoolSuite struct{}
-func (suite *PoolSuite) SetUpSuite(c *check.C) {
- logrus.StandardLogger().SetLevel(logrus.DebugLevel)
-}
+func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
+ type1 := test.InstanceType(1)
+ type2 := test.InstanceType(2)
+ type3 := test.InstanceType(3)
+ waitForIdle := func(pool *Pool, notify <-chan struct{}) {
+ timeout := time.NewTimer(time.Second)
+ for {
+ instances := pool.Instances()
+ sort.Slice(instances, func(i, j int) bool {
+ return strings.Compare(instances[i].ArvadosInstanceType, instances[j].ArvadosInstanceType) < 0
+ })
+ if len(instances) == 3 &&
+ instances[0].ArvadosInstanceType == type1.Name &&
+ instances[0].WorkerState == StateIdle.String() &&
+ instances[1].ArvadosInstanceType == type1.Name &&
+ instances[1].WorkerState == StateIdle.String() &&
+ instances[2].ArvadosInstanceType == type2.Name &&
+ instances[2].WorkerState == StateIdle.String() {
+ return
+ }
+ select {
+ case <-timeout.C:
+ c.Logf("pool.Instances() == %#v", instances)
+ c.Error("timed out")
+ return
+ case <-notify:
+ }
+ }
+ }
-func (suite *PoolSuite) TestStartContainer(c *check.C) {
- // TODO: use an instanceSet stub with an SSH server
-}
+ logger := test.Logger()
+ driver := &test.StubDriver{}
+ is, err := driver.InstanceSet(nil, "", logger)
+ c.Assert(err, check.IsNil)
+
+ newExecutor := func(cloud.Instance) Executor {
+ return stubExecutor{
+ "crunch-run --list": stubResp{},
+ "true": stubResp{},
+ }
+ }
+
+ cluster := &arvados.Cluster{
+ Dispatch: arvados.Dispatch{
+ MaxProbesPerSecond: 1000,
+ ProbeInterval: arvados.Duration(time.Millisecond * 10),
+ },
+ CloudVMs: arvados.CloudVMs{
+ BootProbeCommand: "true",
+ SyncInterval: arvados.Duration(time.Millisecond * 10),
+ },
+ InstanceTypes: arvados.InstanceTypeMap{
+ type1.Name: type1,
+ type2.Name: type2,
+ type3.Name: type3,
+ },
+ }
-func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
- // TODO: use an instanceSet stub with an SSH server
+ pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ notify := pool.Subscribe()
+ defer pool.Unsubscribe(notify)
+ pool.Create(type1)
+ pool.Create(type1)
+ pool.Create(type2)
+ waitForIdle(pool, notify)
+ var heldInstanceID cloud.InstanceID
+ for _, inst := range pool.Instances() {
+ if inst.ArvadosInstanceType == type2.Name {
+ heldInstanceID = cloud.InstanceID(inst.Instance)
+ pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
+ }
+ }
+ pool.Stop()
+
+ c.Log("------- starting new pool, waiting to recover state")
+
+ pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ notify2 := pool2.Subscribe()
+ defer pool2.Unsubscribe(notify2)
+ waitForIdle(pool2, notify2)
+ for _, inst := range pool2.Instances() {
+ if inst.ArvadosInstanceType == type2.Name {
+ c.Check(inst.Instance, check.Equals, heldInstanceID)
+ c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorHold)
+ } else {
+ c.Check(inst.IdleBehavior, check.Equals, IdleBehaviorRun)
+ }
+ }
+ pool2.Stop()
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
+ logger := test.Logger()
+ driver := test.StubDriver{HoldCloudOps: true}
+ instanceSet, err := driver.InstanceSet(nil, "", logger)
+ c.Assert(err, check.IsNil)
+
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+ type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
- logger: logrus.StandardLogger(),
- newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
- instanceSet: lameInstanceSet,
+ logger: logger,
+ newExecutor: func(cloud.Instance) Executor { return stubExecutor{} },
+ instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
type2.Name: type2,
+ type3.Name: type3,
},
}
notify := pool.Subscribe()
c.Check(pool.Unallocated()[type1], check.Equals, 0)
c.Check(pool.Unallocated()[type2], check.Equals, 0)
+ c.Check(pool.Unallocated()[type3], check.Equals, 0)
pool.Create(type2)
pool.Create(type1)
pool.Create(type2)
+ pool.Create(type3)
c.Check(pool.Unallocated()[type1], check.Equals, 1)
c.Check(pool.Unallocated()[type2], check.Equals, 2)
+ c.Check(pool.Unallocated()[type3], check.Equals, 1)
// Unblock the pending Create calls.
- go lameInstanceSet.Release(3)
+ go driver.ReleaseCloudOps(4)
// Wait for each instance to either return from its Create
// call, or show up in a poll.
suite.wait(c, pool, notify, func() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()
- return len(pool.workers) == 3
+ return len(pool.workers) == 4
})
+ // Place type3 node on admin-hold
+ ivs := suite.instancesByType(pool, type3)
+ c.Assert(ivs, check.HasLen, 1)
+ type3instanceID := ivs[0].Instance
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorHold)
+ c.Check(err, check.IsNil)
+
+ // Check admin-hold behavior: refuse to shutdown, and don't
+ // report as Unallocated ("available now or soon").
+ c.Check(pool.Shutdown(type3), check.Equals, false)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 0
+ })
+ c.Check(suite.instancesByType(pool, type3), check.HasLen, 1)
+
+ // Shutdown both type2 nodes
c.Check(pool.Shutdown(type2), check.Equals, true)
suite.wait(c, pool, notify, func() bool {
return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
}
break
}
+
+ // Shutdown type1 node
c.Check(pool.Shutdown(type1), check.Equals, true)
suite.wait(c, pool, notify, func() bool {
- return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+ return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 && pool.Unallocated()[type3] == 0
})
select {
case <-notify2:
case <-time.After(time.Second):
c.Error("notify did not receive")
}
- go lameInstanceSet.Release(3) // unblock Destroy calls
+
+ // Put type3 node back in service.
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorRun)
+ c.Check(err, check.IsNil)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 1
+ })
+
+ // Check admin-drain behavior: shut down right away, and don't
+ // report as Unallocated.
+ err = pool.SetIdleBehavior(type3instanceID, IdleBehaviorDrain)
+ c.Check(err, check.IsNil)
+ suite.wait(c, pool, notify, func() bool {
+ return pool.Unallocated()[type3] == 0
+ })
+ suite.wait(c, pool, notify, func() bool {
+ ivs := suite.instancesByType(pool, type3)
+ return len(ivs) == 1 && ivs[0].WorkerState == StateShutdown.String()
+ })
+
+ // Unblock all pending Destroy calls. Pool calls Destroy again
+ // if a node still appears in the provider list after a
+ // previous attempt, so there might be more than 4 Destroy
+ // calls to unblock.
+ go driver.ReleaseCloudOps(4444)
+
+ // Sync until all instances disappear from the provider list.
+ suite.wait(c, pool, notify, func() bool {
+ pool.getInstancesAndSync()
+ return len(pool.Instances()) == 0
+ })
+}
+
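+// instancesByType returns the current InstanceViews that have the
+// given instance type.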
+func (suite *PoolSuite) instancesByType(pool *Pool, it arvados.InstanceType) []InstanceView {
+ var ivs []InstanceView
+ for _, iv := range pool.Instances() {
+ if iv.ArvadosInstanceType == it.Name {
+ ivs = append(ivs, iv)
+ }
+ }
+ return ivs
}
func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
}
c.Check(ready(), check.Equals, true)
}
-
-type stubExecutor struct{}
-
-func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-
-func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
- return nil, nil, nil
-}
-
-func (*stubExecutor) Close() {}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "github.com/sirupsen/logrus"
+)
+
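+// A throttle suppresses repeated remote API calls after a rate-limit
+// error, by remembering an error until a holdoff deadline passes. A
+// rough usage sketch:
+//
+//	var thr throttle
+//	thr.ErrorUntil(errors.New("wait"), time.Now().Add(time.Second), nil)
+//	err := thr.Error() // non-nil until the deadline passes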
+type throttle struct {
+ err error
+ until time.Time
+ mtx sync.Mutex
+}
+
+// CheckRateLimitError checks whether the given error is a
+// cloud.RateLimitError, and if so, ensures Error() returns a non-nil
+// error until the rate limiting holdoff period expires.
+//
+// If a notify func is given, it will be called after the holdoff
+// period expires.
+func (thr *throttle) CheckRateLimitError(err error, logger logrus.FieldLogger, callType string, notify func()) {
+ rle, ok := err.(cloud.RateLimitError)
+ if !ok {
+ return
+ }
+ until := rle.EarliestRetry()
+ if !until.After(time.Now()) {
+ return
+ }
+ dur := until.Sub(time.Now())
+ logger.WithFields(logrus.Fields{
+ "CallType": callType,
+ "Duration": dur,
+ "ResumeAt": until,
+ }).Info("suspending remote calls due to rate-limit error")
+ thr.ErrorUntil(fmt.Errorf("remote calls are suspended for %s, until %s", dur, until), until, notify)
+}
+
+func (thr *throttle) ErrorUntil(err error, until time.Time, notify func()) {
+ thr.mtx.Lock()
+ defer thr.mtx.Unlock()
+ thr.err, thr.until = err, until
+ if notify != nil {
+ time.AfterFunc(until.Sub(time.Now()), notify)
+ }
+}
+
+func (thr *throttle) Error() error {
+ thr.mtx.Lock()
+ defer thr.mtx.Unlock()
+ if thr.err != nil && time.Now().After(thr.until) {
+ thr.err = nil
+ }
+ return thr.err
+}
+
+type throttledInstanceSet struct {
+ cloud.InstanceSet
+ throttleCreate throttle
+ throttleInstances throttle
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "errors"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ThrottleSuite{})
+
+type ThrottleSuite struct{}
+
+func (s *ThrottleSuite) TestRateLimitError(c *check.C) {
+ var t throttle
+ c.Check(t.Error(), check.IsNil)
+ t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Second), nil)
+ c.Check(t.Error(), check.NotNil)
+ t.ErrorUntil(nil, time.Now(), nil)
+ c.Check(t.Error(), check.IsNil)
+
+ notified := false
+ t.ErrorUntil(errors.New("wait"), time.Now().Add(time.Millisecond), func() { notified = true })
+ c.Check(t.Error(), check.NotNil)
+ time.Sleep(time.Millisecond * 10)
+ c.Check(t.Error(), check.IsNil)
+ c.Check(notified, check.Equals, true)
+}
import (
"bytes"
+ "fmt"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
)
+const (
+ // TODO: configurable
+ maxPingFailTime = 10 * time.Minute
+)
+
// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int
StateIdle // instance booted, no containers are running
StateRunning // instance is running one or more containers
StateShutdown // worker has stopped monitoring the instance
- StateHold // running, but not available to run new containers
-)
-
-const (
- // TODO: configurable
- maxPingFailTime = 10 * time.Minute
)
var stateString = map[State]string{
StateIdle: "idle",
StateRunning: "running",
StateShutdown: "shutdown",
- StateHold: "hold",
}
// String implements fmt.Stringer.
return []byte(stateString[s]), nil
}
+// IdleBehavior indicates the behavior desired when a node becomes idle.
+type IdleBehavior string
+
+const (
+ IdleBehaviorRun IdleBehavior = "run" // run containers, or shutdown on idle timeout
+ IdleBehaviorHold IdleBehavior = "hold" // don't shutdown or run more containers
+ IdleBehaviorDrain IdleBehavior = "drain" // shutdown immediately when idle
+)
+
+var validIdleBehavior = map[IdleBehavior]bool{
+ IdleBehaviorRun: true,
+ IdleBehaviorHold: true,
+ IdleBehaviorDrain: true,
+}
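+
+// Sketch (assumed shape; the actual Pool method is defined
+// elsewhere) of how validIdleBehavior is meant to guard an
+// admin-requested behavior change:
+//
+//	func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
+//		if !validIdleBehavior[idleBehavior] {
+//			return fmt.Errorf("invalid idle behavior %q", idleBehavior)
+//		}
+//		// ...look up worker, set wkr.idleBehavior, save tags...
+//		return nil
+//	}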
+
type worker struct {
logger logrus.FieldLogger
executor Executor
wp *Pool
- mtx sync.Locker // must be wp's Locker.
- state State
- instance cloud.Instance
- instType arvados.InstanceType
- vcpus int64
- memory int64
- appeared time.Time
- probed time.Time
- updated time.Time
- busy time.Time
- destroyed time.Time
- lastUUID string
- running map[string]struct{} // remember to update state idle<->running when this changes
- starting map[string]struct{} // remember to update state idle<->running when this changes
- probing chan struct{}
+ mtx sync.Locker // must be wp's Locker.
+ state State
+ idleBehavior IdleBehavior
+ instance cloud.Instance
+ instType arvados.InstanceType
+ vcpus int64
+ memory int64
+ appeared time.Time
+ probed time.Time
+ updated time.Time
+ busy time.Time
+ destroyed time.Time
+ lastUUID string
+ running map[string]struct{} // remember to update state idle<->running when this changes
+ starting map[string]struct{} // remember to update state idle<->running when this changes
+ probing chan struct{}
}
// caller must have lock.
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
go func() {
- stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+ env := map[string]string{
+ "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
+ "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
+ }
+ stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
}
}
-// should be called in a new goroutine
+// probeAndUpdate calls probeBooted and/or probeRunning if needed, and
+// updates state accordingly.
+//
+// In StateUnknown: Call both probeBooted and probeRunning.
+// In StateBooting: Call probeBooted; if successful, call probeRunning.
+// In StateRunning: Call probeRunning.
+// In StateIdle: Call probeRunning.
+// In StateShutdown: Do nothing.
+//
+// If both probes succeed, wkr.state changes to
+// StateIdle/StateRunning.
+//
+// If probeRunning succeeds, wkr.running is updated. (This means
+// wkr.running might be non-empty even in StateUnknown, if the boot
+// probe failed.)
+//
+// probeAndUpdate should be called in a new goroutine.
func (wkr *worker) probeAndUpdate() {
wkr.mtx.Lock()
updated := wkr.updated
- needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
- needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+ initialState := wkr.state
wkr.mtx.Unlock()
- if !needProbeBooted && !needProbeRunning {
- return
- }
var (
+ booted bool
ctrUUIDs []string
ok bool
- stderr []byte
+ stderr []byte // from probeBooted
)
- if needProbeBooted {
- ok, stderr = wkr.probeBooted()
- wkr.mtx.Lock()
- if ok || wkr.state == StateRunning || wkr.state == StateIdle {
- wkr.logger.Info("instance booted; will try probeRunning")
- needProbeRunning = true
+
+ switch initialState {
+ case StateShutdown:
+ return
+ case StateIdle, StateRunning:
+ booted = true
+ case StateUnknown, StateBooting:
+ default:
+ panic(fmt.Sprintf("unknown state %s", initialState))
+ }
+
+ probeStart := time.Now()
+ logger := wkr.logger.WithField("ProbeStart", probeStart)
+
+ if !booted {
+ booted, stderr = wkr.probeBooted()
+ if !booted {
+ // Pretend this probe succeeded if another
+ // concurrent attempt succeeded.
+ wkr.mtx.Lock()
+ booted = wkr.state == StateRunning || wkr.state == StateIdle
+ wkr.mtx.Unlock()
+ }
+ if booted {
+ logger.Info("instance booted; will try probeRunning")
}
- wkr.mtx.Unlock()
}
- if needProbeRunning {
- ctrUUIDs, ok, stderr = wkr.probeRunning()
+ if booted || wkr.state == StateUnknown {
+ ctrUUIDs, ok = wkr.probeRunning()
}
- logger := wkr.logger.WithField("stderr", string(stderr))
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
- if !ok {
+ if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
// initiated during probe.
return
}
- dur := time.Since(wkr.probed)
- logger := logger.WithFields(logrus.Fields{
- "Duration": dur,
- "State": wkr.state,
- })
- if wkr.state == StateBooting && !needProbeRunning {
- // If we know the instance has never passed a
- // boot probe, it's not noteworthy that it
- // hasn't passed this probe.
- logger.Debug("new instance not responding")
- } else {
- logger.Info("instance not responding")
+ // Using the start time of the probe as the timeout
+ // threshold ensures we always initiate at least one
+ // probe attempt after the boot/probe timeout expires
+ // (otherwise, a slow probe failure could cause us to
+ // shut down an instance even though it did in fact
+ // boot/recover before the timeout expired).
+ dur := probeStart.Sub(wkr.probed)
+ if wkr.shutdownIfBroken(dur) {
+ // stderr from failed run-probes will have
+ // been logged already, but boot-probe
+ // failures are normal so they are logged only
+ // at Debug level. This is our chance to log
+ // some evidence about why the node never
+ // booted, even in non-debug mode.
+ if !booted {
+ logger.WithFields(logrus.Fields{
+ "Duration": dur,
+ "stderr": string(stderr),
+ }).Info("boot failed")
+ }
}
- wkr.shutdownIfBroken(dur)
return
}
// advantage of the non-busy state, though.
wkr.busy = updateTime
}
- running := map[string]struct{}{}
changed := false
+
+ // Build a new "running" map. Set changed=true if it differs
+ // from the existing map (wkr.running) to ensure the scheduler
+ // gets notified below.
+ running := map[string]struct{}{}
for _, uuid := range ctrUUIDs {
running[uuid] = struct{}{}
if _, ok := wkr.running[uuid]; !ok {
+ if _, ok := wkr.starting[uuid]; !ok {
+ // We didn't start it -- it must have
+ // been started by a previous
+ // dispatcher process.
+ logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+ }
changed = true
}
}
changed = true
}
}
- if wkr.state == StateUnknown || wkr.state == StateBooting {
+
+ // Update state if this was the first successful boot-probe.
+ if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
+ // Note: this will change again below if
+ // len(wkr.starting)+len(wkr.running) > 0.
wkr.state = StateIdle
changed = true
}
- if changed {
- wkr.running = running
- if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
- wkr.state = StateRunning
- } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
- wkr.state = StateIdle
- }
- wkr.updated = updateTime
- go wkr.wp.notify()
+
+ // If wkr.state and wkr.running aren't changing then there's
+ // no need to log anything, notify the scheduler, move state
+ // back and forth between idle/running, etc.
+ if !changed {
+ return
+ }
+
+ // Log whenever a run-probe reveals crunch-run processes
+ // appearing/disappearing before boot-probe succeeds.
+ if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(running),
+ "State": wkr.state,
+ }).Info("crunch-run probe succeeded, but boot probe is still failing")
+ }
+
+ wkr.running = running
+ if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+ wkr.state = StateRunning
+ } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+ wkr.state = StateIdle
}
+ wkr.updated = updateTime
+ if booted && (initialState == StateUnknown || initialState == StateBooting) {
+ logger.WithFields(logrus.Fields{
+ "RunningContainers": len(running),
+ "State": wkr.state,
+ }).Info("probes succeeded, instance is in service")
+ }
+ go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+func (wkr *worker) probeRunning() (running []string, ok bool) {
cmd := "crunch-run --list"
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false, stderr
+ return nil, false
}
stdout = bytes.TrimRight(stdout, "\n")
if len(stdout) == 0 {
- return nil, true, stderr
+ return nil, true
}
- return strings.Split(string(stdout), "\n"), true, stderr
+ return strings.Split(string(stdout), "\n"), true
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
if cmd == "" {
cmd = "true"
}
- stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
logger := wkr.logger.WithFields(logrus.Fields{
"Command": cmd,
"stdout": string(stdout),
}
// caller must have lock.
-func (wkr *worker) shutdownIfBroken(dur time.Duration) {
- if wkr.state == StateHold {
- return
+func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ // Never shut down.
+ return false
}
label, threshold := "", wkr.wp.timeoutProbe
- if wkr.state == StateBooting {
+ if wkr.state == StateUnknown || wkr.state == StateBooting {
label, threshold = "new ", wkr.wp.timeoutBooting
}
if dur < threshold {
- return
+ return false
}
wkr.logger.WithFields(logrus.Fields{
"Duration": dur,
"State": wkr.state,
}).Warnf("%sinstance unresponsive, shutting down", label)
wkr.shutdown()
+ return true
}
// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
- if wkr.state != StateIdle {
+ if wkr.idleBehavior == IdleBehaviorHold {
+ // Never shut down.
return false
}
age := time.Since(wkr.busy)
- if age < wkr.wp.timeoutIdle {
+
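+ // Shut down if the worker has been idle longer than the
+ // configured timeout, or if an admin requested drain -- in the
+ // drain case, even a node that is still booting is shut down as
+ // soon as possible.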
+ old := age >= wkr.wp.timeoutIdle
+ draining := wkr.idleBehavior == IdleBehaviorDrain
+ shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
+ (draining && wkr.state == StateBooting)
+ if !shouldShutdown {
return false
}
- wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+
+ wkr.logger.WithFields(logrus.Fields{
+ "State": wkr.state,
+ "Age": age,
+ "IdleBehavior": wkr.idleBehavior,
+ }).Info("shutdown idle worker")
wkr.shutdown()
return true
}
-// caller must have lock
+// caller must have lock.
func (wkr *worker) shutdown() {
now := time.Now()
wkr.updated = now
}
}()
}
+
+// saveTags updates the instance's cloud provider tags to match the
+// worker's instance type and idle behavior, if they don't already
+// match. Caller must have lock.
+func (wkr *worker) saveTags() {
+ instance := wkr.instance
+ have := instance.Tags()
+ want := cloud.InstanceTags{
+ tagKeyInstanceType: wkr.instType.Name,
+ tagKeyIdleBehavior: string(wkr.idleBehavior),
+ }
+ go func() {
+ for k, v := range want {
+ if v == have[k] {
+ continue
+ }
+ err := instance.SetTags(want)
+ if err != nil {
+ wkr.wp.logger.WithField("Instance", instance).WithError(err).Warn("error updating tags")
+ }
+ break
+ }
+ }()
+}
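+
+// Sketch (assumed counterpart elsewhere in this branch): when the
+// pool syncs with the cloud provider, a worker's idle behavior can
+// be recovered from the same tag, so an admin-requested hold/drain
+// survives a dispatcher restart:
+//
+//	idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+//	if !validIdleBehavior[idleBehavior] {
+//		idleBehavior = IdleBehaviorRun
+//	}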
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "errors"
+ "io"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&WorkerSuite{})
+
+type WorkerSuite struct{}
+
+func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
+ logger := test.Logger()
+ bootTimeout := time.Minute
+ probeTimeout := time.Second
+
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
+ c.Assert(err, check.IsNil)
+ inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+ c.Assert(err, check.IsNil)
+
+ type trialT struct {
+ testCaseComment string // displayed in test output to help identify failure case
+ age time.Duration
+ state State
+ running int
+ starting int
+ respBoot stubResp // zero value is success
+ respRun stubResp // zero value is success + nothing running
+ expectState State
+ expectRunning int
+ }
+
+ errFail := errors.New("failed")
+ respFail := stubResp{"", "command failed\n", errFail}
+ respContainerRunning := stubResp{"zzzzz-dz642-abcdefghijklmno\n", "", nil}
+ for _, trial := range []trialT{
+ {
+ testCaseComment: "Unknown, probes fail",
+ state: StateUnknown,
+ respBoot: respFail,
+ respRun: respFail,
+ expectState: StateUnknown,
+ },
+ {
+ testCaseComment: "Unknown, boot probe fails, but one container is running",
+ state: StateUnknown,
+ respBoot: respFail,
+ respRun: respContainerRunning,
+ expectState: StateUnknown,
+ expectRunning: 1,
+ },
+ {
+ testCaseComment: "Unknown, boot probe fails, previously running container has exited",
+ state: StateUnknown,
+ running: 1,
+ respBoot: respFail,
+ expectState: StateUnknown,
+ expectRunning: 0,
+ },
+ {
+ testCaseComment: "Unknown, boot timeout exceeded, boot probe fails",
+ state: StateUnknown,
+ age: bootTimeout + time.Second,
+ respBoot: respFail,
+ respRun: respFail,
+ expectState: StateShutdown,
+ },
+ {
+ testCaseComment: "Unknown, boot timeout exceeded, boot probe succeeds but crunch-run fails",
+ state: StateUnknown,
+ age: bootTimeout * 2,
+ respRun: respFail,
+ expectState: StateShutdown,
+ },
+ {
+ testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but crunch-run succeeds",
+ state: StateUnknown,
+ age: bootTimeout * 2,
+ respBoot: respFail,
+ expectState: StateShutdown,
+ },
+ {
+ testCaseComment: "Unknown, boot timeout exceeded, boot probe fails but container is running",
+ state: StateUnknown,
+ age: bootTimeout * 2,
+ respBoot: respFail,
+ respRun: respContainerRunning,
+ expectState: StateUnknown,
+ expectRunning: 1,
+ },
+ {
+ testCaseComment: "Booting, boot probe fails, run probe fails",
+ state: StateBooting,
+ respBoot: respFail,
+ respRun: respFail,
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe fails, run probe succeeds (but isn't expected to be called)",
+ state: StateBooting,
+ respBoot: respFail,
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, run probe fails",
+ state: StateBooting,
+ respRun: respFail,
+ expectState: StateBooting,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, run probe succeeds",
+ state: StateBooting,
+ expectState: StateIdle,
+ },
+ {
+ testCaseComment: "Booting, boot probe succeeds, run probe succeeds, container is running",
+ state: StateBooting,
+ respRun: respContainerRunning,
+ expectState: StateRunning,
+ expectRunning: 1,
+ },
+ {
+ testCaseComment: "Booting, boot timeout exceeded",
+ state: StateBooting,
+ age: bootTimeout * 2,
+ respRun: respFail,
+ expectState: StateShutdown,
+ },
+ {
+ testCaseComment: "Idle, probe timeout exceeded, one container running",
+ state: StateIdle,
+ age: probeTimeout * 2,
+ respRun: respContainerRunning,
+ expectState: StateRunning,
+ expectRunning: 1,
+ },
+ {
+ testCaseComment: "Idle, probe timeout exceeded, one container running, probe fails",
+ state: StateIdle,
+ age: probeTimeout * 2,
+ running: 1,
+ respRun: respFail,
+ expectState: StateShutdown,
+ expectRunning: 1,
+ },
+ {
+ testCaseComment: "Idle, probe timeout exceeded, nothing running, probe fails",
+ state: StateIdle,
+ age: probeTimeout * 2,
+ respRun: respFail,
+ expectState: StateShutdown,
+ },
+ {
+ testCaseComment: "Running, one container still running",
+ state: StateRunning,
+ running: 1,
+ respRun: respContainerRunning,
+ expectState: StateRunning,
+ expectRunning: 1,
+ },
+ {
+ testCaseComment: "Running, container has exited",
+ state: StateRunning,
+ running: 1,
+ expectState: StateIdle,
+ expectRunning: 0,
+ },
+ {
+ testCaseComment: "Running, probe timeout exceeded, nothing running, new container being started",
+ state: StateRunning,
+ age: probeTimeout * 2,
+ starting: 1,
+ expectState: StateRunning,
+ },
+ } {
+ c.Logf("------- %#v", trial)
+ ctime := time.Now().Add(-trial.age)
+ exr := stubExecutor{
+ "bootprobe": trial.respBoot,
+ "crunch-run --list": trial.respRun,
+ }
+ wp := &Pool{
+ newExecutor: func(cloud.Instance) Executor { return exr },
+ bootProbeCommand: "bootprobe",
+ timeoutBooting: bootTimeout,
+ timeoutProbe: probeTimeout,
+ exited: map[string]time.Time{},
+ }
+ wkr := &worker{
+ logger: logger,
+ executor: exr,
+ wp: wp,
+ mtx: &wp.mtx,
+ state: trial.state,
+ instance: inst,
+ appeared: ctime,
+ busy: ctime,
+ probed: ctime,
+ updated: ctime,
+ }
+ if trial.running > 0 {
+ wkr.running = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ }
+ if trial.starting > 0 {
+ wkr.starting = map[string]struct{}{"zzzzz-dz642-abcdefghijklmno": struct{}{}}
+ }
+ wkr.probeAndUpdate()
+ c.Check(wkr.state, check.Equals, trial.expectState)
+ c.Check(len(wkr.running), check.Equals, trial.expectRunning)
+ }
+}
+
+type stubResp struct {
+ stdout string
+ stderr string
+ err error
+}
+type stubExecutor map[string]stubResp
+
+func (se stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+func (se stubExecutor) Close() {}
+func (se stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error) {
+ resp, ok := se[cmd]
+ if !ok {
+ return nil, []byte("command not found\n"), errors.New("command not found")
+ }
+ return []byte(resp.stdout), []byte(resp.stderr), resp.err
+}
if err != nil {
return err
}
- if (method == "GET" || body != nil) && urlValues != nil {
- // FIXME: what if params don't fit in URL
+ if urlValues == nil {
+ // Nothing to send
+ } else if method == "GET" || method == "HEAD" || body != nil {
+ // Must send params in query part of URL (FIXME: what
+ // if resulting URL is too long?)
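+ // e.g., urlValues {"limit": {"10"}} turns ".../containers"
+ // into ".../containers?limit=10"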
u, err := url.Parse(urlString)
if err != nil {
return err
}
u.RawQuery = urlValues.Encode()
urlString = u.String()
+ } else {
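+ // Otherwise, send the encoded params as the request body,
+ // e.g., urlValues {"limit": {"10"}} becomes body "limit=10".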
+ body = strings.NewReader(urlValues.Encode())
}
req, err := http.NewRequest(method, urlString, body)
if err != nil {
// and ready to run containers, e.g., "mount | grep
// /encrypted-tmp"
BootProbeCommand string
- SyncInterval Duration
+
+ // Listening port (name or number) of SSH servers on worker
+ // VMs
+ SSHPort string
+
+ SyncInterval Duration
// Maximum idle time before automatic shutdown
TimeoutIdle Duration
// Container is an arvados#container resource.
type Container struct {
- UUID string `json:"uuid"`
- CreatedAt time.Time `json:"created_at"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- LockedByUUID string `json:"locked_by_uuid"`
- Mounts map[string]Mount `json:"mounts"`
- Output string `json:"output"`
- OutputPath string `json:"output_path"`
- Priority int64 `json:"priority"`
- RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
- State ContainerState `json:"state"`
- SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
- ExitCode int `json:"exit_code"`
+ UUID string `json:"uuid"`
+ CreatedAt time.Time `json:"created_at"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ LockedByUUID string `json:"locked_by_uuid"`
+ Mounts map[string]Mount `json:"mounts"`
+ Output string `json:"output"`
+ OutputPath string `json:"output_path"`
+ Priority int64 `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ State ContainerState `json:"state"`
+ SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
+ ExitCode int `json:"exit_code"`
+ RuntimeStatus map[string]interface{} `json:"runtime_status"`
}
// Container is an arvados#container resource.
QueuedContainerRequestUUID = "zzzzz-xvhdp-cr4queuedcontnr"
QueuedContainerUUID = "zzzzz-dz642-queuedcontainer"
+ RunningContainerUUID = "zzzzz-dz642-runningcontainr"
+
+ CompletedContainerUUID = "zzzzz-dz642-compltcontainer"
+
ArvadosRepoUUID = "zzzzz-s0uqq-arvadosrepo0123"
ArvadosRepoName = "arvados"
FooRepoUUID = "zzzzz-s0uqq-382brsig8rp3666"
"revision": "2bb1b664bcff821e02b2a0644cd29c7e824d54f8",
"revisionTime": "2015-08-17T12:26:01Z"
},
+ {
+ "checksumSHA1": "X7g98YfLr+zM7aN76AZvAfpZyfk=",
+ "path": "github.com/julienschmidt/httprouter",
+ "revision": "adbc77eec0d91467376ca515bc3a14b8434d0f18",
+ "revisionTime": "2018-04-11T15:45:01Z"
+ },
{
"checksumSHA1": "oX6jFQD74oOApvDIhOzW2dXpg5Q=",
"path": "github.com/kevinburke/ssh_config",