Merge branch '13933-dispatch-batch-size'
authorTom Clegg <tclegg@veritasgenetics.com>
Thu, 16 Aug 2018 21:25:11 +0000 (17:25 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Thu, 16 Aug 2018 21:25:11 +0000 (17:25 -0400)
closes #13933

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

1  2 
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index e0dc2eefda5e52e014ab8e57d5839bdf176ea362,ba2cf289c8cb029b1c38369d833d3cc8dfde9cbb..4e25ba4f0603699569402d619127dd4b9fd99fb1
@@@ -29,6 -29,9 +29,9 @@@ const 
  type Dispatcher struct {
        Arv *arvadosclient.ArvadosClient
  
+       // Batch size for container queries
+       BatchSize int64
        // Queue polling frequency
        PollPeriod time.Duration
  
@@@ -72,6 -75,10 +75,10 @@@ func (d *Dispatcher) Run(ctx context.Co
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
  
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
        for {
                select {
                case <-poll.C:
@@@ -159,19 -166,27 +166,33 @@@ func (d *Dispatcher) start(c arvados.Co
  }
  
  func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               log.Printf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
                params["offset"] = offset
 +
 +              // This list variable must be a new one declared
 +              // inside the loop: otherwise, items in the API
 +              // response would get deep-merged into the items
 +              // loaded in previous iterations.
                var list arvados.ContainerList
 +
                err := d.Arv.List("containers", params, &list)
                if err != nil {
                        log.Printf("Error getting list of containers: %q", err)
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }
index b4103cc625a2badc3a3ab3f3d7458bac3f35e34e,c1009a5d8ed007810a517842d79067811abf03cf..36ef264963d760f04501fc25cee6916c62ef4bf2
@@@ -57,6 -57,9 +57,9 @@@ type Dispatcher struct 
  
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+       // Batch size for container queries
+       BatchSize int64
  }
  
  func main() {
@@@ -164,6 -167,7 +167,7 @@@ func (disp *Dispatcher) setup() 
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
+               BatchSize:      disp.BatchSize,
                RunContainer:   disp.runContainer,
                PollPeriod:     time.Duration(disp.PollPeriod),
                MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
@@@ -252,6 -256,9 +256,6 @@@ func (disp *Dispatcher) submit(containe
        crArgs = append(crArgs, container.UUID)
        crScript := strings.NewReader(execScript(crArgs))
  
 -      disp.sqCheck.L.Lock()
 -      defer disp.sqCheck.L.Unlock()
 -
        sbArgs, err := disp.sbatchArgs(container)
        if err != nil {
                return err
@@@ -352,7 -359,10 +356,7 @@@ func (disp *Dispatcher) runContainer(_ 
        }
  }
  func (disp *Dispatcher) scancel(ctr arvados.Container) {
 -      disp.sqCheck.L.Lock()
        err := disp.slurm.Cancel(ctr.UUID)
 -      disp.sqCheck.L.Unlock()
 -
        if err != nil {
                log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
index 4ef4ba1d5d85a076a11dd0faf78c5b92d3641fcf,c0f2945a0597b0f072823e7b7ccdae96ee9e729c..33cad3af1f4341bd909e81502944e6cc4366d9f6
@@@ -116,7 -116,7 +116,7 @@@ func (s *IntegrationSuite) integrationT
        var containers arvados.ContainerList
        err = arv.List("containers", params, &containers)
        c.Check(err, IsNil)
 -      c.Check(len(containers.Items), Equals, 1)
 +      c.Assert(len(containers.Items), Equals, 1)
  
        s.disp.CrunchRunCommand = []string{"echo"}
  
@@@ -246,7 -246,7 +246,7 @@@ func (s *StubbedSuite) TestAPIErrorGett
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
  
-       s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+       s.testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
  }
  
  func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {