13959: Merge branch 'master' into 13959-timeouts-and-logging
authorTom Clegg <tclegg@veritasgenetics.com>
Fri, 17 Aug 2018 14:38:39 +0000 (10:38 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Fri, 17 Aug 2018 14:38:39 +0000 (10:38 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

1  2 
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index 152207ea94bb4bb4c4e21de36ca2b440691109a2,4e25ba4f0603699569402d619127dd4b9fd99fb1..c8fb5aeb37e97eb9a43846301e2a07ea63e28ae7
@@@ -9,12 -9,12 +9,12 @@@ package dispatc
  import (
        "context"
        "fmt"
 -      "log"
        "sync"
        "time"
  
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 +      "github.com/Sirupsen/logrus"
  )
  
  const (
        Cancelled = arvados.ContainerStateCancelled
  )
  
 +type Logger interface {
 +      Printf(string, ...interface{})
 +      Warnf(string, ...interface{})
 +      Debugf(string, ...interface{})
 +}
 +
  // Dispatcher struct
  type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
  
 +      Logger Logger
 +
+       // Batch size for container queries
+       BatchSize int64
        // Queue polling frequency
        PollPeriod time.Duration
  
@@@ -70,10 -65,6 +73,10 @@@ type DispatchFunc func(*Dispatcher, arv
  // dispatcher's token. When a new one appears, Run calls RunContainer
  // in a new goroutine.
  func (d *Dispatcher) Run(ctx context.Context) error {
 +      if d.Logger == nil {
 +              d.Logger = logrus.StandardLogger()
 +      }
 +
        err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
        if err != nil {
                return fmt.Errorf("error getting my token UUID: %v", err)
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
  
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
        for {
                select {
                case <-poll.C:
                // Containers that I know about that didn't show up in any
                // query should be let go.
                for uuid, tracker := range todo {
 -                      log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
 +                      d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
                        tracker.close()
                }
  
  // Start a runner in a new goroutine, and send the initial container
  // record to its updates channel.
  func (d *Dispatcher) start(c arvados.Container) *runTracker {
 -      tracker := &runTracker{updates: make(chan arvados.Container, 1)}
 +      tracker := &runTracker{
 +              updates: make(chan arvados.Container, 1),
 +              logger:  d.Logger,
 +      }
        tracker.updates <- c
        go func() {
                d.RunContainer(d, c, tracker.updates)
  }
  
  func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
 -              log.Printf("error getting count of containers: %q", err)
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
++              d.Logger.Warnf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
  
                err := d.Arv.List("containers", params, &list)
                if err != nil {
 -                      log.Printf("Error getting list of containers: %q", err)
 +                      d.Logger.Warnf("error getting list of containers: %q", err)
                        return false
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }
@@@ -212,7 -218,7 +233,7 @@@ func (d *Dispatcher) checkListForUpdate
                delete(todo, c.UUID)
  
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
 -                      log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 +                      d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
                } else if alreadyTracking {
                        switch c.State {
                        case Queued:
                                }
                                err := d.lock(c.UUID)
                                if err != nil {
 -                                      log.Printf("debug: error locking container %s: %s", c.UUID, err)
 +                                      d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
                                        break
                                }
                                c.State = Locked
@@@ -254,7 -260,7 +275,7 @@@ func (d *Dispatcher) UpdateState(uuid s
                        "container": arvadosclient.Dict{"state": state},
                }, nil)
        if err != nil {
 -              log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
 +              d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
  }
@@@ -309,7 -315,6 +330,7 @@@ func (d *Dispatcher) TrackContainer(uui
  type runTracker struct {
        closing bool
        updates chan arvados.Container
 +      logger  Logger
  }
  
  func (tracker *runTracker) close() {
@@@ -325,7 -330,7 +346,7 @@@ func (tracker *runTracker) update(c arv
        }
        select {
        case <-tracker.updates:
 -              log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
 +              tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
        default:
        }
        tracker.updates <- c
index a1a6d5641e01b70ae92b31183218ad8ea28608d7,534de6916c52168465ee887c5600b0dab86c9000..c780c6bf2cf9b762e8cec5f9b401049cada75a87
@@@ -8,6 -8,7 +8,6 @@@ import 
        "bytes"
        "context"
        "io"
 -      "log"
        "net/http"
        "net/http/httptest"
        "os"
@@@ -20,7 -21,6 +20,7 @@@
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
 +      "github.com/Sirupsen/logrus"
        . "gopkg.in/check.v1"
  )
  
@@@ -110,7 -110,7 +110,7 @@@ func (s *MockArvadosServerSuite) Test_A
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
  
-       testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
+       testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
  }
  
  func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
@@@ -133,7 -133,7 +133,7 @@@ func (s *MockArvadosServerSuite) Test_C
                arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1, "locked_by_uuid": "` + arvadostest.Dispatch1AuthUUID + `"}`)}
  
        testWithServerStub(c, apiStubResponses, "echo",
 -              `After echo process termination, container state for Running is "zzzzz-dz642-xxxxxxxxxxxxxx2".  Updating it to "Cancelled"`)
 +              `after "echo" process termination, container state for zzzzz-dz642-xxxxxxxxxxxxxx2 is "Running"; updating it to "Cancelled"`)
  }
  
  func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3/lock"] =
                arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Locked", "priority":1}`)}
  
 -      testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting nosuchcommand for zzzzz-dz642-xxxxxxxxxxxxxx3")
 +      testWithServerStub(c, apiStubResponses, "nosuchcommand", `error starting "nosuchcommand" for zzzzz-dz642-xxxxxxxxxxxxxx3`)
  }
  
  func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
        }
  
        buf := bytes.NewBuffer(nil)
 -      log.SetOutput(io.MultiWriter(buf, os.Stderr))
 -      defer log.SetOutput(os.Stderr)
 +      logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
 +      defer logrus.SetOutput(os.Stderr)
  
        *crunchRunCommand = crunchCmd
  
        ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
 -              PollPeriod: time.Duration(1) * time.Second,
 +              PollPeriod: time.Second / 20,
                RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
                        run(d, c, s)
                        cancel()
index b12be91c91e847fda0da2e76b8947abfac325100,36ef264963d760f04501fc25cee6916c62ef4bf2..ce0360261dab4aa3ab424d27c29c782e268b567f
@@@ -23,15 -23,9 +23,15 @@@ import 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
 +      "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
  )
  
 +type logger interface {
 +      dispatch.Logger
 +      Fatalf(string, ...interface{})
 +}
 +
  const initialNiceValue int64 = 10000
  
  var (
@@@ -41,7 -35,6 +41,7 @@@
  
  type Dispatcher struct {
        *dispatch.Dispatcher
 +      logger  logrus.FieldLogger
        cluster *arvados.Cluster
        sqCheck *SqueueChecker
        slurm   Slurm
  
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+       // Batch size for container queries
+       BatchSize int64
  }
  
  func main() {
 -      disp := &Dispatcher{}
 +      logger := logrus.StandardLogger()
 +      if os.Getenv("DEBUG") != "" {
 +              logger.SetLevel(logrus.DebugLevel)
 +      }
 +      logger.Formatter = &logrus.JSONFormatter{
 +              TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
 +      }
 +      disp := &Dispatcher{logger: logger}
        err := disp.Run(os.Args[0], os.Args[1:])
        if err != nil {
 -              log.Fatal(err)
 +              logrus.Fatalf("%s", err)
        }
  }
  
@@@ -115,7 -104,7 +118,7 @@@ func (disp *Dispatcher) configure(prog 
                return nil
        }
  
 -      log.Printf("crunch-dispatch-slurm %s started", version)
 +      disp.logger.Printf("crunch-dispatch-slurm %s started", version)
  
        err := disp.readConfig(*configPath)
        if err != nil {
                os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
                os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
        } else {
 -              log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
 +              disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
        }
  
        if *dumpConfig {
  
        siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
        if os.IsNotExist(err) {
 -              log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
 +              disp.logger.Warnf("no cluster config (%s), proceeding with no node types defined", err)
        } else if err != nil {
                return fmt.Errorf("error loading config: %s", err)
        } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
  
  // setup() initializes private fields after configure().
  func (disp *Dispatcher) setup() {
 +      if disp.logger == nil {
 +              disp.logger = logrus.StandardLogger()
 +      }
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
 -              log.Fatalf("Error making Arvados client: %v", err)
 +              disp.logger.Fatalf("Error making Arvados client: %v", err)
        }
        arv.Retries = 25
  
        disp.slurm = &slurmCLI{}
        disp.sqCheck = &SqueueChecker{
 +              Logger:         disp.logger,
                Period:         time.Duration(disp.PollPeriod),
                PrioritySpread: disp.PrioritySpread,
                Slurm:          disp.slurm,
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
 +              Logger:         disp.logger,
+               BatchSize:      disp.BatchSize,
                RunContainer:   disp.runContainer,
                PollPeriod:     time.Duration(disp.PollPeriod),
                MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
@@@ -340,7 -325,7 +344,7 @@@ func (disp *Dispatcher) runContainer(_ 
                case <-ctx.Done():
                        // Disappeared from squeue
                        if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
 -                              log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
 +                              log.Printf("error getting final container state for %s: %s", ctr.UUID, err)
                        }
                        switch ctr.State {
                        case dispatch.Running:
index 9b858f331050b69eef3c79d07f3009bc50d6981b,33cad3af1f4341bd909e81502944e6cc4366d9f6..b76ece314d47806afcfb328ba12970b9171b58d5
@@@ -11,6 -11,7 +11,6 @@@ import 
        "fmt"
        "io"
        "io/ioutil"
 -      "log"
        "net/http"
        "net/http/httptest"
        "os"
@@@ -24,7 -25,6 +24,7 @@@
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
 +      "github.com/Sirupsen/logrus"
        . "gopkg.in/check.v1"
  )
  
@@@ -138,11 -138,7 +138,11 @@@ func (s *IntegrationSuite) integrationT
        }
  
        s.disp.slurm = &s.slurm
 -      s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
 +      s.disp.sqCheck = &SqueueChecker{
 +              Logger: logrus.StandardLogger(),
 +              Period: 500 * time.Millisecond,
 +              Slurm:  s.disp.slurm,
 +      }
  
        err = s.disp.Dispatcher.Run(ctx)
        <-doneRun
@@@ -250,7 -246,7 +250,7 @@@ func (s *StubbedSuite) TestAPIErrorGett
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
  
-       s.testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
+       s.testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
  }
  
  func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
        }
  
        buf := bytes.NewBuffer(nil)
 -      log.SetOutput(io.MultiWriter(buf, os.Stderr))
 -      defer log.SetOutput(os.Stderr)
 +      logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
 +      defer logrus.SetOutput(os.Stderr)
  
        s.disp.CrunchRunCommand = []string{crunchCmd}