X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e4fca76d5db9bd844530454894d07ddc729b4a9a..c1fa7d1c6840ce03e763191c92d3548da8494388:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 4e25ba4f06..df43c2b10d 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -9,12 +9,12 @@ package dispatch import ( "context" "fmt" - "log" "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "github.com/sirupsen/logrus" ) const ( @@ -25,12 +25,20 @@ const ( Cancelled = arvados.ContainerStateCancelled ) +type Logger interface { + Printf(string, ...interface{}) + Warnf(string, ...interface{}) + Debugf(string, ...interface{}) +} + // Dispatcher struct type Dispatcher struct { Arv *arvadosclient.ArvadosClient + Logger Logger + // Batch size for container queries - BatchSize int64 + BatchSize int // Queue polling frequency PollPeriod time.Duration @@ -65,6 +73,10 @@ type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) // dispatcher's token. When a new one appears, Run calls RunContainer // in a new goroutine. func (d *Dispatcher) Run(ctx context.Context) error { + if d.Logger == nil { + d.Logger = logrus.StandardLogger() + } + err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth) if err != nil { return fmt.Errorf("error getting my token UUID: %v", err) @@ -142,7 +154,7 @@ func (d *Dispatcher) Run(ctx context.Context) error { // Containers that I know about that didn't show up in any // query should be let go. for uuid, tracker := range todo { - log.Printf("Container %q not returned by any query, stopping tracking.", uuid) + d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid) tracker.close() } @@ -152,7 +164,10 @@ func (d *Dispatcher) Run(ctx context.Context) error { // Start a runner in a new goroutine, and send the initial container // record to its updates channel. func (d *Dispatcher) start(c arvados.Container) *runTracker { - tracker := &runTracker{updates: make(chan arvados.Container, 1)} + tracker := &runTracker{ + updates: make(chan arvados.Container, 1), + logger: d.Logger, + } tracker.updates <- c go func() { d.RunContainer(d, c, tracker.updates) @@ -174,7 +189,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r "order": []string{"priority desc"}} err := d.Arv.List("containers", params, &countList) if err != nil { - log.Printf("error getting count of containers: %q", err) + d.Logger.Warnf("error getting count of containers: %q", err) return false } itemsAvailable := countList.ItemsAvailable @@ -195,7 +210,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r err := d.Arv.List("containers", params, &list) if err != nil { - log.Printf("Error getting list of containers: %q", err) + d.Logger.Warnf("error getting list of containers: %q", err) return false } d.checkListForUpdates(list.Items, todo) @@ -218,7 +233,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma delete(todo, c.UUID) if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { - log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID) + d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking { switch c.State { case Queued: @@ -236,7 +251,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma } err := d.lock(c.UUID) if err != nil { - log.Printf("debug: error locking container %s: %s", c.UUID, err) + d.Logger.Warnf("error locking container %s: %s", c.UUID, err) break } c.State = Locked @@ -260,7 +275,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro "container": arvadosclient.Dict{"state": state}, }, nil) if err != nil { - log.Printf("Error updating container %s to state %q: %s", uuid, state, err) + d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err) } return err } @@ -315,6 +330,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error { type runTracker struct { closing bool updates chan arvados.Container + logger Logger } func (tracker *runTracker) close() { @@ -330,7 +346,7 @@ func (tracker *runTracker) update(c arvados.Container) { } select { case <-tracker.updates: - log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID) + tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID) default: } tracker.updates <- c