13959: Merge branch 'master' into 13959-timeouts-and-logging
[arvados.git] / sdk / go / dispatch / dispatch.go
index 4e25ba4f0603699569402d619127dd4b9fd99fb1..c8fb5aeb37e97eb9a43846301e2a07ea63e28ae7 100644 (file)
@@ -9,12 +9,12 @@ package dispatch
 import (
        "context"
        "fmt"
-       "log"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -25,10 +25,18 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
+type Logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
+}
+
 // Dispatcher struct
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       Logger Logger
+
        // Batch size for container queries
        BatchSize int64
 
@@ -65,6 +73,10 @@ type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 // dispatcher's token. When a new one appears, Run calls RunContainer
 // in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
+       if d.Logger == nil {
+               d.Logger = logrus.StandardLogger()
+       }
+
        err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
        if err != nil {
                return fmt.Errorf("error getting my token UUID: %v", err)
@@ -142,7 +154,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
                // Containers that I know about that didn't show up in any
                // query should be let go.
                for uuid, tracker := range todo {
-                       log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+                       d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
                        tracker.close()
                }
 
@@ -152,7 +164,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-       tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+       tracker := &runTracker{
+               updates: make(chan arvados.Container, 1),
+               logger:  d.Logger,
+       }
        tracker.updates <- c
        go func() {
                d.RunContainer(d, c, tracker.updates)
@@ -174,7 +189,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                "order":   []string{"priority desc"}}
        err := d.Arv.List("containers", params, &countList)
        if err != nil {
-               log.Printf("error getting count of containers: %q", err)
+               d.Logger.Warnf("error getting count of containers: %q", err)
                return false
        }
        itemsAvailable := countList.ItemsAvailable
@@ -195,7 +210,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
 
                err := d.Arv.List("containers", params, &list)
                if err != nil {
-                       log.Printf("Error getting list of containers: %q", err)
+                       d.Logger.Warnf("error getting list of containers: %q", err)
                        return false
                }
                d.checkListForUpdates(list.Items, todo)
@@ -218,7 +233,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                delete(todo, c.UUID)
 
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
-                       log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+                       d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
                } else if alreadyTracking {
                        switch c.State {
                        case Queued:
@@ -236,7 +251,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                                }
                                err := d.lock(c.UUID)
                                if err != nil {
-                                       log.Printf("debug: error locking container %s: %s", c.UUID, err)
+                                       d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
                                        break
                                }
                                c.State = Locked
@@ -260,7 +275,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro
                        "container": arvadosclient.Dict{"state": state},
                }, nil)
        if err != nil {
-               log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+               d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
 }
@@ -315,6 +330,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
 type runTracker struct {
        closing bool
        updates chan arvados.Container
+       logger  Logger
 }
 
 func (tracker *runTracker) close() {
@@ -330,7 +346,7 @@ func (tracker *runTracker) update(c arvados.Container) {
        }
        select {
        case <-tracker.updates:
-               log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+               tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
        default:
        }
        tracker.updates <- c