// cache up to date.
type Queue struct {
logger logrus.FieldLogger
- reg *prometheus.Registry
chooseType typeChooser
client APIClient
// Arvados cluster's queue during Update, chooseType will be called to
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
- return &Queue{
+ cq := &Queue{
logger: logger,
- reg: reg,
chooseType: chooseType,
client: client,
current: map[string]QueueEnt{},
subscribers: map[<-chan struct{}]chan struct{}{},
}
+ if reg != nil {
+ go cq.runMetrics(reg)
+ }
+ return cq
}
// Subscribe returns a channel that becomes ready to receive when an
}
return results, nil
}
+
+// runMetrics registers the arvados_dispatchcloud_queue_entries gauge
+// with reg and refreshes it every time the queue's subscription
+// channel delivers a notification. The gauge is labelled by container
+// state and instance type name.
+//
+// It blocks for as long as the subscription channel stays open, so it
+// is meant to be run in its own goroutine. MustRegister panics if the
+// collector cannot be registered with reg.
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+	gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "dispatchcloud",
+		Name:      "queue_entries",
+		Help:      "Number of active container entries in the controller database.",
+	}, []string{"state", "instance_type"})
+	reg.MustRegister(gauge)
+
+	// labelPair identifies one (state, instance type) gauge series.
+	type labelPair struct {
+		state    arvados.ContainerState
+		instType string
+	}
+	tally := map[labelPair]int{}
+
+	updates := cq.Subscribe()
+	defer cq.Unsubscribe(updates)
+	for range updates {
+		// Reset to zero instead of deleting keys, so a series
+		// that was reported earlier is explicitly set back to 0
+		// once no entries match it any more.
+		for pair := range tally {
+			tally[pair] = 0
+		}
+		// Only the first value returned by Entries is needed here.
+		entries, _ := cq.Entries()
+		for _, e := range entries {
+			pair := labelPair{state: e.Container.State, instType: e.InstanceType.Name}
+			tally[pair]++
+		}
+		for pair, n := range tally {
+			gauge.WithLabelValues(string(pair.state), pair.instType).Set(float64(n))
+		}
+	}
+}
scheduler.WorkerPool
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+ KillInstance(id cloud.InstanceID, reason string) error
Stop()
}
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
}
+// Management API: shut down/destroy the specified instance now.
+//
+// The instance is selected by the "instance_id" form value; a missing
+// or empty value yields 400 Bad Request. The optional "reason" form
+// value is appended to the reason string passed to the worker pool.
+// Any error returned by the pool is reported as 404 Not Found. On
+// success nothing is written to the response.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+	instanceID := cloud.InstanceID(r.FormValue("instance_id"))
+	if instanceID == "" {
+		httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+		return
+	}
+	reason := "via management API: " + r.FormValue("reason")
+	if err := disp.pool.KillInstance(instanceID, reason); err != nil {
+		httpserver.Error(w, err.Error(), http.StatusNotFound)
+	}
+}
+
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
id := cloud.InstanceID(r.FormValue("instance_id"))
if id == "" {
s.cluster = &arvados.Cluster{
CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(150 * time.Millisecond),
- TimeoutBooting: arvados.Duration(150 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
},
Dispatch: arvados.Dispatch{
PrivateKey: string(dispatchprivraw),
stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
case 1:
stubvm.CrunchRunMissing = true
+ case 2:
+ stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
}
import (
"fmt"
+ "time"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/cloud/azure"
"git.curoverse.com/arvados.git/lib/cloud/ec2"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
var drivers = map[string]cloud.Driver{
if !ok {
return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
}
- return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+ if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+ is = &rateLimitedInstanceSet{
+ InstanceSet: is,
+ ticker: time.NewTicker(time.Second / time.Duration(maxops)),
+ }
+ }
+ return is, err
+}
+
+// rateLimitedInstanceSet wraps a cloud.InstanceSet so that its Create
+// method waits for a tick from ticker before calling the wrapped
+// implementation. All other methods are promoted unchanged from the
+// embedded InstanceSet.
+type rateLimitedInstanceSet struct {
+ cloud.InstanceSet
+ // ticker paces create/destroy operations. Create hands the same
+ // ticker to the instances it returns, so they share one limiter.
+ ticker *time.Ticker
+}
+
+// Create waits for the next tick of the rate limiter, then creates an
+// instance through the wrapped InstanceSet.
+//
+// On success the returned instance is wrapped in a rateLimitedInstance
+// sharing the same ticker, so that destroying it is rate-limited too.
+// On failure it returns a nil instance along with the error from the
+// wrapped InstanceSet.
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+	<-is.ticker.C
+	inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+	if err != nil {
+		// Return a literal nil rather than a wrapper around the
+		// failed result: a non-nil *rateLimitedInstance holding a
+		// nil cloud.Instance would pass a caller's nil check and
+		// then panic on the first promoted method call.
+		return nil, err
+	}
+	return &rateLimitedInstance{inst, is.ticker}, nil
+}
+
+// rateLimitedInstance wraps a cloud.Instance so that its Destroy
+// method waits for a tick from ticker before calling the wrapped
+// implementation. All other methods are promoted unchanged from the
+// embedded Instance.
+//
+// Field order matters: values are built with a positional composite
+// literal (instance first, then ticker).
+type rateLimitedInstance struct {
+ cloud.Instance
+ // ticker is the rate limiter whose ticks gate Destroy calls.
+ ticker *time.Ticker
+}
+
+// Destroy blocks until the rate limiter's ticker fires, then destroys
+// the wrapped instance and returns whatever error that call reports.
+func (inst *rateLimitedInstance) Destroy() error {
+	<-inst.ticker.C // consume one rate-limit slot before the cloud call
+	destroyErr := inst.Instance.Destroy()
+	return destroyErr
}
type StubVM struct {
Boot time.Time
Broken time.Time
+ ReportBroken time.Time
CrunchRunMissing bool
CrunchRunCrashRate float64
CrunchRunDetachDelay time.Duration
for uuid := range svm.running {
fmt.Fprintf(stdout, "%s\n", uuid)
}
+ if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+ fmt.Fprintln(stdout, "broken")
+ }
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
return r
}
+// KillInstance shuts down the worker for the cloud VM instance with
+// the given ID, logging reason alongside the shutdown message. It
+// returns an error if the pool has no worker for that instance.
+//
+// NOTE(review): wp.workers is read here without taking any lock --
+// confirm whether callers are expected to hold one, or whether this
+// lookup needs to be synchronized with the pool's other goroutines.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+	if wkr, found := wp.workers[id]; found {
+		wkr.logger.WithField("Reason", reason).Info("shutting down")
+		wkr.shutdown()
+		return nil
+	}
+	return errors.New("instance not found")
+}
+
func (wp *Pool) setup() {
wp.creating = map[string]createCall{}
wp.exited = map[string]time.Time{}
package worker
import (
- "bytes"
"fmt"
"strings"
"sync"
logger.Info("instance booted; will try probeRunning")
}
}
+ reportedBroken := false
if booted || wkr.state == StateUnknown {
- ctrUUIDs, ok = wkr.probeRunning()
+ ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
}
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
cmd := "crunch-run --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return nil, false
+ return
}
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true
+ ok = true
+ for _, s := range strings.Split(string(stdout), "\n") {
+ if s == "broken" {
+ reportsBroken = true
+ } else if s != "" {
+ running = append(running, s)
+ }
}
- return strings.Split(string(stdout), "\n"), true
+ return
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
// Time after shutdown to retry shutdown
TimeoutShutdown Duration
+ // Maximum create/destroy-instance operations per second
+ MaxCloudOpsPerSecond int
+
ImageID string
Driver string
import (
"encoding/json"
+ "errors"
"fmt"
"log"
"os"
}
svcListCacheMtx.Unlock()
- return kc.loadKeepServers(<-cacheEnt.latest)
+ select {
+ case <-time.After(time.Minute):
+ return errors.New("timed out while getting initial list of keep services")
+ case sl := <-cacheEnt.latest:
+ return kc.loadKeepServers(sl)
+ }
}
func (kc *KeepClient) RefreshServiceDiscovery() {
lockdir = "/var/lock"
lockprefix = "crunch-run-"
locksuffix = ".lock"
+ brokenfile = "crunch-run-broken"
)
// procinfo is saved in each process's lockfile.
if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
- if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+ if name := info.Name(); name == brokenfile {
+ fmt.Fprintln(stdout, "broken")
+ return nil
+ } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
return nil
}
if info.Size() == 0 {
func (runner *ContainerRunner) runBrokenNodeHook() {
if *brokenNodeHook == "" {
- runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ path := filepath.Join(lockdir, brokenfile)
+ runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+ f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+ return
+ }
+ f.Close()
} else {
runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
// run killme script
c.Check(api.CalledWith("container.state", "Queued"), NotNil)
c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
}
func (s *TestSuite) TestFullBrokenDocker3(c *C) {