import (
"context"
"fmt"
- "log"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "github.com/sirupsen/logrus"
)
const (
Cancelled = arvados.ContainerStateCancelled
)
// Logger is the set of logging methods the dispatcher calls. When
// Dispatcher.Logger is nil, Run substitutes logrus.StandardLogger().
+type Logger interface {
+ Printf(string, ...interface{}) // routine notices, e.g. a tracked container no longer returned by any query
+ Warnf(string, ...interface{}) // failed API calls: counting/listing containers, locking, state updates
+ Debugf(string, ...interface{}) // low-importance notes: containers locked by another token, discarded updates
+}
+
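// Dispatcher polls the container queue and calls RunContainer, in a
// new goroutine, for each container it starts tracking.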
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
+ Logger Logger
+
// Batch size for container queries
- BatchSize int64
+ BatchSize int
// Queue polling frequency
PollPeriod time.Duration
// dispatcher's token. When a new one appears, Run calls RunContainer
// in a new goroutine.
func (d *Dispatcher) Run(ctx context.Context) error {
+ if d.Logger == nil {
+ d.Logger = logrus.StandardLogger()
+ }
+
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
// Containers that I know about that didn't show up in any
// query should be let go.
for uuid, tracker := range todo {
- log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+ d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
tracker.close()
}
// Start a runner in a new goroutine, and send the initial container
// record to its updates channel.
func (d *Dispatcher) start(c arvados.Container) *runTracker {
- tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker := &runTracker{
+ updates: make(chan arvados.Container, 1),
+ logger: d.Logger,
+ }
tracker.updates <- c
go func() {
d.RunContainer(d, c, tracker.updates)
var countList arvados.ContainerList
params := arvadosclient.Dict{
"filters": filters,
- "count": "exact",
- "limit": 0,
+ "count": "exact",
+ "limit": 0,
"order": []string{"priority desc"}}
err := d.Arv.List("containers", params, &countList)
if err != nil {
- log.Printf("Error getting count of containers: %q", err)
+ d.Logger.Warnf("error getting count of containers: %q", err)
return false
}
- itemsAvailable := countList.ItemsAvailable
+ itemsAvailable := countList.ItemsAvailable
params = arvadosclient.Dict{
"filters": filters,
- "count": "none",
- "limit": d.BatchSize,
+ "count": "none",
+ "limit": d.BatchSize,
"order": []string{"priority desc"}}
offset := 0
for {
params["offset"] = offset
+
+ // This list variable must be a new one declared
+ // inside the loop: otherwise, items in the API
+ // response would get deep-merged into the items
+ // loaded in previous iterations.
var list arvados.ContainerList
+
err := d.Arv.List("containers", params, &list)
if err != nil {
- log.Printf("Error getting list of containers: %q", err)
+ d.Logger.Warnf("error getting list of containers: %q", err)
return false
}
d.checkListForUpdates(list.Items, todo)
delete(todo, c.UUID)
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
- log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
case Queued:
}
err := d.lock(c.UUID)
if err != nil {
- log.Printf("debug: error locking container %s: %s", c.UUID, err)
+ d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
break
}
c.State = Locked
"container": arvadosclient.Dict{"state": state},
}, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+ d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
// runTracker is the dispatcher's handle on one running container: start
// creates it, queues the initial container record on updates, and hands
// the updates channel to RunContainer in a new goroutine.
type runTracker struct {
closing bool // NOTE(review): presumably set by close(); its use is outside this view, confirm
updates chan arvados.Container // created with capacity 1 in start; carries container records to the runner
+ logger Logger // copied from Dispatcher.Logger in start; used to report discarded updates
}
func (tracker *runTracker) close() {
}
select {
case <-tracker.updates:
- log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
default:
}
tracker.updates <- c