+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
// Package dispatch is a helper library for building Arvados container
// dispatchers.
package dispatch

import (
+ "bytes"
"context"
"fmt"
- "log"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "github.com/sirupsen/logrus"
)
const (
Cancelled = arvados.ContainerStateCancelled
)
+type Logger interface {
+ Printf(string, ...interface{})
+ Warnf(string, ...interface{})
+ Debugf(string, ...interface{})
+}
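+
+// *logrus.Logger satisfies this interface; Run falls back to
+// logrus.StandardLogger() when Logger is nil.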
+
// A Dispatcher polls the API server's container queue and invokes
// its RunContainer callback for each container it locks or already
// owns.
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
+ Logger Logger
+
+	// Batch size for container queries (0 means the default of
+	// 100, set in Run)
+ BatchSize int
+
// Queue polling frequency
PollPeriod time.Duration
// running, and return.
//
// The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
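+
+// For illustration, a minimal DispatchFunc might look like this
+// (hypothetical sketch; a real dispatcher would submit the container
+// to its scheduler and wait for it to finish):
+//
+//	func run(d *dispatch.Dispatcher, c arvados.Container, status <-chan arvados.Container) error {
+//		for upd := range status {
+//			_ = upd // react to state/priority changes here
+//		}
+//		return nil
+//	}
+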
// Run watches the API server's queue for containers that are either
// ready to run and available to lock, or are already locked by this
// dispatcher's token. When a new one appears, Run calls RunContainer
// in a new goroutine.
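+//
+// Typical usage (hypothetical sketch; error handling elided):
+//
+//	d := &dispatch.Dispatcher{
+//		Arv:          arv, // a *arvadosclient.ArvadosClient
+//		RunContainer: run, // a DispatchFunc like the sketch above
+//		PollPeriod:   time.Second,
+//	}
+//	log.Fatal(d.Run(context.Background()))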
func (d *Dispatcher) Run(ctx context.Context) error {
+ if d.Logger == nil {
+ d.Logger = logrus.StandardLogger()
+ }
+
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
		return fmt.Errorf("error getting my token UUID: %v", err)
	}

poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
+ if d.BatchSize == 0 {
+ d.BatchSize = 100
+ }
+
for {
- tracked := d.trackedUUIDs()
- d.checkForUpdates([][]interface{}{
- {"uuid", "in", tracked}})
- d.checkForUpdates([][]interface{}{
- {"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", tracked}})
- d.checkForUpdates([][]interface{}{
- {"state", "=", Queued},
- {"priority", ">", "0"},
- {"uuid", "not in", tracked}})
select {
case <-poll.C:
- continue
+ break
case <-ctx.Done():
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ for _, tracker := range d.trackers {
+ tracker.close()
+ }
return ctx.Err()
}
- }
-}
-func (d *Dispatcher) trackedUUIDs() []string {
- d.mtx.Lock()
- defer d.mtx.Unlock()
- if len(d.trackers) == 0 {
- // API bug: ["uuid", "not in", []] does not work as
- // expected, but this does:
- return []string{"this-uuid-does-not-exist"}
- }
- uuids := make([]string, 0, len(d.trackers))
- for x := range d.trackers {
- uuids = append(uuids, x)
+ todo := make(map[string]*runTracker)
+ d.mtx.Lock()
+		// Snapshot the tracker map so the mutex isn't held
+		// while the queries below run.
+ for uuid, tracker := range d.trackers {
+ todo[uuid] = tracker
+ }
+ d.mtx.Unlock()
+
+ // Containers I currently own (Locked/Running)
+ querySuccess := d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
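+		// Note each checkForUpdates call sits on the left of
+		// "&&" so it still runs even if an earlier query
+		// already failed.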
+ // Containers I should try to dispatch
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"state", "=", Queued},
+ {"priority", ">", "0"}}, todo) && querySuccess
+
+ if !querySuccess {
+			// One of the previous queries failed, so we
+			// probably didn't get updates for all the
+			// containers we should have. Don't check them
+			// individually, because that could be
+			// expensive.
+ continue
+ }
+
+ // Containers I know about but didn't fall into the
+ // above two categories (probably Complete/Cancelled)
+ var missed []string
+ for uuid := range todo {
+ missed = append(missed, uuid)
+ }
+
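+		// Check on the leftover containers in chunks of 20,
+		// to keep each "uuid in ..." filter reasonably small.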
+ for len(missed) > 0 {
+ var batch []string
+ if len(missed) > 20 {
+ batch = missed[0:20]
+ missed = missed[20:]
+ } else {
+ batch = missed
+ missed = missed[0:0]
+ }
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"uuid", "in", batch}}, todo) && querySuccess
+ }
+
+ if !querySuccess {
+			// One of the previous queries failed, so we
+			// probably didn't see all the containers we
+			// should have; don't shut down the missed
+			// ones.
+ continue
+ }
+
+		// Containers I know about that didn't show up in any
+		// query are presumably done; stop tracking them.
+ for uuid, tracker := range todo {
+			d.Logger.Printf("container %q not returned by any query; stopping tracking", uuid)
+ tracker.close()
+ }
+
}
- return uuids
}
// Start a runner in a new goroutine, and send the initial container
// record to its updates channel.
func (d *Dispatcher) start(c arvados.Container) *runTracker {
- tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker := &runTracker{
+ updates: make(chan arvados.Container, 1),
+ logger: d.Logger,
+ }
tracker.updates <- c
go func() {
- d.RunContainer(d, c, tracker.updates)
-
- d.mtx.Lock()
- delete(d.trackers, c.UUID)
- d.mtx.Unlock()
+ fallbackState := Queued
+ err := d.RunContainer(d, c, tracker.updates)
+ if err != nil {
+ text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+ if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ fallbackState = Cancelled
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+ }
+ }
+ text = logBuf.String()
+ }
+ d.Logger.Printf("%s", text)
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": c.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
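+			// Emit a "dispatch" log event so the user can
+			// see why the container failed; the error from
+			// Create is ignored here (best effort).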
+ d.Arv.Create("logs", lr, nil)
+ }
+ // If checkListForUpdates() doesn't close the tracker
+ // after 2 queue updates, try to move the container to
+ // the fallback state, which should eventually work
+ // and cause the tracker to close.
+ updates := 0
+ for upd := range tracker.updates {
+ updates++
+ if upd.State == Locked || upd.State == Running {
+				// The runner didn't clean up before
+				// returning -- or this is the first
+				// update and it contains stale
+				// information from before
+				// RunContainer() returned.
+ if updates < 2 {
+ // Avoid generating confusing
+ // logs / API calls in the
+ // stale-info case.
+ continue
+ }
+ d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+ d.UpdateState(c.UUID, fallbackState)
+ }
+ }
}()
return tracker
}
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
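+	// First get an exact count of matching containers, with
+	// limit 0 so no items are transferred; the paging loop below
+	// uses it to decide when to stop.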
+ var countList arvados.ContainerList
params := arvadosclient.Dict{
"filters": filters,
+ "count": "exact",
+ "limit": 0,
"order": []string{"priority desc"}}
-
- var list arvados.ContainerList
- for offset, more := 0, true; more; offset += len(list.Items) {
+ err := d.Arv.List("containers", params, &countList)
+ if err != nil {
+ d.Logger.Warnf("error getting count of containers: %q", err)
+ return false
+ }
+ itemsAvailable := countList.ItemsAvailable
+ params = arvadosclient.Dict{
+ "filters": filters,
+ "count": "none",
+ "limit": d.BatchSize,
+ "order": []string{"priority desc"}}
+ offset := 0
+ for {
params["offset"] = offset
+
+ // This list variable must be a new one declared
+ // inside the loop: otherwise, items in the API
+ // response would get deep-merged into the items
+ // loaded in previous iterations.
+ var list arvados.ContainerList
+
err := d.Arv.List("containers", params, &list)
if err != nil {
- log.Printf("Error getting list of containers: %q", err)
- return
+ d.Logger.Warnf("error getting list of containers: %q", err)
+ return false
+ }
+ d.checkListForUpdates(list.Items, todo)
+ offset += len(list.Items)
+ if len(list.Items) == 0 || itemsAvailable <= offset {
+ return true
}
- more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
- d.checkListForUpdates(list.Items)
}
}
-func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.trackers == nil {
for _, c := range containers {
tracker, alreadyTracking := d.trackers[c.UUID]
+ delete(todo, c.UUID)
+
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
- log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
- case Queued:
+ case Queued, Cancelled, Complete:
+ d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
tracker.close()
+ delete(d.trackers, c.UUID)
case Locked, Running:
+ d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
tracker.update(c)
- case Cancelled, Complete:
- tracker.close()
}
} else {
switch c.State {
}
err := d.lock(c.UUID)
if err != nil {
- log.Printf("debug: error locking container %s: %s", c.UUID, err)
+ d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
break
}
c.State = Locked
"container": arvadosclient.Dict{"state": state},
}, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+ d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
type runTracker struct {
closing bool
updates chan arvados.Container
+ logger Logger
}
func (tracker *runTracker) close() {
}
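+	// Non-blocking receive: discard the previously queued update,
+	// if the runner hasn't consumed it yet, so the channel always
+	// holds the latest container record.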
select {
case <-tracker.updates:
- log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
default:
}
tracker.updates <- c