13959: Merge branch 'master' into 13959-timeouts-and-logging
authorTom Clegg <tclegg@veritasgenetics.com>
Fri, 17 Aug 2018 14:38:39 +0000 (10:38 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Fri, 17 Aug 2018 14:38:39 +0000 (10:38 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/usage.go

index 152207ea94bb4bb4c4e21de36ca2b440691109a2..c8fb5aeb37e97eb9a43846301e2a07ea63e28ae7 100644 (file)
@@ -37,6 +37,9 @@ type Dispatcher struct {
 
        Logger Logger
 
+       // Batch size for container queries
+       BatchSize int64
+
        // Queue polling frequency
        PollPeriod time.Duration
 
@@ -84,6 +87,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
+       if d.BatchSize == 0 {
+               d.BatchSize = 100
+       }
+
        for {
                select {
                case <-poll.C:
@@ -174,8 +181,22 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       var countList arvados.ContainerList
        params := arvadosclient.Dict{
                "filters": filters,
+               "count":   "exact",
+               "limit":   0,
+               "order":   []string{"priority desc"}}
+       err := d.Arv.List("containers", params, &countList)
+       if err != nil {
+               d.Logger.Warnf("error getting count of containers: %q", err)
+               return false
+       }
+       itemsAvailable := countList.ItemsAvailable
+       params = arvadosclient.Dict{
+               "filters": filters,
+               "count":   "none",
+               "limit":   d.BatchSize,
                "order":   []string{"priority desc"}}
        offset := 0
        for {
@@ -194,7 +215,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
                }
                d.checkListForUpdates(list.Items, todo)
                offset += len(list.Items)
-               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+               if len(list.Items) == 0 || itemsAvailable <= offset {
                        return true
                }
        }
index a1a6d5641e01b70ae92b31183218ad8ea28608d7..c780c6bf2cf9b762e8cec5f9b401049cada75a87 100644 (file)
@@ -110,7 +110,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
+       testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
index b12be91c91e847fda0da2e76b8947abfac325100..ce0360261dab4aa3ab424d27c29c782e268b567f 100644 (file)
@@ -64,6 +64,9 @@ type Dispatcher struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
+
+       // Batch size for container queries
+       BatchSize int64
 }
 
 func main() {
@@ -183,6 +186,7 @@ func (disp *Dispatcher) setup() {
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
                Logger:         disp.logger,
+               BatchSize:      disp.BatchSize,
                RunContainer:   disp.runContainer,
                PollPeriod:     time.Duration(disp.PollPeriod),
                MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
index 9b858f331050b69eef3c79d07f3009bc50d6981b..b76ece314d47806afcfb328ba12970b9171b58d5 100644 (file)
@@ -250,7 +250,7 @@ func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-       s.testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
+       s.testWithServerStub(c, apiStubResponses, "echo", "error getting count of containers")
 }
 
 func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
index 032d86284d5e0a9fc8a3d712a0283597ec29d765..bcfa5b8a39ed7c8680a5ec1ffed6b583193f1caf 100644 (file)
@@ -22,6 +22,7 @@ var exampleConfigFile = []byte(`
        "PollPeriod": "10s",
        "SbatchArguments": ["--partition=foo", "--exclude=node13"],
        "ReserveExtraRAM": 268435456,
+       "BatchSize": 10000
     }`)
 
 func usage(fs *flag.FlagSet) {