"sync"
"sync/atomic"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
)
//go:generate go run generate.go
var needSort atomic.Value
needSort.Store(false)
err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+ options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
cl, err := backend.CollectionList(ctx, options)
if err != nil {
return nil, err
// Call fn on one or more local/remote backends if opts indicates a
// federation-wide list query, i.e.:
//
-// * There is at least one filter of the form
-// ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
+// - There is at least one filter of the form
+// ["uuid","in",[a,b,c,...]] or ["uuid","=",a]
//
-// * One or more of the supplied UUIDs (a,b,c,...) has a non-local
-// prefix.
+// - One or more of the supplied UUIDs (a,b,c,...) has a non-local
+// prefix.
//
-// * There are no other filters
+// - There are no other filters
//
// (If opts doesn't indicate a federation-wide list query, fn is just
// called once with the local backend.)
// fn is called more than once only if the query meets the following
// restrictions:
//
-// * Count=="none"
+// - Count=="none"
//
-// * Limit<0
+// - Limit<0
//
-// * len(Order)==0
+// - len(Order)==0
//
-// * Each filter must be either "uuid = ..." or "uuid in [...]".
+// - Each filter is either "uuid = ..." or "uuid in [...]".
//
-// * The maximum possible response size (total number of objects that
-// could potentially be matched by all of the specified filters)
-// exceeds the local cluster's response page size limit.
+// - The maximum possible response size (total number of objects
+// that could potentially be matched by all of the specified
+// filters) exceeds the local cluster's response page size limit.
//
// If the query involves multiple backends but doesn't meet these
// restrictions, an error is returned without calling fn.
//
// Thus, the caller can assume that either:
//
-// * splitListRequest() returns an error, or
+// - splitListRequest() returns an error, or
//
-// * fn is called exactly once, or
+// - fn is called exactly once, or
//
-// * fn is called more than once, with options that satisfy the above
-// restrictions.
+// - fn is called more than once, with options that satisfy the above
+// restrictions.
//
// Each call to fn indicates a single (local or remote) backend and a
// corresponding options argument suitable for sending to that
// backend.
func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error {
+
+ if opts.BypassFederation || opts.ForwardedFor != "" {
+ // Client requested no federation. Pass through.
+ _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
+ return err
+ }
+ if opts.ClusterID != "" {
+ // Client explicitly selected cluster
+ _, err := fn(ctx, conn.cluster.ClusterID, conn.chooseBackend(opts.ClusterID), opts)
+ return err
+ }
+
cannotSplit := false
var matchAllFilters map[string]bool
for _, f := range opts.Filters {
}
}
- if len(todoByRemote) > 1 {
- if cannotSplit {
- return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'")
- }
- if opts.Count != "none" {
- return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
- }
- if opts.Limit >= 0 || opts.Offset != 0 || len(opts.Order) > 0 {
- return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit, offset, or order parameter")
- }
- if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
- return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
- }
- selectingUUID := false
- for _, attr := range opts.Select {
- if attr == "uuid" {
- selectingUUID = true
- }
- }
- if opts.Select != nil && !selectingUUID {
- return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with a select parameter that does not include uuid")
- }
+ if len(todoByRemote) == 0 {
+ return nil
+ }
+ if len(todoByRemote) == 1 && todoByRemote[conn.cluster.ClusterID] != nil {
+ // All UUIDs are local, so proxy a single request. The
+ // generic case has some limitations (see below) which
+ // we don't want to impose on local requests.
+ _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
+ return err
+ }
+ if cannotSplit {
+ return httpErrorf(http.StatusBadRequest, "cannot execute federated list query: each filter must be either 'uuid = ...' or 'uuid in [...]'")
+ }
+ if opts.Count != "none" {
+ return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
+ }
+ if (opts.Limit >= 0 && opts.Limit < int64(nUUIDs)) || opts.Offset != 0 || len(opts.Order) > 0 {
+ return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit (%d) < nUUIDs (%d), offset (%d) > 0, or order (%v) parameter", opts.Limit, nUUIDs, opts.Offset, opts.Order)
+ }
+ if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
+ return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
}
ctx, cancel := context.WithCancel(ctx)
return
}
remoteOpts := opts
+ if remoteOpts.Select != nil {
+ // We always need to select UUIDs to
+ // use the response, even if our
+ // caller doesn't.
+ remoteOpts.Select = append([]string{"uuid"}, remoteOpts.Select...)
+ }
for len(todo) > 0 {
if len(batch) > len(todo) {
// Reduce batch to just the todo's
done, err := fn(ctx, clusterID, backend, remoteOpts)
if err != nil {
- errs <- err
+ errs <- httpErrorf(http.StatusBadGateway, "%s", err.Error())
return
}
progress := false
delete(todo, uuid)
}
}
- if !progress {
- errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned none of the requested UUIDs", clusterID)
+ if len(done) == 0 {
+ // Zero items == no more
+ // results exist, no need to
+ // get another page.
+ break
+ } else if !progress {
+ errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned %d items but none had the requested UUIDs", clusterID, len(done))
return
}
}