X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b3a016e9a47d453b5ae4d287d8b6eaafd69971df..adbbe252483b5442707c1e2e03f5347f913e1125:/lib/controller/federation/list.go?ds=sidebyside diff --git a/lib/controller/federation/list.go b/lib/controller/federation/list.go index 5a171c9c37..039caac574 100644 --- a/lib/controller/federation/list.go +++ b/lib/controller/federation/list.go @@ -10,9 +10,10 @@ import ( "net/http" "sort" "sync" + "sync/atomic" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/httpserver" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/httpserver" ) //go:generate go run generate.go @@ -20,10 +21,13 @@ import ( // CollectionList is used as a template to auto-generate List() // methods for other types; see generate.go. -func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) { +func (conn *Conn) generated_CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) { var mtx sync.Mutex var merged arvados.CollectionList + var needSort atomic.Value + needSort.Store(false) err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) { + options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor cl, err := backend.CollectionList(ctx, options) if err != nil { return nil, err @@ -32,8 +36,9 @@ func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOption defer mtx.Unlock() if len(merged.Items) == 0 { merged = cl - } else { + } else if len(cl.Items) > 0 { merged.Items = append(merged.Items, cl.Items...) + needSort.Store(true) } uuids := make([]string, 0, len(cl.Items)) for _, item := range cl.Items { @@ -41,7 +46,19 @@ func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOption } return uuids, nil }) - sort.Slice(merged.Items, func(i, j int) bool { return merged.Items[i].UUID < merged.Items[j].UUID }) + if needSort.Load().(bool) { + // Apply the default/implied order, "modified_at desc" + sort.Slice(merged.Items, func(i, j int) bool { + mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt + return mj.Before(mi) + }) + } + if merged.Items == nil { + // Return empty results as [], not null + // (https://github.com/golang/go/issues/27589 might be + // a better solution in the future) + merged.Items = []arvados.Collection{} + } return merged, err } @@ -68,8 +85,7 @@ func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOption // // * len(Order)==0 // -// * there are no filters other than the "uuid = ..." and "uuid in -// ..." filters mentioned above. +// * Each filter is either "uuid = ..." or "uuid in [...]". // // * The maximum possible response size (total number of objects that // could potentially be matched by all of the specified filters) @@ -91,6 +107,18 @@ func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOption // corresponding options argument suitable for sending to that // backend. func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error { + + if opts.BypassFederation || opts.ForwardedFor != "" { + // Client requested no federation. Pass through. + _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts) + return err + } + if opts.ClusterID != "" { + // Client explicitly selected cluster + _, err := fn(ctx, conn.cluster.ClusterID, conn.chooseBackend(opts.ClusterID), opts) + return err + } + cannotSplit := false var matchAllFilters map[string]bool for _, f := range opts.Filters { @@ -130,7 +158,8 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions if matchAllFilters == nil { matchAllFilters = matchThisFilter } else { - // matchAllFilters = intersect(matchAllFilters, matchThisFilter) + // Reduce matchAllFilters to the intersection + // of matchAllFilters ∩ matchThisFilter. for uuid := range matchAllFilters { if !matchThisFilter[uuid] { delete(matchAllFilters, uuid) @@ -139,6 +168,18 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions } } + if matchAllFilters == nil { + // Not filtering by UUID at all; just query the local + // cluster. + _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts) + return err + } + + // Collate UUIDs in matchAllFilters by remote cluster ID -- + // e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"] + // will be true -- and count the total number of UUIDs we're + // filtering on, so we can compare it to our max page size + // limit. nUUIDs := 0 todoByRemote := map[string]map[string]bool{} for uuid := range matchAllFilters { @@ -153,42 +194,41 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions } } - if len(todoByRemote) > 1 { - if cannotSplit { - return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with filters other than 'uuid = ...' and 'uuid in [...]'") - } - if opts.Count != "none" { - return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"") - } - if opts.Limit >= 0 || opts.Offset != 0 || len(opts.Order) > 0 { - return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit, offset, or order parameter") - } - if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max { - return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max) - } - selectingUUID := false - for _, attr := range opts.Select { - if attr == "uuid" { - selectingUUID = true - } - } - if opts.Select != nil && !selectingUUID { - return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with a select parameter that does not include uuid") - } + if len(todoByRemote) == 0 { + return nil + } + if len(todoByRemote) == 1 && todoByRemote[conn.cluster.ClusterID] != nil { + // All UUIDs are local, so proxy a single request. The + // generic case has some limitations (see below) which + // we don't want to impose on local requests. + _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts) + return err + } + if cannotSplit { + return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'") + } + if opts.Count != "none" { + return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"") + } + if (opts.Limit >= 0 && opts.Limit < int64(nUUIDs)) || opts.Offset != 0 || len(opts.Order) > 0 { + return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit (%d) < nUUIDs (%d), offset (%d) > 0, or order (%v) parameter", opts.Limit, nUUIDs, opts.Offset, opts.Order) + } + if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max { + return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max) } ctx, cancel := context.WithCancel(ctx) defer cancel() errs := make(chan error, len(todoByRemote)) for clusterID, todo := range todoByRemote { - clusterID, todo := clusterID, todo - batch := make([]string, 0, len(todo)) - for uuid := range todo { - batch = append(batch, uuid) - } - go func() { + go func(clusterID string, todo map[string]bool) { // This goroutine sends exactly one value to // errs. + batch := make([]string, 0, len(todo)) + for uuid := range todo { + batch = append(batch, uuid) + } + var backend arvados.API if clusterID == conn.cluster.ClusterID { backend = conn.local @@ -197,6 +237,12 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions return } remoteOpts := opts + if remoteOpts.Select != nil { + // We always need to select UUIDs to + // use the response, even if our + // caller doesn't. + remoteOpts.Select = append([]string{"uuid"}, remoteOpts.Select...) + } for len(todo) > 0 { if len(batch) > len(todo) { // Reduce batch to just the todo's @@ -209,7 +255,7 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions done, err := fn(ctx, clusterID, backend, remoteOpts) if err != nil { - errs <- err + errs <- httpErrorf(http.StatusBadGateway, "%s", err.Error()) return } progress := false @@ -219,19 +265,24 @@ func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions delete(todo, uuid) } } - if !progress { - errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned none of the requested UUIDs", clusterID) + if len(done) == 0 { + // Zero items == no more + // results exist, no need to + // get another page. + break + } else if !progress { + errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned %d items but none had the requested UUIDs", clusterID, len(done)) return } } errs <- nil - }() + }(clusterID, todo) } // Wait for all goroutines to return, then return the first // non-nil error, if any. var firstErr error - for i := 0; i < len(todoByRemote); i++ { + for range todoByRemote { if err := <-errs; err != nil && firstErr == nil { firstErr = err // Signal to any remaining fn() calls that