package dispatch
import (
+ "bytes"
"context"
"fmt"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "github.com/sirupsen/logrus"
)
const (
Logger Logger
+ // Batch size for container queries. A zero value is replaced
+ // with 100 before the polling loop starts.
+ BatchSize int
+
// Queue polling frequency
PollPeriod time.Duration
// running, and return.
//
// The DispatchFunc should not return until the container is finished.
//
// NOTE(review): in the error-returning form, the caller visible later
// in this file (the goroutine that invokes d.RunContainer, which
// appears to hold a DispatchFunc) reacts to a non-nil result by logging
// the message, creating a "dispatch" log record for the container, and
// calling d.Unlock on it. A dispatchcloud.ConstraintsNotSatisfiableError
// additionally triggers d.UpdateState(uuid, Cancelled).
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
// Run watches the API server's queue for containers that are either
// ready to run and available to lock, or are already locked by this
poll := time.NewTicker(d.PollPeriod)
defer poll.Stop()
+ if d.BatchSize == 0 {
+ d.BatchSize = 100
+ }
+
for {
select {
case <-poll.C:
}
tracker.updates <- c
go func() {
- d.RunContainer(d, c, tracker.updates)
- // RunContainer blocks for the lifetime of the container. When
- // it returns, the tracker should delete itself.
+ err := d.RunContainer(d, c, tracker.updates)
+ if err != nil {
+ text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+ if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+ }
+ }
+ text = logBuf.String()
+ d.UpdateState(c.UUID, Cancelled)
+ }
+ d.Logger.Printf("%s", text)
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": c.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ d.Arv.Create("logs", lr, nil)
+ d.Unlock(c.UUID)
+ }
+
d.mtx.Lock()
delete(d.trackers, c.UUID)
d.mtx.Unlock()
}
// checkForUpdates lists the containers matching filters and passes the
// listed items, together with todo, to checkListForUpdates.
//
// It returns false when the initial count query fails, and true once a
// listing comes back empty or the running offset reaches the total that
// was counted up front.
// NOTE(review): the body of the paging loop is only partly visible
// here, so additional return paths may exist.
func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
// First request: limit 0 with count "exact"; only ItemsAvailable is
// read from the response.
+ var countList arvados.ContainerList
params := arvadosclient.Dict{
"filters": filters,
+ "count": "exact",
+ "limit": 0,
+ "order": []string{"priority desc"}}
+ err := d.Arv.List("containers", params, &countList)
+ if err != nil {
+ d.Logger.Warnf("error getting count of containers: %q", err)
+ return false
+ }
// The total is captured once; the parameters built below ask for
// count "none" and cap each response at d.BatchSize items.
// NOTE(review): the total is a snapshot taken before paging starts, so
// the empty-page check at the bottom of the loop is the backstop if the
// set of matching containers shrinks in the meantime.
+ itemsAvailable := countList.ItemsAvailable
+ params = arvadosclient.Dict{
+ "filters": filters,
+ "count": "none",
+ "limit": d.BatchSize,
"order": []string{"priority desc"}}
// offset accumulates the number of items handled so far.
// NOTE(review): how offset is sent with each request is not visible in
// this excerpt.
offset := 0
for {
}
d.checkListForUpdates(list.Items, todo)
offset += len(list.Items)
// Stop on an empty page or once offset has reached the total.
- if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+ if len(list.Items) == 0 || itemsAvailable <= offset {
return true
}
}
}
err := d.lock(c.UUID)
if err != nil {
- d.Logger.Debugf("error locking container %s: %s", c.UUID, err)
+ d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
break
}
c.State = Locked