*next[upd.UUID] = upd
}
}
- selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters"}
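+ // created_at is needed so the scheduler can compute how long each
+ // container has been waiting (see updateMetrics).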
+ selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{
dontstart := map[arvados.InstanceType]bool{}
var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
var containerAllocatedWorkerBootingCount int
- var longestWaitTimeCandidate, previousLongestWaitTimeCandidate float64
tryrun:
for i, ctr := range sorted {
if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
continue
}
- previousLongestWaitTimeCandidate = longestWaitTimeCandidate
- since := time.Since(ctr.CreatedAt).Seconds()
- if since > longestWaitTimeCandidate {
- longestWaitTimeCandidate = since
- }
switch ctr.State {
case arvados.ContainerStateQueued:
if unalloc[it] < 1 && sch.pool.AtQuota() {
logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
} else if sch.pool.StartContainer(it, ctr) {
// Success.
- longestWaitTimeCandidate = previousLongestWaitTimeCandidate
} else {
containerAllocatedWorkerBootingCount += 1
dontstart[it] = true
sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota)))
- sch.mLongestWaitTimeSinceQueue.Set(longestWaitTimeCandidate)
if len(overquota) > 0 {
// Unlock any containers that are unmappable while
}
sch := New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
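+ // mLongestWaitTimeSinceQueue is refreshed by updateMetrics(), not by
+ // runQueue() itself, so call it explicitly before checking metrics.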
+ sch.updateMetrics()
c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 1)
c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 0)
pool = stubPool{}
sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
+ sch.updateMetrics()
c.Check(int(testutil.ToFloat64(sch.mContainersAllocatedNotStarted)), check.Equals, 0)
c.Check(int(testutil.ToFloat64(sch.mContainersNotAllocatedOverQuota)), check.Equals, 1)
}
sch = New(ctx, &queue, &pool, nil, time.Millisecond, time.Millisecond)
sch.runQueue()
+ sch.updateMetrics()
c.Check(int(testutil.ToFloat64(sch.mLongestWaitTimeSinceQueue)), check.Equals, 0)
}
"sync"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
reg.MustRegister(sch.mLongestWaitTimeSinceQueue)
}
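+// updateMetrics refreshes metrics that are computed from the current
+// queue and worker pool state, such as the longest time any eligible
+// container has been waiting since it was created.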
+func (sch *Scheduler) updateMetrics() {
+ earliest := time.Time{}
+ entries, _ := sch.queue.Entries()
+ running := sch.pool.Running()
+ for _, ent := range entries {
+ if ent.Container.Priority > 0 &&
+ (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+ // Exclude containers that are preparing to run the payload (i.e.
+ // ContainerStateLocked and running on a worker, most likely loading
+ // the payload image).
+ if _, ok := running[ent.Container.UUID]; !ok {
+ if earliest.IsZero() || ent.Container.CreatedAt.Before(earliest) {
+ earliest = ent.Container.CreatedAt
+ }
+ }
+ }
+ }
+ if !earliest.IsZero() {
+ sch.mLongestWaitTimeSinceQueue.Set(time.Since(earliest).Seconds())
+ } else {
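+ // No eligible container is waiting; report zero.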
+ sch.mLongestWaitTimeSinceQueue.Set(0)
+ }
+}
+
// Start starts the scheduler.
func (sch *Scheduler) Start() {
go sch.runOnce.Do(sch.run)
for {
sch.runQueue()
sch.sync()
+ sch.updateMetrics()
select {
case <-sch.stop:
return
if wp.mDisappearances != nil {
wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
}
- if wp.mTimeFromShutdownToGone != nil {
+ // wkr.destroyed is zero if the instance disappeared even though we
+ // were not trying to shut it down, in which case there is no
+ // shutdown-to-gone interval to report.
+ if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
}
delete(wp.workers, id)