21891: Test copier with a large collection.
[arvados.git] / lib / controller / federation / list.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "fmt"
10         "net/http"
11         "sort"
12         "sync"
13         "sync/atomic"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/httpserver"
17 )
18
19 //go:generate go run generate.go
20
21 // CollectionList is used as a template to auto-generate List()
22 // methods for other types; see generate.go.
23
24 func (conn *Conn) generated_CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
25         var mtx sync.Mutex
26         var merged arvados.CollectionList
27         var needSort atomic.Value
28         needSort.Store(false)
29         err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
30                 options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
31                 cl, err := backend.CollectionList(ctx, options)
32                 if err != nil {
33                         return nil, err
34                 }
35                 mtx.Lock()
36                 defer mtx.Unlock()
37                 if len(merged.Items) == 0 {
38                         merged = cl
39                 } else if len(cl.Items) > 0 {
40                         merged.Items = append(merged.Items, cl.Items...)
41                         needSort.Store(true)
42                 }
43                 uuids := make([]string, 0, len(cl.Items))
44                 for _, item := range cl.Items {
45                         uuids = append(uuids, item.UUID)
46                 }
47                 return uuids, nil
48         })
49         if needSort.Load().(bool) {
50                 // Apply the default/implied order, "modified_at desc"
51                 sort.Slice(merged.Items, func(i, j int) bool {
52                         mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
53                         return mj.Before(mi)
54                 })
55         }
56         if merged.Items == nil {
57                 // Return empty results as [], not null
58                 // (https://github.com/golang/go/issues/27589 might be
59                 // a better solution in the future)
60                 merged.Items = []arvados.Collection{}
61         }
62         return merged, err
63 }
64
65 // Call fn on one or more local/remote backends if opts indicates a
66 // federation-wide list query, i.e.:
67 //
68 //   - There is at least one filter of the form
69 //     ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
70 //
71 //   - One or more of the supplied UUIDs (a,b,c,...) has a non-local
72 //     prefix.
73 //
74 //   - There are no other filters
75 //
76 // (If opts doesn't indicate a federation-wide list query, fn is just
77 // called once with the local backend.)
78 //
79 // fn is called more than once only if the query meets the following
80 // restrictions:
81 //
82 //   - Count=="none"
83 //
84 //   - Limit<0
85 //
86 //   - len(Order)==0
87 //
88 //   - Each filter is either "uuid = ..." or "uuid in [...]".
89 //
90 //   - The maximum possible response size (total number of objects
91 //     that could potentially be matched by all of the specified
92 //     filters) exceeds the local cluster's response page size limit.
93 //
94 // If the query involves multiple backends but doesn't meet these
95 // restrictions, an error is returned without calling fn.
96 //
97 // Thus, the caller can assume that either:
98 //
99 //   - splitListRequest() returns an error, or
100 //
101 //   - fn is called exactly once, or
102 //
103 //   - fn is called more than once, with options that satisfy the above
104 //     restrictions.
105 //
106 // Each call to fn indicates a single (local or remote) backend and a
107 // corresponding options argument suitable for sending to that
108 // backend.
109 func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error {
110
111         if opts.BypassFederation || opts.ForwardedFor != "" {
112                 // Client requested no federation.  Pass through.
113                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
114                 return err
115         }
116         if opts.ClusterID != "" {
117                 // Client explicitly selected cluster
118                 _, err := fn(ctx, conn.cluster.ClusterID, conn.chooseBackend(opts.ClusterID), opts)
119                 return err
120         }
121
122         cannotSplit := false
123         var matchAllFilters map[string]bool
124         for _, f := range opts.Filters {
125                 matchThisFilter := map[string]bool{}
126                 if f.Attr != "uuid" {
127                         cannotSplit = true
128                         continue
129                 }
130                 if f.Operator == "=" {
131                         if uuid, ok := f.Operand.(string); ok {
132                                 matchThisFilter[uuid] = true
133                         } else {
134                                 return httpErrorf(http.StatusBadRequest, "invalid operand type %T for filter %q", f.Operand, f)
135                         }
136                 } else if f.Operator == "in" {
137                         if operand, ok := f.Operand.([]interface{}); ok {
138                                 // skip any elements that aren't
139                                 // strings (thus can't match a UUID,
140                                 // thus can't affect the response).
141                                 for _, v := range operand {
142                                         if uuid, ok := v.(string); ok {
143                                                 matchThisFilter[uuid] = true
144                                         }
145                                 }
146                         } else if strings, ok := f.Operand.([]string); ok {
147                                 for _, uuid := range strings {
148                                         matchThisFilter[uuid] = true
149                                 }
150                         } else {
151                                 return httpErrorf(http.StatusBadRequest, "invalid operand type %T in filter %q", f.Operand, f)
152                         }
153                 } else {
154                         cannotSplit = true
155                         continue
156                 }
157
158                 if matchAllFilters == nil {
159                         matchAllFilters = matchThisFilter
160                 } else {
161                         // Reduce matchAllFilters to the intersection
162                         // of matchAllFilters ∩ matchThisFilter.
163                         for uuid := range matchAllFilters {
164                                 if !matchThisFilter[uuid] {
165                                         delete(matchAllFilters, uuid)
166                                 }
167                         }
168                 }
169         }
170
171         if matchAllFilters == nil {
172                 // Not filtering by UUID at all; just query the local
173                 // cluster.
174                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
175                 return err
176         }
177
178         // Collate UUIDs in matchAllFilters by remote cluster ID --
179         // e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"]
180         // will be true -- and count the total number of UUIDs we're
181         // filtering on, so we can compare it to our max page size
182         // limit.
183         nUUIDs := 0
184         todoByRemote := map[string]map[string]bool{}
185         for uuid := range matchAllFilters {
186                 if len(uuid) != 27 {
187                         // Cannot match anything, just drop it
188                 } else {
189                         if todoByRemote[uuid[:5]] == nil {
190                                 todoByRemote[uuid[:5]] = map[string]bool{}
191                         }
192                         todoByRemote[uuid[:5]][uuid] = true
193                         nUUIDs++
194                 }
195         }
196
197         if len(todoByRemote) == 0 {
198                 return nil
199         }
200         if len(todoByRemote) == 1 && todoByRemote[conn.cluster.ClusterID] != nil {
201                 // All UUIDs are local, so proxy a single request. The
202                 // generic case has some limitations (see below) which
203                 // we don't want to impose on local requests.
204                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
205                 return err
206         }
207         if cannotSplit {
208                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'")
209         }
210         if opts.Count != "none" {
211                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
212         }
213         if (opts.Limit >= 0 && opts.Limit < int64(nUUIDs)) || opts.Offset != 0 || len(opts.Order) > 0 {
214                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit (%d) < nUUIDs (%d), offset (%d) > 0, or order (%v) parameter", opts.Limit, nUUIDs, opts.Offset, opts.Order)
215         }
216         if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
217                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
218         }
219
220         ctx, cancel := context.WithCancel(ctx)
221         defer cancel()
222         errs := make(chan error, len(todoByRemote))
223         for clusterID, todo := range todoByRemote {
224                 go func(clusterID string, todo map[string]bool) {
225                         // This goroutine sends exactly one value to
226                         // errs.
227                         batch := make([]string, 0, len(todo))
228                         for uuid := range todo {
229                                 batch = append(batch, uuid)
230                         }
231
232                         var backend arvados.API
233                         if clusterID == conn.cluster.ClusterID {
234                                 backend = conn.local
235                         } else if backend = conn.remotes[clusterID]; backend == nil {
236                                 errs <- httpErrorf(http.StatusNotFound, "cannot execute federated list query: no proxy available for cluster %q", clusterID)
237                                 return
238                         }
239                         remoteOpts := opts
240                         if remoteOpts.Select != nil {
241                                 // We always need to select UUIDs to
242                                 // use the response, even if our
243                                 // caller doesn't.
244                                 remoteOpts.Select = append([]string{"uuid"}, remoteOpts.Select...)
245                         }
246                         for len(todo) > 0 {
247                                 if len(batch) > len(todo) {
248                                         // Reduce batch to just the todo's
249                                         batch = batch[:0]
250                                         for uuid := range todo {
251                                                 batch = append(batch, uuid)
252                                         }
253                                 }
254                                 remoteOpts.Filters = []arvados.Filter{{"uuid", "in", batch}}
255
256                                 done, err := fn(ctx, clusterID, backend, remoteOpts)
257                                 if err != nil {
258                                         errs <- httpErrorf(http.StatusBadGateway, "%s", err.Error())
259                                         return
260                                 }
261                                 progress := false
262                                 for _, uuid := range done {
263                                         if _, ok := todo[uuid]; ok {
264                                                 progress = true
265                                                 delete(todo, uuid)
266                                         }
267                                 }
268                                 if len(done) == 0 {
269                                         // Zero items == no more
270                                         // results exist, no need to
271                                         // get another page.
272                                         break
273                                 } else if !progress {
274                                         errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned %d items but none had the requested UUIDs", clusterID, len(done))
275                                         return
276                                 }
277                         }
278                         errs <- nil
279                 }(clusterID, todo)
280         }
281
282         // Wait for all goroutines to return, then return the first
283         // non-nil error, if any.
284         var firstErr error
285         for range todoByRemote {
286                 if err := <-errs; err != nil && firstErr == nil {
287                         firstErr = err
288                         // Signal to any remaining fn() calls that
289                         // further effort is futile.
290                         cancel()
291                 }
292         }
293         return firstErr
294 }
295
296 func httpErrorf(code int, format string, args ...interface{}) error {
297         return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
298 }