17702: Federated lists supports cluster_id parameter
[arvados.git] / lib / controller / federation / list.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package federation
6
7 import (
8         "context"
9         "fmt"
10         "net/http"
11         "sort"
12         "sync"
13         "sync/atomic"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/httpserver"
17 )
18
19 //go:generate go run generate.go
20
21 // CollectionList is used as a template to auto-generate List()
22 // methods for other types; see generate.go.
23
24 func (conn *Conn) generated_CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
25         if options.ClusterID != "" {
26                 // explicitly selected cluster
27                 options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
28                 return conn.chooseBackend(options.ClusterID).CollectionList(ctx, options)
29         }
30
31         var mtx sync.Mutex
32         var merged arvados.CollectionList
33         var needSort atomic.Value
34         needSort.Store(false)
35         err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
36                 options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
37                 cl, err := backend.CollectionList(ctx, options)
38                 if err != nil {
39                         return nil, err
40                 }
41                 mtx.Lock()
42                 defer mtx.Unlock()
43                 if len(merged.Items) == 0 {
44                         merged = cl
45                 } else if len(cl.Items) > 0 {
46                         merged.Items = append(merged.Items, cl.Items...)
47                         needSort.Store(true)
48                 }
49                 uuids := make([]string, 0, len(cl.Items))
50                 for _, item := range cl.Items {
51                         uuids = append(uuids, item.UUID)
52                 }
53                 return uuids, nil
54         })
55         if needSort.Load().(bool) {
56                 // Apply the default/implied order, "modified_at desc"
57                 sort.Slice(merged.Items, func(i, j int) bool {
58                         mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
59                         return mj.Before(mi)
60                 })
61         }
62         if merged.Items == nil {
63                 // Return empty results as [], not null
64                 // (https://github.com/golang/go/issues/27589 might be
65                 // a better solution in the future)
66                 merged.Items = []arvados.Collection{}
67         }
68         return merged, err
69 }
70
71 // Call fn on one or more local/remote backends if opts indicates a
72 // federation-wide list query, i.e.:
73 //
74 // * There is at least one filter of the form
75 //   ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
76 //
77 // * One or more of the supplied UUIDs (a,b,c,...) has a non-local
78 //   prefix.
79 //
80 // * There are no other filters
81 //
82 // (If opts doesn't indicate a federation-wide list query, fn is just
83 // called once with the local backend.)
84 //
85 // fn is called more than once only if the query meets the following
86 // restrictions:
87 //
88 // * Count=="none"
89 //
90 // * Limit<0
91 //
92 // * len(Order)==0
93 //
94 // * Each filter is either "uuid = ..." or "uuid in [...]".
95 //
96 // * The maximum possible response size (total number of objects that
97 //   could potentially be matched by all of the specified filters)
98 //   exceeds the local cluster's response page size limit.
99 //
100 // If the query involves multiple backends but doesn't meet these
101 // restrictions, an error is returned without calling fn.
102 //
103 // Thus, the caller can assume that either:
104 //
105 // * splitListRequest() returns an error, or
106 //
107 // * fn is called exactly once, or
108 //
109 // * fn is called more than once, with options that satisfy the above
110 //   restrictions.
111 //
112 // Each call to fn indicates a single (local or remote) backend and a
113 // corresponding options argument suitable for sending to that
114 // backend.
115 func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error {
116
117         if opts.BypassFederation || opts.ForwardedFor != "" {
118                 // Client requested no federation.  Pass through.
119                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
120                 return err
121         }
122
123         cannotSplit := false
124         var matchAllFilters map[string]bool
125         for _, f := range opts.Filters {
126                 matchThisFilter := map[string]bool{}
127                 if f.Attr != "uuid" {
128                         cannotSplit = true
129                         continue
130                 }
131                 if f.Operator == "=" {
132                         if uuid, ok := f.Operand.(string); ok {
133                                 matchThisFilter[uuid] = true
134                         } else {
135                                 return httpErrorf(http.StatusBadRequest, "invalid operand type %T for filter %q", f.Operand, f)
136                         }
137                 } else if f.Operator == "in" {
138                         if operand, ok := f.Operand.([]interface{}); ok {
139                                 // skip any elements that aren't
140                                 // strings (thus can't match a UUID,
141                                 // thus can't affect the response).
142                                 for _, v := range operand {
143                                         if uuid, ok := v.(string); ok {
144                                                 matchThisFilter[uuid] = true
145                                         }
146                                 }
147                         } else if strings, ok := f.Operand.([]string); ok {
148                                 for _, uuid := range strings {
149                                         matchThisFilter[uuid] = true
150                                 }
151                         } else {
152                                 return httpErrorf(http.StatusBadRequest, "invalid operand type %T in filter %q", f.Operand, f)
153                         }
154                 } else {
155                         cannotSplit = true
156                         continue
157                 }
158
159                 if matchAllFilters == nil {
160                         matchAllFilters = matchThisFilter
161                 } else {
162                         // Reduce matchAllFilters to the intersection
163                         // of matchAllFilters ∩ matchThisFilter.
164                         for uuid := range matchAllFilters {
165                                 if !matchThisFilter[uuid] {
166                                         delete(matchAllFilters, uuid)
167                                 }
168                         }
169                 }
170         }
171
172         if matchAllFilters == nil {
173                 // Not filtering by UUID at all; just query the local
174                 // cluster.
175                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
176                 return err
177         }
178
179         // Collate UUIDs in matchAllFilters by remote cluster ID --
180         // e.g., todoByRemote["aaaaa"]["aaaaa-4zz18-000000000000000"]
181         // will be true -- and count the total number of UUIDs we're
182         // filtering on, so we can compare it to our max page size
183         // limit.
184         nUUIDs := 0
185         todoByRemote := map[string]map[string]bool{}
186         for uuid := range matchAllFilters {
187                 if len(uuid) != 27 {
188                         // Cannot match anything, just drop it
189                 } else {
190                         if todoByRemote[uuid[:5]] == nil {
191                                 todoByRemote[uuid[:5]] = map[string]bool{}
192                         }
193                         todoByRemote[uuid[:5]][uuid] = true
194                         nUUIDs++
195                 }
196         }
197
198         if len(todoByRemote) == 0 {
199                 return nil
200         }
201         if len(todoByRemote) == 1 && todoByRemote[conn.cluster.ClusterID] != nil {
202                 // All UUIDs are local, so proxy a single request. The
203                 // generic case has some limitations (see below) which
204                 // we don't want to impose on local requests.
205                 _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
206                 return err
207         }
208         if cannotSplit {
209                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'")
210         }
211         if opts.Count != "none" {
212                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
213         }
214         if (opts.Limit >= 0 && opts.Limit < int64(nUUIDs)) || opts.Offset != 0 || len(opts.Order) > 0 {
215                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit (%d) < nUUIDs (%d), offset (%d) > 0, or order (%v) parameter", opts.Limit, nUUIDs, opts.Offset, opts.Order)
216         }
217         if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
218                 return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
219         }
220
221         ctx, cancel := context.WithCancel(ctx)
222         defer cancel()
223         errs := make(chan error, len(todoByRemote))
224         for clusterID, todo := range todoByRemote {
225                 go func(clusterID string, todo map[string]bool) {
226                         // This goroutine sends exactly one value to
227                         // errs.
228                         batch := make([]string, 0, len(todo))
229                         for uuid := range todo {
230                                 batch = append(batch, uuid)
231                         }
232
233                         var backend arvados.API
234                         if clusterID == conn.cluster.ClusterID {
235                                 backend = conn.local
236                         } else if backend = conn.remotes[clusterID]; backend == nil {
237                                 errs <- httpErrorf(http.StatusNotFound, "cannot execute federated list query: no proxy available for cluster %q", clusterID)
238                                 return
239                         }
240                         remoteOpts := opts
241                         if remoteOpts.Select != nil {
242                                 // We always need to select UUIDs to
243                                 // use the response, even if our
244                                 // caller doesn't.
245                                 remoteOpts.Select = append([]string{"uuid"}, remoteOpts.Select...)
246                         }
247                         for len(todo) > 0 {
248                                 if len(batch) > len(todo) {
249                                         // Reduce batch to just the todo's
250                                         batch = batch[:0]
251                                         for uuid := range todo {
252                                                 batch = append(batch, uuid)
253                                         }
254                                 }
255                                 remoteOpts.Filters = []arvados.Filter{{"uuid", "in", batch}}
256
257                                 done, err := fn(ctx, clusterID, backend, remoteOpts)
258                                 if err != nil {
259                                         errs <- httpErrorf(http.StatusBadGateway, "%s", err.Error())
260                                         return
261                                 }
262                                 progress := false
263                                 for _, uuid := range done {
264                                         if _, ok := todo[uuid]; ok {
265                                                 progress = true
266                                                 delete(todo, uuid)
267                                         }
268                                 }
269                                 if len(done) == 0 {
270                                         // Zero items == no more
271                                         // results exist, no need to
272                                         // get another page.
273                                         break
274                                 } else if !progress {
275                                         errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned %d items but none had the requested UUIDs", clusterID, len(done))
276                                         return
277                                 }
278                         }
279                         errs <- nil
280                 }(clusterID, todo)
281         }
282
283         // Wait for all goroutines to return, then return the first
284         // non-nil error, if any.
285         var firstErr error
286         for range todoByRemote {
287                 if err := <-errs; err != nil && firstErr == nil {
288                         firstErr = err
289                         // Signal to any remaining fn() calls that
290                         // further effort is futile.
291                         cancel()
292                 }
293         }
294         return firstErr
295 }
296
297 func httpErrorf(code int, format string, args ...interface{}) error {
298         return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
299 }