X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/44c95f99098fa6c6acbfa82d4b6cbc6015eb6e39..70e5c7a3c6a5860d702d5e5c219dc0f3a3696d35:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 356d087a46..4e25ba4f06 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -29,6 +29,9 @@ const ( type Dispatcher struct { Arv *arvadosclient.ArvadosClient + // Batch size for container queries + BatchSize int64 + // Queue polling frequency PollPeriod time.Duration @@ -72,39 +75,78 @@ func (d *Dispatcher) Run(ctx context.Context) error { poll := time.NewTicker(d.PollPeriod) defer poll.Stop() + if d.BatchSize == 0 { + d.BatchSize = 100 + } + for { - tracked := d.trackedUUIDs() - d.checkForUpdates([][]interface{}{ - {"uuid", "in", tracked}}) - d.checkForUpdates([][]interface{}{ - {"locked_by_uuid", "=", d.auth.UUID}, - {"uuid", "not in", tracked}}) - d.checkForUpdates([][]interface{}{ - {"state", "=", Queued}, - {"priority", ">", "0"}, - {"uuid", "not in", tracked}}) select { case <-poll.C: - continue + break case <-ctx.Done(): return ctx.Err() } - } -} -func (d *Dispatcher) trackedUUIDs() []string { - d.mtx.Lock() - defer d.mtx.Unlock() - if len(d.trackers) == 0 { - // API bug: ["uuid", "not in", []] does not work as - // expected, but this does: - return []string{"this-uuid-does-not-exist"} - } - uuids := make([]string, 0, len(d.trackers)) - for x := range d.trackers { - uuids = append(uuids, x) + todo := make(map[string]*runTracker) + d.mtx.Lock() + // Make a copy of trackers + for uuid, tracker := range d.trackers { + todo[uuid] = tracker + } + d.mtx.Unlock() + + // Containers I currently own (Locked/Running) + querySuccess := d.checkForUpdates([][]interface{}{ + {"locked_by_uuid", "=", d.auth.UUID}}, todo) + + // Containers I should try to dispatch + querySuccess = d.checkForUpdates([][]interface{}{ + {"state", "=", Queued}, + {"priority", ">", "0"}}, todo) && querySuccess + + if !querySuccess { + // There was an error in one of the previous queries, + // we probably didn't get updates for all the + // containers we should have. Don't check them + // individually because it may be expensive. + continue + } + + // Containers I know about but didn't fall into the + // above two categories (probably Complete/Cancelled) + var missed []string + for uuid := range todo { + missed = append(missed, uuid) + } + + for len(missed) > 0 { + var batch []string + if len(missed) > 20 { + batch = missed[0:20] + missed = missed[20:] + } else { + batch = missed + missed = missed[0:0] + } + querySuccess = d.checkForUpdates([][]interface{}{ + {"uuid", "in", batch}}, todo) && querySuccess + } + + if !querySuccess { + // There was an error in one of the previous queries, we probably + // didn't see all the containers we should have, so don't shut down + // the missed containers. + continue + } + + // Containers that I know about that didn't show up in any + // query should be let go. + for uuid, tracker := range todo { + log.Printf("Container %q not returned by any query, stopping tracking.", uuid) + tracker.close() + } + } - return uuids } // Start a runner in a new goroutine, and send the initial container @@ -114,7 +156,8 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { tracker.updates <- c go func() { d.RunContainer(d, c, tracker.updates) - + // RunContainer blocks for the lifetime of the container. When + // it returns, the tracker should delete itself. d.mtx.Lock() delete(d.trackers, c.UUID) d.mtx.Unlock() @@ -122,25 +165,48 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { return tracker } -func (d *Dispatcher) checkForUpdates(filters [][]interface{}) { +func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool { + var countList arvados.ContainerList params := arvadosclient.Dict{ "filters": filters, + "count": "exact", + "limit": 0, "order": []string{"priority desc"}} - - var list arvados.ContainerList - for offset, more := 0, true; more; offset += len(list.Items) { + err := d.Arv.List("containers", params, &countList) + if err != nil { + log.Printf("error getting count of containers: %q", err) + return false + } + itemsAvailable := countList.ItemsAvailable + params = arvadosclient.Dict{ + "filters": filters, + "count": "none", + "limit": d.BatchSize, + "order": []string{"priority desc"}} + offset := 0 + for { params["offset"] = offset + + // This list variable must be a new one declared + // inside the loop: otherwise, items in the API + // response would get deep-merged into the items + // loaded in previous iterations. + var list arvados.ContainerList + err := d.Arv.List("containers", params, &list) if err != nil { log.Printf("Error getting list of containers: %q", err) - return + return false + } + d.checkListForUpdates(list.Items, todo) + offset += len(list.Items) + if len(list.Items) == 0 || itemsAvailable <= offset { + return true } - more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset - d.checkListForUpdates(list.Items) } } -func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { +func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) { d.mtx.Lock() defer d.mtx.Unlock() if d.trackers == nil { @@ -149,6 +215,8 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) { for _, c := range containers { tracker, alreadyTracking := d.trackers[c.UUID] + delete(todo, c.UUID) + if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking {