1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
15 "git.arvados.org/arvados.git/sdk/go/arvados"
16 "git.arvados.org/arvados.git/sdk/go/httpserver"
19 //go:generate go run generate.go
21 // CollectionList is used as a template to auto-generate List()
22 // methods for other types; see generate.go.
// generated_CollectionList lists collections. When options.ClusterID
// is set, the request is sent to the single backend returned by
// chooseBackend. Otherwise the request is handed to splitListRequest,
// and the items returned by each backend call are accumulated in
// merged.
24 func (conn *Conn) generated_CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
25 if options.ClusterID != "" {
26 // explicitly selected cluster
// Prefix ForwardedFor with the local cluster ID and a "-" before
// passing the request on.
27 options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
28 return conn.chooseBackend(options.ClusterID).CollectionList(ctx, options)
// merged accumulates the responses gathered by the callback below.
32 var merged arvados.CollectionList
// NOTE(review): needSort.Load().(bool) further down panics if no
// bool has been stored in needSort. The Store calls are not visible
// in this listing -- confirm needSort is initialized before use.
33 var needSort atomic.Value
// The callback receives the backend and the options to use for one
// list call, and applies the same ForwardedFor prefix as above.
35 err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
36 options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
37 cl, err := backend.CollectionList(ctx, options)
// Nothing collected yet: this branch is taken (its body is not
// visible in this listing). Otherwise a non-empty response is
// appended to the items collected so far.
43 if len(merged.Items) == 0 {
45 } else if len(cl.Items) > 0 {
46 merged.Items = append(merged.Items, cl.Items...)
// Gather the UUIDs of the items this call returned (presumably the
// callback's []string result -- the return statement is not visible
// here; confirm).
49 uuids := make([]string, 0, len(cl.Items))
50 for _, item := range cl.Items {
51 uuids = append(uuids, item.UUID)
55 if needSort.Load().(bool) {
56 // Apply the default/implied order, "modified_at desc"
57 sort.Slice(merged.Items, func(i, j int) bool {
// mi and mj are the ModifiedAt values of the two items being
// compared.
58 mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
62 if merged.Items == nil {
63 // Return empty results as [], not null
64 // (https://github.com/golang/go/issues/27589 might be
65 // a better solution in the future)
66 merged.Items = []arvados.Collection{}
71 // Call fn on one or more local/remote backends if opts indicates a
72 // federation-wide list query, i.e.:
74 // * There is at least one filter of the form
75 // ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
77 // * One or more of the supplied UUIDs (a,b,c,...) has a non-local
80 // * There are no other filters
82 // (If opts doesn't indicate a federation-wide list query, fn is just
83 // called once with the local backend.)
85 // fn is called more than once only if the query meets the following
94 // * Each filter is either "uuid = ..." or "uuid in [...]".
96 // * The maximum possible response size (total number of objects that
97 // could potentially be matched by all of the specified filters)
98 // exceeds the local cluster's response page size limit.
100 // If the query involves multiple backends but doesn't meet these
101 // restrictions, an error is returned without calling fn.
103 // Thus, the caller can assume that either:
105 // * splitListRequest() returns an error, or
107 // * fn is called exactly once, or
109 // * fn is called more than once, with options that satisfy the above
112 // Each call to fn indicates a single (local or remote) backend and a
113 // corresponding options argument suitable for sending to that
115 func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error {
// Requests with BypassFederation set, or with a non-empty
// ForwardedFor, are passed unchanged to the local backend.
117 if opts.BypassFederation || opts.ForwardedFor != "" {
118 // Client requested no federation. Pass through.
119 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
// matchAllFilters is nil until the first filter set is adopted in
// the loop below; the nil check after the loop relies on that.
124 var matchAllFilters map[string]bool
125 for _, f := range opts.Filters {
// matchThisFilter is the set of UUIDs named by this one filter.
126 matchThisFilter := map[string]bool{}
127 if f.Attr != "uuid" {
// "=" names a single UUID, given as a string operand; the error
// below reports an operand of another type as 400 Bad Request.
131 if f.Operator == "=" {
132 if uuid, ok := f.Operand.(string); ok {
133 matchThisFilter[uuid] = true
135 return httpErrorf(http.StatusBadRequest, "invalid operand type %T for filter %q", f.Operand, f)
// "in" accepts either a []interface{} or a []string operand.
137 } else if f.Operator == "in" {
138 if operand, ok := f.Operand.([]interface{}); ok {
139 // skip any elements that aren't
140 // strings (thus can't match a UUID,
141 // thus can't affect the response).
142 for _, v := range operand {
143 if uuid, ok := v.(string); ok {
144 matchThisFilter[uuid] = true
// NOTE(review): inside this branch the local name "strings" would
// shadow an imported package of the same name, if the file has one.
147 } else if strings, ok := f.Operand.([]string); ok {
148 for _, uuid := range strings {
149 matchThisFilter[uuid] = true
152 return httpErrorf(http.StatusBadRequest, "invalid operand type %T in filter %q", f.Operand, f)
// First filter set seen: adopt it as the running result.
159 if matchAllFilters == nil {
160 matchAllFilters = matchThisFilter
162 // Reduce matchAllFilters to the intersection
163 // of matchAllFilters ∩ matchThisFilter.
164 for uuid := range matchAllFilters {
165 if !matchThisFilter[uuid] {
166 delete(matchAllFilters, uuid)
172 if matchAllFilters == nil {
173 // Not filtering by UUID at all; just query the local
175 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
179 // Collate UUIDs in matchAllFilters by remote cluster ID --
180 // e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"]
181 // will be true -- and count the total number of UUIDs we're
182 // filtering on, so we can compare it to our max page size
185 todoByRemote := map[string]map[string]bool{}
186 for uuid := range matchAllFilters {
188 // Cannot match anything, just drop it
// The first five characters of the UUID are the map key, i.e. the
// cluster ID.
// NOTE(review): uuid[:5] panics on a UUID shorter than five bytes;
// the guard that the comment above belongs to is not visible in this
// listing -- confirm it runs before this point.
190 if todoByRemote[uuid[:5]] == nil {
191 todoByRemote[uuid[:5]] = map[string]bool{}
193 todoByRemote[uuid[:5]][uuid] = true
198 if len(todoByRemote) == 0 {
201 if len(todoByRemote) == 1 && todoByRemote[conn.cluster.ClusterID] != nil {
202 // All UUIDs are local, so proxy a single request. The
203 // generic case has some limitations (see below) which
204 // we don't want to impose on local requests.
205 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
// The checks from here down to the page size check reject requests
// that cannot be served as a federated query; each returns 400 Bad
// Request. (The condition guarding the first error is not visible in
// this listing.)
209 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'")
211 if opts.Count != "none" {
212 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
// A negative Limit is never treated as too small here; any non-zero
// Offset or any Order entry is rejected.
214 if (opts.Limit >= 0 && opts.Limit < int64(nUUIDs)) || opts.Offset != 0 || len(opts.Order) > 0 {
215 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit (%d) < nUUIDs (%d), offset (%d) > 0, or order (%v) parameter", opts.Limit, nUUIDs, opts.Offset, opts.Order)
217 if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
218 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
// The derived, cancellable context is the one passed to fn in the
// goroutines below.
221 ctx, cancel := context.WithCancel(ctx)
// errs has capacity for one value per todoByRemote entry; one
// goroutine is started per entry.
223 errs := make(chan error, len(todoByRemote))
224 for clusterID, todo := range todoByRemote {
225 go func(clusterID string, todo map[string]bool) {
226 // This goroutine sends exactly one value to
// batch is the list of UUIDs to ask for; it starts out as every key
// of todo.
228 batch := make([]string, 0, len(todo))
229 for uuid := range todo {
230 batch = append(batch, uuid)
// Pick the backend: the local cluster ID takes the first branch (its
// body is not visible here); any other cluster needs an entry in
// conn.remotes, otherwise a 404 Not Found error is sent on errs.
233 var backend arvados.API
234 if clusterID == conn.cluster.ClusterID {
236 } else if backend = conn.remotes[clusterID]; backend == nil {
237 errs <- httpErrorf(http.StatusNotFound, "cannot execute federated list query: no proxy available for cluster %q", clusterID)
241 if remoteOpts.Select != nil {
242 // We always need to select UUIDs to
243 // use the response, even if our
// Appending to a fresh one-element slice puts "uuid" first without
// writing to the existing Select slice's backing array.
245 remoteOpts.Select = append([]string{"uuid"}, remoteOpts.Select...)
// When batch holds more entries than todo, it is rebuilt from the
// keys of todo.
248 if len(batch) > len(todo) {
249 // Reduce batch to just the todo's
251 for uuid := range todo {
252 batch = append(batch, uuid)
// Replace the filters with a single "uuid in batch" filter.
255 remoteOpts.Filters = []arvados.Filter{{"uuid", "in", batch}}
257 done, err := fn(ctx, clusterID, backend, remoteOpts)
// An error from fn is reported as 502 Bad Gateway carrying only the
// original error's message text.
259 errs <- httpErrorf(http.StatusBadGateway, "%s", err.Error())
// Check each UUID reported by fn against the set still in todo (the
// body of this check is not visible in this listing).
263 for _, uuid := range done {
264 if _, ok := todo[uuid]; ok {
270 // Zero items == no more
271 // results exist, no need to
// fn reported items, but none of them was one of the requested UUIDs.
274 } else if !progress {
275 errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned %d items but none had the requested UUIDs", clusterID, len(done))
283 // Wait for all goroutines to return, then return the first
284 // non-nil error, if any.
// One receive per todoByRemote entry, i.e. one per goroutine started
// above.
286 for range todoByRemote {
287 if err := <-errs; err != nil && firstErr == nil {
289 // Signal to any remaining fn() calls that
290 // further effort is futile.
// httpErrorf builds an error from format and args using fmt.Errorf
// and passes it, together with the HTTP status code, to
// httpserver.ErrorWithStatus.
297 func httpErrorf(code int, format string, args ...interface{}) error {
298 return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)