X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e231fef37079916d0dc6babf93d669d474598ced..8d660b2b276475f1bd5b56614a28a33a358dd038:/lib/dispatchcloud/container/queue.go diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go index bbe47625a8..4e807a12ab 100644 --- a/lib/dispatchcloud/container/queue.go +++ b/lib/dispatchcloud/container/queue.go @@ -5,6 +5,7 @@ package container import ( + "errors" "io" "sync" "time" @@ -398,32 +399,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) { } apply(avail) - var missing []string + missing := map[string]bool{} cq.mtx.Lock() for uuid, ent := range cq.current { if next[uuid] == nil && ent.Container.State != arvados.ContainerStateCancelled && ent.Container.State != arvados.ContainerStateComplete { - missing = append(missing, uuid) + missing[uuid] = true } } cq.mtx.Unlock() - for i, page := 0, 20; i < len(missing); i += page { - batch := missing[i:] - if len(batch) > page { - batch = batch[:page] + for len(missing) > 0 { + var batch []string + for uuid := range missing { + batch = append(batch, uuid) + if len(batch) == 20 { + break + } } + filters := []arvados.Filter{{"uuid", "in", batch}} ended, err := cq.fetchAll(arvados.ResourceListParams{ Select: selectParam, Order: "uuid", Count: "none", - Filters: []arvados.Filter{{"uuid", "in", batch}}, + Filters: filters, }) if err != nil { return nil, err } apply(ended) + if len(ended) == 0 { + // This is the only case where we can conclude + // a container has been deleted from the + // database. A short (but non-zero) page, on + // the other hand, can be caused by a response + // size limit. + for _, uuid := range batch { + cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)") + delete(missing, uuid) + cq.mtx.Lock() + cq.delEnt(uuid, cq.current[uuid].Container.State) + cq.mtx.Unlock() + } + continue + } + for _, ctr := range ended { + if _, ok := missing[ctr.UUID]; !ok { + msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock" + cq.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "Filters": filters, + }).Error(msg) + return nil, errors.New(msg) + } + delete(missing, ctr.UUID) + } } return next, nil }