13959: Use logrus for crunch-dispatch-slurm logging.
[arvados.git] / sdk / go / dispatch / dispatch.go
index e0dc2eefda5e52e014ab8e57d5839bdf176ea362..74cefed05794b7c242ebd8033fba1e8a5781547a 100644 (file)
@@ -9,12 +9,12 @@ package dispatch
 import (
        "context"
        "fmt"
-       "log"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -25,10 +25,18 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
+type Logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
+}
+
 // Dispatcher struct
 type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
 
+       Logger Logger
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -62,6 +70,10 @@ type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 // dispatcher's token. When a new one appears, Run calls RunContainer
 // in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
+       if d.Logger == nil {
+               d.Logger = logrus.StandardLogger()
+       }
+
        err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
        if err != nil {
                return fmt.Errorf("error getting my token UUID: %v", err)
@@ -135,7 +147,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
                // Containers that I know about that didn't show up in any
                // query should be let go.
                for uuid, tracker := range todo {
-                       log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+                       d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
                        tracker.close()
                }
 
@@ -145,7 +157,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-       tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+       tracker := &runTracker{
+               updates: make(chan arvados.Container, 1),
+               logger:  d.Logger,
+       }
        tracker.updates <- c
        go func() {
                d.RunContainer(d, c, tracker.updates)
@@ -174,7 +189,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
 
                err := d.Arv.List("containers", params, &list)
                if err != nil {
-                       log.Printf("Error getting list of containers: %q", err)
+                       d.Logger.Warnf("error getting list of containers: %q", err)
                        return false
                }
                d.checkListForUpdates(list.Items, todo)
@@ -197,7 +212,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                delete(todo, c.UUID)
 
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
-                       log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+                       d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
                } else if alreadyTracking {
                        switch c.State {
                        case Queued:
@@ -215,7 +230,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                                }
                                err := d.lock(c.UUID)
                                if err != nil {
-                                       log.Printf("debug: error locking container %s: %s", c.UUID, err)
+                                       d.Logger.Debugf("error locking container %s: %s", c.UUID, err)
                                        break
                                }
                                c.State = Locked
@@ -239,7 +254,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro
                        "container": arvadosclient.Dict{"state": state},
                }, nil)
        if err != nil {
-               log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+               d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
 }
@@ -294,6 +309,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
 type runTracker struct {
        closing bool
        updates chan arvados.Container
+       logger  Logger
 }
 
 func (tracker *runTracker) close() {
@@ -309,7 +325,7 @@ func (tracker *runTracker) update(c arvados.Container) {
        }
        select {
        case <-tracker.updates:
-               log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+               tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
        default:
        }
        tracker.updates <- c