From 63bae17d784c2c1522a087d71a0fcb2a9b6eddcd Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Thu, 16 Aug 2018 15:38:42 -0400
Subject: [PATCH] 13959: Use logrus for crunch-dispatch-slurm logging.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 sdk/go/dispatch/dispatch.go                   | 32 +++++++++++++-----
 .../crunch-dispatch-slurm.go                  | 33 +++++++++++++++----
 .../crunch-dispatch-slurm_test.go             | 14 +++++---
 services/crunch-dispatch-slurm/squeue.go      | 12 +++----
 services/crunch-dispatch-slurm/squeue_test.go |  5 +++
 5 files changed, 70 insertions(+), 26 deletions(-)

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index e0dc2eefda..74cefed057 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -9,12 +9,12 @@ package dispatch
 import (
 	"context"
 	"fmt"
-	"log"
 	"sync"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"github.com/Sirupsen/logrus"
 )
 
 const (
@@ -25,10 +25,18 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
+type Logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
 // Dispatcher struct
 type Dispatcher struct {
 	Arv *arvadosclient.ArvadosClient
 
+	Logger Logger
+
 	// Queue polling frequency
 	PollPeriod time.Duration
 
@@ -62,6 +70,10 @@ type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 // dispatcher's token. When a new one appears, Run calls RunContainer
 // in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
+	if d.Logger == nil {
+		d.Logger = logrus.StandardLogger()
+	}
+
 	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
 	if err != nil {
 		return fmt.Errorf("error getting my token UUID: %v", err)
 	}
@@ -135,7 +147,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 		// Containers that I know about that didn't show up in any
 		// query should be let go.
 		for uuid, tracker := range todo {
-			log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+			d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
 			tracker.close()
 		}
 
@@ -145,7 +157,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-	tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+	tracker := &runTracker{
+		updates: make(chan arvados.Container, 1),
+		logger:  d.Logger,
+	}
 	tracker.updates <- c
 	go func() {
 		d.RunContainer(d, c, tracker.updates)
@@ -174,7 +189,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
 
 	err := d.Arv.List("containers", params, &list)
 	if err != nil {
-		log.Printf("Error getting list of containers: %q", err)
+		d.Logger.Warnf("error getting list of containers: %q", err)
 		return false
 	}
 	d.checkListForUpdates(list.Items, todo)
@@ -197,7 +212,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
 		delete(todo, c.UUID)
 
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
-			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+			d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {
 			switch c.State {
 			case Queued:
@@ -215,7 +230,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
 				}
 				err := d.lock(c.UUID)
 				if err != nil {
-					log.Printf("debug: error locking container %s: %s", c.UUID, err)
+					d.Logger.Debugf("error locking container %s: %s", c.UUID, err)
 					break
 				}
 				c.State = Locked
@@ -239,7 +254,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro
 			"container": arvadosclient.Dict{"state": state},
 		}, nil)
 	if err != nil {
-		log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+		d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
 	}
 	return err
 }
@@ -294,6 +309,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
 type runTracker struct {
 	closing bool
 	updates chan arvados.Container
+	logger  Logger
 }
 
 func (tracker *runTracker) close() {
@@ -309,7 +325,7 @@ func (tracker *runTracker) update(c arvados.Container) {
 	}
 	select {
 	case <-tracker.updates:
-		log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+		tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
 	default:
 	}
 	tracker.updates <- c
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index b4103cc625..b12be91c91 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -23,9 +23,15 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	"github.com/Sirupsen/logrus"
 	"github.com/coreos/go-systemd/daemon"
 )
 
+type logger interface {
+	dispatch.Logger
+	Fatalf(string, ...interface{})
+}
+
 const initialNiceValue int64 = 10000
 
 var (
@@ -35,6 +41,7 @@
 
 type Dispatcher struct {
 	*dispatch.Dispatcher
+	logger  logrus.FieldLogger
 	cluster *arvados.Cluster
 	sqCheck *SqueueChecker
 	slurm   Slurm
@@ -60,10 +67,17 @@
 }
 
 func main() {
-	disp := &Dispatcher{}
+	logger := logrus.StandardLogger()
+	if os.Getenv("DEBUG") != "" {
+		logger.SetLevel(logrus.DebugLevel)
+	}
+	logger.Formatter = &logrus.JSONFormatter{
+		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+	}
+	disp := &Dispatcher{logger: logger}
 	err := disp.Run(os.Args[0], os.Args[1:])
 	if err != nil {
-		log.Fatal(err)
+		logrus.Fatalf("%s", err)
 	}
 }
 
@@ -101,7 +115,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 		return nil
 	}
 
-	log.Printf("crunch-dispatch-slurm %s started", version)
+	disp.logger.Printf("crunch-dispatch-slurm %s started", version)
 
 	err := disp.readConfig(*configPath)
 	if err != nil {
@@ -129,7 +143,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 		os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
 		os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 	} else {
-		log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
+		disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
 	}
 
 	if *dumpConfig {
@@ -138,7 +152,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 
 	siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
 	if os.IsNotExist(err) {
-		log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+		disp.logger.Warnf("no cluster config (%s), proceeding with no node types defined", err)
 	} else if err != nil {
 		return fmt.Errorf("error loading config: %s", err)
 	} else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
@@ -150,20 +164,25 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 
 // setup() initializes private fields after configure().
 func (disp *Dispatcher) setup() {
+	if disp.logger == nil {
+		disp.logger = logrus.StandardLogger()
+	}
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		log.Fatalf("Error making Arvados client: %v", err)
+		disp.logger.Fatalf("Error making Arvados client: %v", err)
 	}
 	arv.Retries = 25
 
 	disp.slurm = &slurmCLI{}
 	disp.sqCheck = &SqueueChecker{
+		Logger:         disp.logger,
 		Period:         time.Duration(disp.PollPeriod),
 		PrioritySpread: disp.PrioritySpread,
 		Slurm:          disp.slurm,
 	}
 	disp.Dispatcher = &dispatch.Dispatcher{
 		Arv:            arv,
+		Logger:         disp.logger,
 		RunContainer:   disp.runContainer,
 		PollPeriod:     time.Duration(disp.PollPeriod),
 		MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
@@ -321,7 +340,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 		case <-ctx.Done():
 			// Disappeared from squeue
 			if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
-				log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+				log.Printf("error getting final container state for %s: %s", ctr.UUID, err)
 			}
 			switch ctr.State {
 			case dispatch.Running:
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 4ef4ba1d5d..9b858f3310 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -11,7 +11,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -25,6 +24,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	"github.com/Sirupsen/logrus"
 	. "gopkg.in/check.v1"
 )
 
@@ -138,7 +138,11 @@ func (s *IntegrationSuite) integrationTest(c *C,
 	}
 
 	s.disp.slurm = &s.slurm
-	s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
+	s.disp.sqCheck = &SqueueChecker{
+		Logger: logrus.StandardLogger(),
+		Period: 500 * time.Millisecond,
+		Slurm:  s.disp.slurm,
+	}
 
 	err = s.disp.Dispatcher.Run(ctx)
 	<-doneRun
@@ -246,7 +250,7 @@ func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+	s.testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
 }
 
 func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -264,8 +268,8 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(io.MultiWriter(buf, os.Stderr))
-	defer log.SetOutput(os.Stderr)
+	logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
+	defer logrus.SetOutput(os.Stderr)
 
 	s.disp.CrunchRunCommand = []string{crunchCmd}
 
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 0ce4fb6732..5aee7e087b 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -7,7 +7,6 @@ package main
 import (
 	"bytes"
 	"fmt"
-	"log"
 	"sort"
 	"strings"
 	"sync"
@@ -27,6 +26,7 @@ type slurmJob struct {
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type SqueueChecker struct {
+	Logger         logger
 	Period         time.Duration
 	PrioritySpread int64
 	Slurm          Slurm
@@ -121,7 +121,7 @@ func (sqc *SqueueChecker) reniceAll() {
 		}
 		err := sqc.Slurm.Renice(job.uuid, niceNew)
 		if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
-			log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+			sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
 			job.hitNiceLimit = true
 		}
 	}
@@ -143,7 +143,7 @@ func (sqc *SqueueChecker) check() {
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
 	cmd.Stdout, cmd.Stderr = stdout, stderr
 	if err := cmd.Run(); err != nil {
-		log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+		sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
 		return
 	}
 
@@ -156,7 +156,7 @@ func (sqc *SqueueChecker) check() {
 		var uuid, state, reason string
 		var n, p int64
 		if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
-			log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+			sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
 			continue
 		}
 
@@ -192,10 +192,10 @@ func (sqc *SqueueChecker) check() {
 			// "launch failed requeued held" seems to be
 			// another manifestation of this problem,
 			// resolved the same way.
-			log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+			sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
 			sqc.Slurm.Release(uuid)
 		} else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
-			log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+			sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
 		}
 	}
 	sqc.lock.Lock()
diff --git a/services/crunch-dispatch-slurm/squeue_test.go b/services/crunch-dispatch-slurm/squeue_test.go
index ef036dabd7..de674a1397 100644
--- a/services/crunch-dispatch-slurm/squeue_test.go
+++ b/services/crunch-dispatch-slurm/squeue_test.go
@@ -7,6 +7,7 @@ package main
 import (
 	"time"
 
+	"github.com/Sirupsen/logrus"
 	. "gopkg.in/check.v1"
 )
 
@@ -24,6 +25,7 @@ func (s *SqueueSuite) TestReleasePending(c *C) {
 		queue: uuids[0] + " 10000 4294000000 PENDING Resources\n" + uuids[1] + " 10000 4294000111 PENDING Resources\n" + uuids[2] + " 10000 0 PENDING BadConstraints\n",
 	}
 	sqc := &SqueueChecker{
+		Logger: logrus.StandardLogger(),
 		Slurm:  slurm,
 		Period: time.Hour,
 	}
@@ -88,6 +90,7 @@ func (s *SqueueSuite) TestReniceAll(c *C) {
 			queue: test.squeue,
 		}
 		sqc := &SqueueChecker{
+			Logger:         logrus.StandardLogger(),
 			Slurm:          slurm,
 			PrioritySpread: test.spread,
 			Period:         time.Hour,
@@ -112,6 +115,7 @@ func (s *SqueueSuite) TestReniceInvalidNiceValue(c *C) {
 		rejectNice10K: true,
 	}
 	sqc := &SqueueChecker{
+		Logger:         logrus.StandardLogger(),
 		Slurm:          slurm,
 		PrioritySpread: 1,
 		Period:         time.Hour,
@@ -155,6 +159,7 @@ func (s *SqueueSuite) TestSetPriorityBeforeQueued(c *C) {
 
 	slurm := &slurmFake{}
 	sqc := &SqueueChecker{
+		Logger: logrus.StandardLogger(),
 		Slurm:  slurm,
 		Period: time.Hour,
 	}
-- 
2.30.2