Merge branch '16263-logincluster-user-list-fix' refs #16263
[arvados.git] / lib / controller / federation / list.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "fmt"
10         "net/http"
11         "sort"
12         "sync"
13         "sync/atomic"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/httpserver"
17 )
18
19 //go:generate go run generate.go
20
21 // CollectionList is used as a template to auto-generate List()
22 // methods for other types; see generate.go.
23
24 func (conn *Conn) generated_CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
25         var mtx sync.Mutex
26         var merged arvados.CollectionList
27         var needSort atomic.Value
28         needSort.Store(false)
29         err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
30                 cl, err := backend.CollectionList(ctx, options)
31                 if err != nil {
32                         return nil, err
33                 }
34                 mtx.Lock()
35                 defer mtx.Unlock()
36                 if len(merged.Items) == 0 {
37                         merged = cl
38                 } else if len(cl.Items) > 0 {
39                         merged.Items = append(merged.Items, cl.Items...)
40                         needSort.Store(true)
41                 }
42                 uuids := make([]string, 0, len(cl.Items))
43                 for _, item := range cl.Items {
44                         uuids = append(uuids, item.UUID)
45                 }
46                 return uuids, nil
47         })
48         if needSort.Load().(bool) {
49                 // Apply the default/implied order, "modified_at desc"
50                 sort.Slice(merged.Items, func(i, j int) bool {
51                         mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
52                         return mj.Before(mi)
53                 })
54         }
55         if merged.Items == nil {
56                 // Return empty results as [], not null
57                 // (https://github.com/golang/go/issues/27589 might be
58                 // a better solution in the future)
59                 merged.Items = []arvados.Collection{}
60         }
61         return merged, err
62 }
63
64 // Call fn on one or more local/remote backends if opts indicates a
65 // federation-wide list query, i.e.:
66 //
67 // * There is at least one filter of the form
68 //   ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
69 //
70 // * One or more of the supplied UUIDs (a,b,c,...) has a non-local
71 //   prefix.
72 //
73 // * There are no other filters
74 //
75 // (If opts doesn't indicate a federation-wide list query, fn is just
76 // called once with the local backend.)
77 //
78 // fn is called more than once only if the query meets the following
79 // restrictions:
80 //
81 // * Count=="none"
82 //
83 // * Limit<0
84 //
85 // * len(Order)==0
86 //
87 // * Each filter is either "uuid = ..." or "uuid in [...]".
88 //
89 // * The maximum possible response size (total number of objects that
90 //   could potentially be matched by all of the specified filters)
91 //   exceeds the local cluster's response page size limit.
92 //
93 // If the query involves multiple backends but doesn't meet these
94 // restrictions, an error is returned without calling fn.
95 //
96 // Thus, the caller can assume that either:
97 //
98 // * splitListRequest() returns an error, or
99 //
100 // * fn is called exactly once, or
101 //
102 // * fn is called more than once, with options that satisfy the above
103 //   restrictions.
104 //
105 // Each call to fn indicates a single (local or remote) backend and a
106 // corresponding options argument suitable for sending to that
107 // backend.
108 func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error {
109
110         if opts.BypassFederation {
111                 // Client requested no federation.  Pass through.
112                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
113                 return err
114         }
115
116         cannotSplit := false
117         var matchAllFilters map[string]bool
118         for _, f := range opts.Filters {
119                 matchThisFilter := map[string]bool{}
120                 if f.Attr != "uuid" {
121                         cannotSplit = true
122                         continue
123                 }
124                 if f.Operator == "=" {
125                         if uuid, ok := f.Operand.(string); ok {
126                                 matchThisFilter[uuid] = true
127                         } else {
128                                 return httpErrorf(http.StatusBadRequest, "invalid operand type %T for filter %q", f.Operand, f)
129                         }
130                 } else if f.Operator == "in" {
131                         if operand, ok := f.Operand.([]interface{}); ok {
132                                 // skip any elements that aren't
133                                 // strings (thus can't match a UUID,
134                                 // thus can't affect the response).
135                                 for _, v := range operand {
136                                         if uuid, ok := v.(string); ok {
137                                                 matchThisFilter[uuid] = true
138                                         }
139                                 }
140                         } else if strings, ok := f.Operand.([]string); ok {
141                                 for _, uuid := range strings {
142                                         matchThisFilter[uuid] = true
143                                 }
144                         } else {
145                                 return httpErrorf(http.StatusBadRequest, "invalid operand type %T in filter %q", f.Operand, f)
146                         }
147                 } else {
148                         cannotSplit = true
149                         continue
150                 }
151
152                 if matchAllFilters == nil {
153                         matchAllFilters = matchThisFilter
154                 } else {
155                         // Reduce matchAllFilters to the intersection
156                         // of matchAllFilters ∩ matchThisFilter.
157                         for uuid := range matchAllFilters {
158                                 if !matchThisFilter[uuid] {
159                                         delete(matchAllFilters, uuid)
160                                 }
161                         }
162                 }
163         }
164
165         if matchAllFilters == nil {
166                 // Not filtering by UUID at all; just query the local
167                 // cluster.
168                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
169                 return err
170         }
171
172         // Collate UUIDs in matchAllFilters by remote cluster ID --
173         // e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"]
174         // will be true -- and count the total number of UUIDs we're
175         // filtering on, so we can compare it to our max page size
176         // limit.
177         nUUIDs := 0
178         todoByRemote := map[string]map[string]bool{}
179         for uuid := range matchAllFilters {
180                 if len(uuid) != 27 {
181                         // Cannot match anything, just drop it
182                 } else {
183                         if todoByRemote[uuid[:5]] == nil {
184                                 todoByRemote[uuid[:5]] = map[string]bool{}
185                         }
186                         todoByRemote[uuid[:5]][uuid] = true
187                         nUUIDs++
188                 }
189         }
190
191         if len(todoByRemote) == 0 {
192                 return nil
193         }
194         if len(todoByRemote) == 1 && todoByRemote[conn.cluster.ClusterID] != nil {
195                 // All UUIDs are local, so proxy a single request. The
196                 // generic case has some limitations (see below) which
197                 // we don't want to impose on local requests.
198                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
199                 return err
200         }
201         if cannotSplit {
202                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'")
203         }
204         if opts.Count != "none" {
205                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
206         }
207         if opts.Limit >= 0 || opts.Offset != 0 || len(opts.Order) > 0 {
208                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit, offset, or order parameter")
209         }
210         if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
211                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
212         }
213
214         ctx, cancel := context.WithCancel(ctx)
215         defer cancel()
216         errs := make(chan error, len(todoByRemote))
217         for clusterID, todo := range todoByRemote {
218                 go func(clusterID string, todo map[string]bool) {
219                         // This goroutine sends exactly one value to
220                         // errs.
221                         batch := make([]string, 0, len(todo))
222                         for uuid := range todo {
223                                 batch = append(batch, uuid)
224                         }
225
226                         var backend arvados.API
227                         if clusterID == conn.cluster.ClusterID {
228                                 backend = conn.local
229                         } else if backend = conn.remotes[clusterID]; backend == nil {
230                                 errs <- httpErrorf(http.StatusNotFound, "cannot execute federated list query: no proxy available for cluster %q", clusterID)
231                                 return
232                         }
233                         remoteOpts := opts
234                         if remoteOpts.Select != nil {
235                                 // We always need to select UUIDs to
236                                 // use the response, even if our
237                                 // caller doesn't.
238                                 remoteOpts.Select = append([]string{"uuid"}, remoteOpts.Select...)
239                         }
240                         for len(todo) > 0 {
241                                 if len(batch) > len(todo) {
242                                         // Reduce batch to just the todo's
243                                         batch = batch[:0]
244                                         for uuid := range todo {
245                                                 batch = append(batch, uuid)
246                                         }
247                                 }
248                                 remoteOpts.Filters = []arvados.Filter{{"uuid", "in", batch}}
249
250                                 done, err := fn(ctx, clusterID, backend, remoteOpts)
251                                 if err != nil {
252                                         errs <- httpErrorf(http.StatusBadGateway, err.Error())
253                                         return
254                                 }
255                                 progress := false
256                                 for _, uuid := range done {
257                                         if _, ok := todo[uuid]; ok {
258                                                 progress = true
259                                                 delete(todo, uuid)
260                                         }
261                                 }
262                                 if len(done) == 0 {
263                                         // Zero items == no more
264                                         // results exist, no need to
265                                         // get another page.
266                                         break
267                                 } else if !progress {
268                                         errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned %d items but none had the requested UUIDs", clusterID, len(done))
269                                         return
270                                 }
271                         }
272                         errs <- nil
273                 }(clusterID, todo)
274         }
275
276         // Wait for all goroutines to return, then return the first
277         // non-nil error, if any.
278         var firstErr error
279         for range todoByRemote {
280                 if err := <-errs; err != nil && firstErr == nil {
281                         firstErr = err
282                         // Signal to any remaining fn() calls that
283                         // further effort is futile.
284                         cancel()
285                 }
286         }
287         return firstErr
288 }
289
290 func httpErrorf(code int, format string, args ...interface{}) error {
291         return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
292 }