14287: Merge list results from multiple backends.
author Tom Clegg <tclegg@veritasgenetics.com>
Wed, 3 Jul 2019 13:10:38 +0000 (09:10 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 3 Jul 2019 13:10:38 +0000 (09:10 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

12 files changed:
lib/controller/federation/conn.go
lib/controller/federation/generate.go [new file with mode: 0644]
lib/controller/federation/generated.go [new file with mode: 0755]
lib/controller/federation/generated_test.go [new file with mode: 0644]
lib/controller/federation/list.go [new file with mode: 0644]
lib/controller/federation/list_test.go [new file with mode: 0644]
lib/controller/handler.go
lib/controller/router/router.go
lib/controller/router/router_test.go
lib/controller/rpc/conn.go
sdk/go/arvadostest/api.go
sdk/go/httpserver/error.go

index e094953fc40d89989cca538f6f46098fb41fe958..3addcf4fa91fac63e622870000e32f0d6d260459 100644 (file)
@@ -27,7 +27,7 @@ type Conn struct {
        remotes map[string]backend
 }
 
-func New(cluster *arvados.Cluster) arvados.API {
+func New(cluster *arvados.Cluster) *Conn {
        local := railsproxy.NewConn(cluster)
        remotes := map[string]backend{}
        for id, remote := range cluster.RemoteClusters {
@@ -218,10 +218,6 @@ func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions)
        }
 }
 
-func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
-       return conn.local.CollectionList(ctx, options)
-}
-
+// CollectionProvenance passes the request to the backend that
+// conn.chooseBackend returns for options.UUID.
 func (conn *Conn) CollectionProvenance(ctx context.Context, options arvados.GetOptions) (map[string]interface{}, error) {
        return conn.chooseBackend(options.UUID).CollectionProvenance(ctx, options)
 }
@@ -254,10 +250,6 @@ func (conn *Conn) ContainerGet(ctx context.Context, options arvados.GetOptions)
        return conn.chooseBackend(options.UUID).ContainerGet(ctx, options)
 }
 
-func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
-       return conn.local.ContainerList(ctx, options)
-}
-
+// ContainerDelete passes the request to the backend that
+// conn.chooseBackend returns for options.UUID.
 func (conn *Conn) ContainerDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Container, error) {
        return conn.chooseBackend(options.UUID).ContainerDelete(ctx, options)
 }
@@ -282,10 +274,6 @@ func (conn *Conn) SpecimenGet(ctx context.Context, options arvados.GetOptions) (
        return conn.chooseBackend(options.UUID).SpecimenGet(ctx, options)
 }
 
-func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
-       return conn.local.SpecimenList(ctx, options)
-}
-
+// SpecimenDelete passes the request to the backend that
+// conn.chooseBackend returns for options.UUID.
 func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Specimen, error) {
        return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
 }
diff --git a/lib/controller/federation/generate.go b/lib/controller/federation/generate.go
new file mode 100644 (file)
index 0000000..1f37df8
--- /dev/null
@@ -0,0 +1,95 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// +build ignore
+
+package main
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "regexp"
+)
+
+// main regenerates generated.go from the CollectionList template in
+// list.go: it copies list.go's header (license comment, package
+// clause, imports), adds a do-not-edit banner, appends the template
+// once per other type with "Collection" replaced by that type name,
+// and pipes the result through goimports.
+//
+// With -check, the result is compared (diff -u) against the existing
+// generated.go instead of replacing it; the program exits 1 if they
+// differ. Any other failure panics.
+func main() {
+       checkOnly := false
+       if len(os.Args) == 2 && os.Args[1] == "-check" {
+               checkOnly = true
+       } else if len(os.Args) != 1 {
+               panic("usage: go run generate.go [-check]")
+       }
+
+       buf, err := ioutil.ReadFile("list.go")
+       if err != nil {
+               panic(err)
+       }
+       // The template is the whole CollectionList method, from its
+       // "func" line through the closing brace in column 0.
+       orig := regexp.MustCompile(`(?ms)\nfunc [^\n]*CollectionList\(.*?\n}\n`).Find(buf)
+       if len(orig) == 0 {
+               panic("can't find CollectionList func")
+       }
+       // Everything up to the end of the import block; goimports
+       // prunes whichever imports the generated code doesn't use.
+       header := regexp.MustCompile(`(?ms)^.*package .*?import.*?\n\)\n`).Find(buf)
+       if len(header) == 0 {
+               panic("can't find package/import header in list.go")
+       }
+
+       // Assemble goimports' whole input in memory and hand it over
+       // as Cmd.Stdin, rather than feeding a stdin pipe from a second
+       // goroutine (which needs its own error handling and must not
+       // share variables with this one).
+       var src bytes.Buffer
+       src.Write(header)
+       io.WriteString(&src, "//\n// -- this file is auto-generated -- do not edit -- edit list.go and run \"go generate\" instead --\n//\n\n")
+       for _, t := range []string{"Container", "Specimen"} {
+               src.Write(bytes.ReplaceAll(orig, []byte("Collection"), []byte(t)))
+       }
+
+       // 0666 (before umask), not 0777: generated.go is ordinary Go
+       // source and should not be executable.
+       outfile, err := os.OpenFile("generated.go~", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666)
+       if err != nil {
+               panic(err)
+       }
+       gofmt := exec.Command("goimports")
+       gofmt.Stdin = &src
+       gofmt.Stdout = outfile
+       gofmt.Stderr = os.Stderr
+       err = gofmt.Run()
+       if err != nil {
+               panic(err)
+       }
+       err = outfile.Close()
+       if err != nil {
+               panic(err)
+       }
+
+       if checkOnly {
+               diff := exec.Command("diff", "-u", "generated.go", "generated.go~")
+               diff.Stdout = os.Stdout
+               diff.Stderr = os.Stderr
+               err = diff.Run()
+               // Best effort: the candidate file is only needed for
+               // the comparison, so don't leave it in the source tree.
+               os.Remove("generated.go~")
+               if err != nil {
+                       os.Exit(1)
+               }
+       } else {
+               err = os.Rename("generated.go~", "generated.go")
+               if err != nil {
+                       panic(err)
+               }
+       }
+}
diff --git a/lib/controller/federation/generated.go b/lib/controller/federation/generated.go
new file mode 100755 (executable)
index 0000000..b34b9b1
--- /dev/null
@@ -0,0 +1,67 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package federation
+
+import (
+       "context"
+       "sort"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+//
+// -- this file is auto-generated -- do not edit -- edit list.go and run "go generate" instead --
+//
+
+func (conn *Conn) ContainerList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerList, error) {
+       var mtx sync.Mutex
+       var merged arvados.ContainerList
+       err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+               cl, err := backend.ContainerList(ctx, options)
+               if err != nil {
+                       return nil, err
+               }
+               mtx.Lock()
+               defer mtx.Unlock()
+               if len(merged.Items) == 0 {
+                       merged = cl
+               } else {
+                       merged.Items = append(merged.Items, cl.Items...)
+               }
+               uuids := make([]string, 0, len(cl.Items))
+               for _, item := range cl.Items {
+                       uuids = append(uuids, item.UUID)
+               }
+               return uuids, nil
+       })
+       sort.Slice(merged.Items, func(i, j int) bool { return merged.Items[i].UUID < merged.Items[j].UUID })
+       return merged, err
+}
+
+func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
+       var mtx sync.Mutex
+       var merged arvados.SpecimenList
+       err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+               cl, err := backend.SpecimenList(ctx, options)
+               if err != nil {
+                       return nil, err
+               }
+               mtx.Lock()
+               defer mtx.Unlock()
+               if len(merged.Items) == 0 {
+                       merged = cl
+               } else {
+                       merged.Items = append(merged.Items, cl.Items...)
+               }
+               uuids := make([]string, 0, len(cl.Items))
+               for _, item := range cl.Items {
+                       uuids = append(uuids, item.UUID)
+               }
+               return uuids, nil
+       })
+       sort.Slice(merged.Items, func(i, j int) bool { return merged.Items[i].UUID < merged.Items[j].UUID })
+       return merged, err
+}
diff --git a/lib/controller/federation/generated_test.go b/lib/controller/federation/generated_test.go
new file mode 100644 (file)
index 0000000..0e571f2
--- /dev/null
@@ -0,0 +1,23 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package federation
+
+import (
+       "os/exec"
+
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&UptodateSuite{})
+
+// UptodateSuite verifies that the checked-in generated.go is what
+// generate.go currently produces from list.go.
+type UptodateSuite struct{}
+
+// TestUpToDate runs "go run generate.go -check" in the test's working
+// directory; a non-zero exit fails the test, and the command's
+// combined stdout/stderr is logged.
+func (*UptodateSuite) TestUpToDate(c *check.C) {
+       cmd := exec.Command("go", "run", "generate.go", "-check")
+       output, err := cmd.CombinedOutput()
+       if err == nil {
+               return
+       }
+       c.Log(string(output))
+       c.Error("generated.go is out of date -- run 'go generate' to update it")
+}
diff --git a/lib/controller/federation/list.go b/lib/controller/federation/list.go
new file mode 100644 (file)
index 0000000..5a171c9
--- /dev/null
@@ -0,0 +1,247 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package federation
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "sort"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+)
+
+//go:generate go run generate.go
+
+// CollectionList lists collections matching options. It uses
+// splitListRequest to decide which local/remote backends to query,
+// and merges their responses.
+//
+// Responses are merged under mtx because splitListRequest may invoke
+// the callback concurrently for different clusters. While no items
+// have been merged yet, a response replaces merged wholesale
+// (including its non-Items fields); after that, only its Items are
+// appended. The callback returns the UUIDs it received so that
+// splitListRequest can re-request any that are still missing.
+//
+// Finally Items are sorted by UUID and returned together with
+// splitListRequest's error, if any (possibly alongside partial
+// results).
+//
+// NOTE(review): the UUID sort also runs when a single backend served
+// the whole request, which overrides any "order" the caller asked
+// for; consider sorting only when responses from several backends
+// were merged.
+//
+// This method is the template for the ContainerList and SpecimenList
+// methods in generated.go: generate.go copies it with "Collection"
+// replaced by the other type names. After changing the method body,
+// run "go generate" so generated.go stays in sync.
+func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
+       var mtx sync.Mutex
+       var merged arvados.CollectionList
+       err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+               cl, err := backend.CollectionList(ctx, options)
+               if err != nil {
+                       return nil, err
+               }
+               mtx.Lock()
+               defer mtx.Unlock()
+               if len(merged.Items) == 0 {
+                       merged = cl
+               } else {
+                       merged.Items = append(merged.Items, cl.Items...)
+               }
+               uuids := make([]string, 0, len(cl.Items))
+               for _, item := range cl.Items {
+                       uuids = append(uuids, item.UUID)
+               }
+               return uuids, nil
+       })
+       sort.Slice(merged.Items, func(i, j int) bool { return merged.Items[i].UUID < merged.Items[j].UUID })
+       return merged, err
+}
+
+// splitListRequest calls fn on whichever local and/or remote backends
+// can hold items matching the list query opts.
+//
+// Filters of the form ["uuid","=",a] and ["uuid","in",[a,b,c,...]]
+// restrict the possible results to a set of candidate UUIDs; with
+// several such filters, only UUIDs accepted by all of them are
+// candidates. Strings that are not 27 characters long cannot be UUIDs
+// and are ignored. Each candidate belongs to the cluster named by its
+// first 5 characters.
+//
+// fn is called exactly once, with the original opts, when:
+//
+// * there are no "uuid =" / "uuid in" filters (fn gets the local
+//   backend), or
+//
+// * all candidate UUIDs belong to one cluster (fn gets that
+//   cluster's backend).
+//
+// fn is not called at all if the UUID filters cannot match anything.
+//
+// Otherwise the candidates span several clusters. Such a
+// federation-wide query is supported only if it meets the following
+// restrictions:
+//
+// * there are no filters other than the "uuid = ..." and "uuid in
+//   ..." filters mentioned above.
+//
+// * Count=="none"
+//
+// * Limit<0, Offset==0, and len(Order)==0
+//
+// * The maximum possible response size (total number of candidate
+//   UUIDs) does not exceed the local cluster's response page size
+//   limit.
+//
+// * Select is nil or includes "uuid".
+//
+// If the restrictions are not met, an error is returned without
+// calling fn. If they are met, fn is called concurrently for the
+// different clusters -- so it must be safe for concurrent use -- with
+// options whose only filter is ["uuid","in",batch]. fn returns the
+// UUIDs of the items it received. For each cluster, fn is called
+// again with the not-yet-returned UUIDs until all of them have been
+// returned or a call returns no items at all (the rest don't exist or
+// aren't visible). A call that returns items but none of the
+// requested UUIDs is an error. The first error from any call cancels
+// the context passed to the other calls and is returned.
+//
+// A cluster ID with no configured backend results in a 404 error.
+//
+// Each call to fn indicates a single (local or remote) cluster ID,
+// its backend, and a corresponding options argument suitable for
+// sending to that backend.
+func (conn *Conn) splitListRequest(ctx context.Context, opts arvados.ListOptions, fn func(context.Context, string, arvados.API, arvados.ListOptions) ([]string, error)) error {
+       cannotSplit := false
+       var matchAllFilters map[string]bool
+       for _, f := range opts.Filters {
+               matchThisFilter := map[string]bool{}
+               if f.Attr != "uuid" {
+                       cannotSplit = true
+                       continue
+               }
+               if f.Operator == "=" {
+                       if uuid, ok := f.Operand.(string); ok {
+                               matchThisFilter[uuid] = true
+                       } else {
+                               return httpErrorf(http.StatusBadRequest, "invalid operand type %T for filter %v", f.Operand, f)
+                       }
+               } else if f.Operator == "in" {
+                       if operand, ok := f.Operand.([]interface{}); ok {
+                               // skip any elements that aren't
+                               // strings (thus can't match a UUID,
+                               // thus can't affect the response).
+                               for _, v := range operand {
+                                       if uuid, ok := v.(string); ok {
+                                               matchThisFilter[uuid] = true
+                                       }
+                               }
+                       } else if uuids, ok := f.Operand.([]string); ok {
+                               for _, uuid := range uuids {
+                                       matchThisFilter[uuid] = true
+                               }
+                       } else {
+                               return httpErrorf(http.StatusBadRequest, "invalid operand type %T in filter %v", f.Operand, f)
+                       }
+               } else {
+                       cannotSplit = true
+                       continue
+               }
+
+               if matchAllFilters == nil {
+                       matchAllFilters = matchThisFilter
+               } else {
+                       // matchAllFilters = intersect(matchAllFilters, matchThisFilter)
+                       for uuid := range matchAllFilters {
+                               if !matchThisFilter[uuid] {
+                                       delete(matchAllFilters, uuid)
+                               }
+                       }
+               }
+       }
+
+       if matchAllFilters == nil {
+               // Not filtering by UUID at all, so there is nothing to
+               // route on: query the local cluster with the original
+               // options.
+               _, err := fn(ctx, conn.cluster.ClusterID, conn.local, opts)
+               return err
+       }
+
+       // Collate candidate UUIDs by cluster ID (their first 5
+       // characters) and count them, so the total can be compared
+       // to the page size limit.
+       nUUIDs := 0
+       todoByRemote := map[string]map[string]bool{}
+       for uuid := range matchAllFilters {
+               if len(uuid) != 27 {
+                       // Cannot match anything, just drop it
+                       continue
+               }
+               if todoByRemote[uuid[:5]] == nil {
+                       todoByRemote[uuid[:5]] = map[string]bool{}
+               }
+               todoByRemote[uuid[:5]][uuid] = true
+               nUUIDs++
+       }
+
+       backendFor := func(clusterID string) (arvados.API, error) {
+               if clusterID == conn.cluster.ClusterID {
+                       return conn.local, nil
+               }
+               if remote := conn.remotes[clusterID]; remote != nil {
+                       return remote, nil
+               }
+               return nil, httpErrorf(http.StatusNotFound, "cannot execute federated list query: no proxy available for cluster %q", clusterID)
+       }
+
+       if len(todoByRemote) == 0 {
+               // The UUID filters cannot match anything.
+               return nil
+       }
+       if len(todoByRemote) == 1 {
+               // All candidates live on one cluster. Forward the
+               // request unchanged, so that cluster applies every
+               // filter as well as count, limit, offset, order, and
+               // select -- none of the multi-cluster restrictions
+               // below are needed.
+               var clusterID string
+               for id := range todoByRemote {
+                       clusterID = id
+               }
+               backend, err := backendFor(clusterID)
+               if err != nil {
+                       return err
+               }
+               _, err = fn(ctx, clusterID, backend, opts)
+               return err
+       }
+
+       // The candidates span several clusters, so the query must be
+       // split. Enforce the restrictions that make a merged result
+       // equivalent to the original query.
+       if cannotSplit {
+               return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with filters other than 'uuid = ...' and 'uuid in [...]'")
+       }
+       if opts.Count != "none" {
+               return httpErrorf(http.StatusBadRequest, "cannot execute federated list query unless count==\"none\"")
+       }
+       if opts.Limit >= 0 || opts.Offset != 0 || len(opts.Order) > 0 {
+               return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with limit, offset, or order parameter")
+       }
+       if max := conn.cluster.API.MaxItemsPerResponse; nUUIDs > max {
+               return httpErrorf(http.StatusBadRequest, "cannot execute federated list query because number of UUIDs (%d) exceeds page size limit %d", nUUIDs, max)
+       }
+       selectingUUID := false
+       for _, attr := range opts.Select {
+               if attr == "uuid" {
+                       selectingUUID = true
+               }
+       }
+       if opts.Select != nil && !selectingUUID {
+               return httpErrorf(http.StatusBadRequest, "cannot execute federated list query with a select parameter that does not include uuid")
+       }
+
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       errs := make(chan error, len(todoByRemote))
+       for clusterID, todo := range todoByRemote {
+               clusterID, todo := clusterID, todo
+               batch := make([]string, 0, len(todo))
+               for uuid := range todo {
+                       batch = append(batch, uuid)
+               }
+               go func() {
+                       // This goroutine sends exactly one value to
+                       // errs.
+                       backend, err := backendFor(clusterID)
+                       if err != nil {
+                               errs <- err
+                               return
+                       }
+                       remoteOpts := opts
+                       for len(todo) > 0 {
+                               if len(batch) > len(todo) {
+                                       // Reduce batch to just the todo's
+                                       batch = batch[:0]
+                                       for uuid := range todo {
+                                               batch = append(batch, uuid)
+                                       }
+                               }
+                               remoteOpts.Filters = []arvados.Filter{{"uuid", "in", batch}}
+
+                               done, err := fn(ctx, clusterID, backend, remoteOpts)
+                               if err != nil {
+                                       errs <- err
+                                       return
+                               }
+                               if len(done) == 0 {
+                                       // No items at all: the remaining
+                                       // UUIDs don't exist (or aren't
+                                       // visible), so there is nothing
+                                       // more to fetch.
+                                       break
+                               }
+                               progress := false
+                               for _, uuid := range done {
+                                       if _, ok := todo[uuid]; ok {
+                                               progress = true
+                                               delete(todo, uuid)
+                                       }
+                               }
+                               if !progress {
+                                       errs <- httpErrorf(http.StatusBadGateway, "cannot make progress in federated list query: cluster %q returned none of the requested UUIDs", clusterID)
+                                       return
+                               }
+                       }
+                       errs <- nil
+               }()
+       }
+
+       // Wait for all goroutines to return, then return the first
+       // non-nil error, if any.
+       var firstErr error
+       for i := 0; i < len(todoByRemote); i++ {
+               if err := <-errs; err != nil && firstErr == nil {
+                       firstErr = err
+                       // Signal to any remaining fn() calls that
+                       // further effort is futile.
+                       cancel()
+               }
+       }
+       return firstErr
+}
+
+func httpErrorf(code int, format string, args ...interface{}) error {
+       return httpserver.ErrorWithStatus(fmt.Errorf(format, args...), code)
+}
diff --git a/lib/controller/federation/list_test.go b/lib/controller/federation/list_test.go
new file mode 100644 (file)
index 0000000..b28609c
--- /dev/null
@@ -0,0 +1,438 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package federation
+
+import (
+       "context"
+       "fmt"
+       "net/http"
+       "net/url"
+       "os"
+       "testing"
+
+       "git.curoverse.com/arvados.git/lib/controller/router"
+       "git.curoverse.com/arvados.git/lib/controller/rpc"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       check "gopkg.in/check.v1"
+)
+
+var (
+       _ = check.Suite(&FederationSuite{})
+       _ = check.Suite(&CollectionListSuite{})
+)
+
+// Test hooks gocheck into "go test": check.TestingT runs every suite
+// registered with check.Suite in this package.
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+// FederationSuite provides a federation Conn for cluster "aaaaa",
+// whose RailsAPI service URL is https://$ARVADOS_TEST_API_HOST, plus
+// helpers for registering additional remote clusters.
+type FederationSuite struct {
+       cluster *arvados.Cluster
+       ctx     context.Context
+       fed     *Conn
+}
+
+// SetUpTest builds a fresh cluster config (TLS.Insecure set,
+// MaxItemsPerResponse 3), a context carrying a test logger and the
+// arvadostest.ActiveTokenV2 credentials, and a new Conn.
+func (s *FederationSuite) SetUpTest(c *check.C) {
+       cluster := &arvados.Cluster{ClusterID: "aaaaa"}
+       cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+               "aaaaa": {Host: os.Getenv("ARVADOS_API_HOST")},
+       }
+       arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
+       cluster.TLS.Insecure = true
+       cluster.API.MaxItemsPerResponse = 3
+       s.cluster = cluster
+
+       logged := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+       s.ctx = auth.NewContext(logged, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+
+       s.fed = New(s.cluster)
+}
+
+// addDirectRemote registers backend as remote cluster id; the
+// federation Conn calls it in-process, with no HTTP round trip.
+func (s *FederationSuite) addDirectRemote(c *check.C, id string, backend arvados.API) {
+       s.fed.remotes[id] = backend
+       s.cluster.RemoteClusters[id] = arvados.RemoteCluster{Host: "in-process.local"}
+}
+
+// addHTTPRemote serves backend through router.New on a local HTTP
+// server and registers an rpc.Conn pointed at that server as remote
+// cluster id, so calls travel rpc -> HTTP -> router -> backend.
+// (Assumes srv.Start replaces the ":" Addr with the real listening
+// address, since srv.Addr is read back afterwards.)
+// NOTE(review): the server is never shut down.
+func (s *FederationSuite) addHTTPRemote(c *check.C, id string, backend arvados.API) {
+       srv := httpserver.Server{Addr: ":"}
+       srv.Handler = router.New(backend)
+       c.Check(srv.Start(), check.IsNil)
+       s.cluster.RemoteClusters[id] = arvados.RemoteCluster{
+               Host:  srv.Addr,
+               Proxy: true,
+       }
+       remoteURL := &url.URL{Scheme: "http", Host: srv.Addr}
+       s.fed.remotes[id] = rpc.NewConn(id, remoteURL, true, saltedTokenProvider(s.fed.local, id))
+}
+
+// collectionLister is a stub backend. Its CollectionList returns the
+// items of ItemsToReturn that match the request's filters, at most
+// MaxPageSize per call (unlimited if MaxPageSize <= 0). All other
+// methods come from the embedded arvadostest.APIStub.
+type collectionLister struct {
+       arvadostest.APIStub
+       ItemsToReturn []arvados.Collection
+       MaxPageSize   int
+}
+
+// matchFilters reports whether c satisfies every filter. Only
+// "uuid =" (string operand) and "uuid in" ([]string or []interface{}
+// operand) filters are understood; any other filter never matches.
+func (cl *collectionLister) matchFilters(c arvados.Collection, filters []arvados.Filter) bool {
+       satisfies := func(f arvados.Filter) bool {
+               if f.Attr != "uuid" {
+                       return false
+               }
+               switch f.Operator {
+               case "=":
+                       s, ok := f.Operand.(string)
+                       return ok && s == c.UUID
+               case "in":
+                       switch operand := f.Operand.(type) {
+                       case []string:
+                               for _, s := range operand {
+                                       if s == c.UUID {
+                                               return true
+                                       }
+                               }
+                       case []interface{}:
+                               for _, v := range operand {
+                                       if s, ok := v.(string); ok && s == c.UUID {
+                                               return true
+                                       }
+                               }
+                       }
+               }
+               return false
+       }
+       for _, f := range filters {
+               if !satisfies(f) {
+                       return false
+               }
+       }
+       return true
+}
+
+// CollectionList first passes the call to the embedded APIStub
+// (presumably so the call gets recorded; its result is discarded),
+// then returns the matching items of ItemsToReturn in order, stopping
+// once MaxPageSize items (if positive) have been collected. It never
+// returns an error.
+func (cl *collectionLister) CollectionList(ctx context.Context, options arvados.ListOptions) (resp arvados.CollectionList, _ error) {
+       cl.APIStub.CollectionList(ctx, options)
+       pageFull := func() bool {
+               return cl.MaxPageSize > 0 && len(resp.Items) >= cl.MaxPageSize
+       }
+       for i := 0; i < len(cl.ItemsToReturn) && !pageFull(); i++ {
+               if item := cl.ItemsToReturn[i]; cl.matchFilters(item, options.Filters) {
+                       resp.Items = append(resp.Items, item)
+               }
+       }
+       return resp, nil
+}
+
+// CollectionListSuite runs federated CollectionList queries against
+// three stub clusters.
+type CollectionListSuite struct {
+       FederationSuite
+       ids      []string   // aaaaa, bbbbb, ccccc
+       uuids    [][]string // [[aa-*, aa-*, aa-*], [bb-*, bb-*, ...], ...]
+       backends []*collectionLister
+}
+
+// SetUpTest configures cluster "aaaaa" (local) and remotes "bbbbb"
+// and "ccccc", each served by a collectionLister stub holding five
+// collections. s.uuids[i][j] is the j'th collection UUID on cluster
+// s.ids[i], and s.backends[i] is that cluster's stub.
+//
+// The remotes are wired differently so both paths get exercised:
+// "bbbbb" (i=1) is reached through rpc->router->API over HTTP, and
+// "ccccc" (i=2) is called directly.
+func (s *CollectionListSuite) SetUpTest(c *check.C) {
+       s.FederationSuite.SetUpTest(c)
+
+       s.ids = nil
+       s.uuids = nil
+       s.backends = nil
+       for i, id := range []string{"aaaaa", "bbbbb", "ccccc"} {
+               cl := &collectionLister{}
+               s.ids = append(s.ids, id)
+               s.uuids = append(s.uuids, nil)
+               for j := 0; j < 5; j++ {
+                       uuid := fmt.Sprintf("%s-4zz18-%s%010d", id, id, j)
+                       s.uuids[i] = append(s.uuids[i], uuid)
+                       cl.ItemsToReturn = append(cl.ItemsToReturn, arvados.Collection{
+                               UUID: uuid,
+                       })
+               }
+               s.backends = append(s.backends, cl)
+               switch {
+               case i == 0:
+                       s.fed.local = cl
+               case i%2 == 0:
+                       // call some backends directly via API
+                       s.addDirectRemote(c, id, cl)
+               default:
+                       // call some backends through rpc->router->API
+                       // to ensure nothing is lost in translation
+                       s.addHTTPRemote(c, id, cl)
+               }
+       }
+}
+
+// listTrial describes one list query to run via s.test and its
+// expected outcome. count/limit/offset/order/filters presumably map
+// onto the corresponding arvados.ListOptions fields (TODO confirm in
+// s.test). In the visible trials, expectCalls has one entry per
+// cluster in s.backends order, and expectStatus is left zero when
+// the query is expected to succeed.
+type listTrial struct {
+       count        string
+       limit        int
+       offset       int
+       order        []string
+       filters      []arvados.Filter
+       expectUUIDs  []string
+       expectCalls  []int // number of API calls to backends
+       expectStatus int
+}
+
+func (s *CollectionListSuite) TestCollectionListOneLocal(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "=", s.uuids[0][0]}},
+               expectUUIDs: []string{s.uuids[0][0]},
+               expectCalls: []int{1, 0, 0},
+       })
+}
+
+func (s *CollectionListSuite) TestCollectionListOneRemote(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "=", s.uuids[1][0]}},
+               expectUUIDs: []string{s.uuids[1][0]},
+               expectCalls: []int{0, 1, 0},
+       })
+}
+
+func (s *CollectionListSuite) TestCollectionListOneLocalUsingInOperator(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", []string{s.uuids[0][0]}}},
+               expectUUIDs: []string{s.uuids[0][0]},
+               expectCalls: []int{1, 0, 0},
+       })
+}
+
+func (s *CollectionListSuite) TestCollectionListOneRemoteUsingInOperator(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", []string{s.uuids[1][1]}}},
+               expectUUIDs: []string{s.uuids[1][1]},
+               expectCalls: []int{0, 1, 0},
+       })
+}
+
+func (s *CollectionListSuite) TestCollectionListOneLocalOneRemote(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}}},
+               expectUUIDs: []string{s.uuids[0][0], s.uuids[1][0]},
+               expectCalls: []int{1, 1, 0},
+       })
+}
+
+func (s *CollectionListSuite) TestCollectionListTwoRemotes(c *check.C) {
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", []string{s.uuids[2][0], s.uuids[1][0]}}},
+               expectUUIDs: []string{s.uuids[1][0], s.uuids[2][0]},
+               expectCalls: []int{0, 1, 1},
+       })
+}
+
+// TestCollectionListSatisfyAllFilters expects only UUIDs accepted by
+// every "uuid in" filter to be requested and returned.
+func (s *CollectionListSuite) TestCollectionListSatisfyAllFilters(c *check.C) {
+       s.cluster.API.MaxItemsPerResponse = 2
+       u := s.uuids
+       wide := []string{u[0][0], u[1][1], u[2][0], u[2][1], u[2][2]}
+       narrow := []string{u[0][0], u[1][2], u[2][1]}
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", wide}, {"uuid", "in", narrow}},
+               expectUUIDs: []string{u[0][0], u[2][1]},
+               expectCalls: []int{1, 0, 1},
+       })
+}
+
+// TestCollectionListEmptySet expects "uuid in []" to produce an empty
+// result without calling any backend.
+func (s *CollectionListSuite) TestCollectionListEmptySet(c *check.C) {
+       trial := listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", []string{}}},
+               expectUUIDs: []string{},
+               expectCalls: []int{0, 0, 0},
+       }
+       s.test(c, trial)
+}
+
+// TestCollectionListUnmatchableUUID expects strings that can't be
+// UUIDs to be ignored, so only the local backend is called.
+func (s *CollectionListSuite) TestCollectionListUnmatchableUUID(c *check.C) {
+       want := s.uuids[0][0]
+       var filters []arvados.Filter
+       for _, junk := range []string{"abcdefg", "bbbbb-4zz18-bogus", "bogus-4zz18-bogus"} {
+               filters = append(filters, arvados.Filter{Attr: "uuid", Operator: "in", Operand: []string{want, junk}})
+       }
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     filters,
+               expectUUIDs: []string{want},
+               expectCalls: []int{1, 0, 0},
+       })
+}
+
+// TestCollectionListMultiPage expects each cluster to be queried
+// twice when its backend returns at most 2 of its 3 requested items
+// per call.
+func (s *CollectionListSuite) TestCollectionListMultiPage(c *check.C) {
+       var allUUIDs []string
+       for i, stub := range s.backends {
+               // Keep three collections per cluster, served two at
+               // a time.
+               s.uuids[i] = s.uuids[i][:3]
+               stub.ItemsToReturn = stub.ItemsToReturn[:3]
+               stub.MaxPageSize = 2
+               allUUIDs = append(allUUIDs, s.uuids[i]...)
+       }
+       s.cluster.API.MaxItemsPerResponse = 9
+       s.test(c, listTrial{
+               count:       "none",
+               limit:       -1,
+               filters:     []arvados.Filter{{"uuid", "in", append([]string(nil), allUUIDs...)}},
+               expectUUIDs: allUUIDs,
+               expectCalls: []int{2, 2, 2},
+       })
+}
+
+// TestCollectionListMultiSiteExtraFilters checks a specific request: an
+// "in" filter naming UUIDs from the first two backends, combined with an
+// additional filter (here "is_a"). The request must be rejected with
+// 400 Bad Request, and no backend may be called.
+func (s *CollectionListSuite) TestCollectionListMultiSiteExtraFilters(c *check.C) {
+       // not [yet] supported
+       s.test(c, listTrial{
+               count: "none",
+               limit: -1,
+               filters: []arvados.Filter{
+                       {"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}},
+                       {"uuid", "is_a", "teapot"},
+               },
+               expectCalls:  []int{0, 0, 0},
+               expectStatus: http.StatusBadRequest,
+       })
+}
+
+// TestCollectionListMultiSiteWithCount is meant to check that a
+// multi-site request with count "" or "exact" is rejected with 400 Bad
+// Request, without calling any backend.
+//
+// NOTE(review): these trials also include the "is_a" filter.
+// TestCollectionListMultiSiteExtraFilters already expects that filter to
+// cause a 400 on its own, so these trials may pass without exercising
+// the count restriction. Consider dropping it (confirm against list.go).
+// Also, backend call counters are not reset between loop iterations
+// (presumably they accumulate). That is harmless only while expectCalls
+// stays all zero.
+func (s *CollectionListSuite) TestCollectionListMultiSiteWithCount(c *check.C) {
+       for _, count := range []string{"", "exact"} {
+               s.test(c, listTrial{
+                       count: count,
+                       limit: -1,
+                       filters: []arvados.Filter{
+                               {"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}},
+                               {"uuid", "is_a", "teapot"},
+                       },
+                       expectCalls:  []int{0, 0, 0},
+                       expectStatus: http.StatusBadRequest,
+               })
+       }
+}
+
+// TestCollectionListMultiSiteWithLimit is meant to check that a
+// multi-site request with an explicit limit (0, 1 or 2) is rejected with
+// 400 Bad Request, without calling any backend.
+//
+// NOTE(review): these trials also include the "is_a" filter.
+// TestCollectionListMultiSiteExtraFilters already expects that filter to
+// cause a 400 on its own, so these trials may pass without exercising
+// the limit restriction. Consider dropping it (confirm against list.go).
+// Also, backend call counters are not reset between loop iterations
+// (presumably they accumulate). That is harmless only while expectCalls
+// stays all zero.
+func (s *CollectionListSuite) TestCollectionListMultiSiteWithLimit(c *check.C) {
+       for _, limit := range []int{0, 1, 2} {
+               s.test(c, listTrial{
+                       count: "none",
+                       limit: limit,
+                       filters: []arvados.Filter{
+                               {"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}},
+                               {"uuid", "is_a", "teapot"},
+                       },
+                       expectCalls:  []int{0, 0, 0},
+                       expectStatus: http.StatusBadRequest,
+               })
+       }
+}
+
+// TestCollectionListMultiSiteWithOffset is meant to check that a
+// multi-site request with a non-zero offset is rejected with 400 Bad
+// Request, without calling any backend.
+//
+// NOTE(review): this trial also includes the "is_a" filter.
+// TestCollectionListMultiSiteExtraFilters already expects that filter to
+// cause a 400 on its own, so this may pass without exercising the offset
+// restriction. Consider dropping it (confirm against list.go).
+func (s *CollectionListSuite) TestCollectionListMultiSiteWithOffset(c *check.C) {
+       s.test(c, listTrial{
+               count:  "none",
+               limit:  -1,
+               offset: 1,
+               filters: []arvados.Filter{
+                       {"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}},
+                       {"uuid", "is_a", "teapot"},
+               },
+               expectCalls:  []int{0, 0, 0},
+               expectStatus: http.StatusBadRequest,
+       })
+}
+
+// TestCollectionListMultiSiteWithOrder is meant to check that a
+// multi-site request with an explicit order ("uuid desc") is rejected
+// with 400 Bad Request, without calling any backend.
+//
+// NOTE(review): this trial also includes the "is_a" filter.
+// TestCollectionListMultiSiteExtraFilters already expects that filter to
+// cause a 400 on its own, so this may pass without exercising the order
+// restriction. Consider dropping it (confirm against list.go).
+func (s *CollectionListSuite) TestCollectionListMultiSiteWithOrder(c *check.C) {
+       s.test(c, listTrial{
+               count: "none",
+               limit: -1,
+               order: []string{"uuid desc"},
+               filters: []arvados.Filter{
+                       {"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}},
+                       {"uuid", "is_a", "teapot"},
+               },
+               expectCalls:  []int{0, 0, 0},
+               expectStatus: http.StatusBadRequest,
+       })
+}
+
+// TestCollectionListInvalidFilters checks that an "in" filter whose
+// operand is a string rather than a list of UUIDs is rejected with 400
+// Bad Request, without calling any backend.
+func (s *CollectionListSuite) TestCollectionListInvalidFilters(c *check.C) {
+       s.test(c, listTrial{
+               count: "none",
+               limit: -1,
+               filters: []arvados.Filter{
+                       {"uuid", "in", "teapot"},
+               },
+               expectCalls:  []int{0, 0, 0},
+               expectStatus: http.StatusBadRequest,
+       })
+}
+
+// TestCollectionListRemoteUnknown checks that asking for a UUID whose
+// cluster prefix ("bogus") is presumably not a configured remote fails
+// with 404 Not Found. expectCalls is left unset, so test() does not check
+// backend call counts for this trial.
+func (s *CollectionListSuite) TestCollectionListRemoteUnknown(c *check.C) {
+       s.test(c, listTrial{
+               count: "none",
+               limit: -1,
+               filters: []arvados.Filter{
+                       {"uuid", "in", []string{s.uuids[0][0], "bogus-4zz18-000001111122222"}},
+               },
+               expectStatus: http.StatusNotFound,
+       })
+}
+
+// TestCollectionListRemoteError registers a bare arvadostest.APIStub as
+// remote "bbbbb" via addDirectRemote. It then expects a request for UUIDs
+// taken from the first two backends to fail with 502 Bad Gateway.
+// NOTE(review): how the bare stub produces that failure depends on
+// addDirectRemote and APIStub, which are not shown here. expectCalls is
+// left unset, so test() does not check backend call counts.
+func (s *CollectionListSuite) TestCollectionListRemoteError(c *check.C) {
+       s.addDirectRemote(c, "bbbbb", &arvadostest.APIStub{})
+       s.test(c, listTrial{
+               count: "none",
+               limit: -1,
+               filters: []arvados.Filter{
+                       {"uuid", "in", []string{s.uuids[0][0], s.uuids[1][0]}},
+               },
+               expectStatus: http.StatusBadGateway,
+       })
+}
+
+// test runs trial against s.fed.CollectionList and checks the outcome.
+//
+// If trial.expectStatus is non-zero, the call must return an error that
+// implements HTTPStatus() int and reports that status. Otherwise the
+// call must succeed and return a CollectionList whose only populated
+// field is Items. Items must hold one Collection per trial.expectUUIDs
+// entry, in the same order, each with only UUID set. When no UUIDs are
+// expected, Items must be nil rather than an empty slice, because
+// DeepEquals distinguishes the two.
+//
+// Finally, each backend that has an entry in trial.expectCalls must have
+// recorded exactly that many calls. If it recorded any, the first must
+// have used Limit -1. Backends beyond len(trial.expectCalls) are not
+// checked.
+func (s *CollectionListSuite) test(c *check.C, trial listTrial) {
+       resp, err := s.fed.CollectionList(s.ctx, arvados.ListOptions{
+               Count:   trial.count,
+               Limit:   trial.limit,
+               Offset:  trial.offset,
+               Order:   trial.order,
+               Filters: trial.filters,
+       })
+       if trial.expectStatus != 0 {
+               c.Assert(err, check.NotNil)
+               // Use a separate name instead of shadowing err with a value
+               // of a different type. statusErr is nil if err does not
+               // implement HTTPStatus().
+               statusErr, _ := err.(interface{ HTTPStatus() int })
+               c.Assert(statusErr, check.NotNil) // err must implement HTTPStatus()
+               c.Check(statusErr.HTTPStatus(), check.Equals, trial.expectStatus)
+               c.Logf("returned error is %#v", err)
+               c.Logf("returned error string is %q", err)
+       } else {
+               c.Check(err, check.IsNil)
+               var expectItems []arvados.Collection
+               for _, uuid := range trial.expectUUIDs {
+                       expectItems = append(expectItems, arvados.Collection{UUID: uuid})
+               }
+               c.Check(resp, check.DeepEquals, arvados.CollectionList{
+                       Items: expectItems,
+               })
+       }
+
+       for i, stub := range s.backends {
+               if i >= len(trial.expectCalls) {
+                       break
+               }
+               calls := stub.Calls(nil)
+               c.Check(calls, check.HasLen, trial.expectCalls[i])
+               if len(calls) == 0 {
+                       continue
+               }
+               // Only the first recorded call's options are inspected.
+               opts := calls[0].Options.(arvados.ListOptions)
+               c.Check(opts.Limit, check.Equals, -1)
+       }
+}
index d524195e4429a2358ea560a784acf469350f7751..852327fd89f04a2b9c67d2238e6e47219e6565ca 100644 (file)
@@ -18,6 +18,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/lib/config"
+       "git.curoverse.com/arvados.git/lib/controller/federation"
        "git.curoverse.com/arvados.git/lib/controller/railsproxy"
        "git.curoverse.com/arvados.git/lib/controller/router"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -91,7 +92,7 @@ func (h *Handler) setup() {
        }))
 
        if h.Cluster.EnableBetaController14287 {
-               rtr := router.New(h.Cluster)
+               rtr := router.New(federation.New(h.Cluster))
                mux.Handle("/arvados/v1/collections", rtr)
                mux.Handle("/arvados/v1/collections/", rtr)
        }
index f37c7ea9073ac51c0553ecf03c91ff4a9b1b2e92..9c2c1f3a11f6dac93eeabeb6478bea8653b9aedb 100644 (file)
@@ -10,7 +10,6 @@ import (
        "net/http"
        "strings"
 
-       "git.curoverse.com/arvados.git/lib/controller/federation"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/ctxlog"
@@ -24,10 +23,10 @@ type router struct {
        fed arvados.API
 }
 
-func New(cluster *arvados.Cluster) *router {
+func New(fed arvados.API) *router {
        rtr := &router{
                mux: httprouter.New(),
-               fed: federation.New(cluster),
+               fed: fed,
        }
        rtr.addRoutes()
        return rtr
index 4e6b1617330ffa47c405d2c9e708ee69e979e641..3a7045aa4de2681f53af5645d7008d290b035321 100644 (file)
@@ -10,11 +10,13 @@ import (
        "io"
        "net/http"
        "net/http/httptest"
+       "net/url"
        "os"
        "strings"
        "testing"
        "time"
 
+       "git.curoverse.com/arvados.git/lib/controller/rpc"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "github.com/julienschmidt/httprouter"
@@ -158,7 +160,8 @@ func (s *RouterIntegrationSuite) SetUpTest(c *check.C) {
        cluster := &arvados.Cluster{}
        cluster.TLS.Insecure = true
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
-       s.rtr = New(cluster)
+       url, _ := url.Parse("https://" + os.Getenv("ARVADOS_TEST_API_HOST"))
+       s.rtr = New(rpc.NewConn("zzzzz", url, true, rpc.PassthroughTokenProvider))
 }
 
 func (s *RouterIntegrationSuite) TearDownSuite(c *check.C) {
index e07eaf40affbe3ec0dc0d78422686926eea550c0..ea3d6fb2dd6e4a537c2a50396f75e02508860c53 100644 (file)
@@ -8,6 +8,7 @@ import (
        "context"
        "crypto/tls"
        "encoding/json"
+       "errors"
        "fmt"
        "io"
        "net"
@@ -17,10 +18,19 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
 )
 
 type TokenProvider func(context.Context) ([]string, error)
 
+func PassthroughTokenProvider(ctx context.Context) ([]string, error) {
+       if incoming, ok := auth.FromContext(ctx); !ok {
+               return nil, errors.New("no token provided")
+       } else {
+               return incoming.Tokens, nil
+       }
+}
+
 type Conn struct {
        clusterID     string
        httpClient    http.Client
index a3cacf3f6c5a12911dc9107d25b2a6cb6edc1be1..77a26bcba75526771cdd623a42270f8646f721a4 100644 (file)
@@ -7,6 +7,8 @@ package arvadostest
 import (
        "context"
        "errors"
+       "reflect"
+       "runtime"
        "sync"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -121,7 +123,9 @@ func (as *APIStub) Calls(method interface{}) []APIStubCall {
        defer as.mtx.Unlock()
        var calls []APIStubCall
        for _, call := range as.calls {
-               if method == nil || call.Method == method {
+
+               if method == nil || (runtime.FuncForPC(reflect.ValueOf(call.Method).Pointer()).Name() ==
+                       runtime.FuncForPC(reflect.ValueOf(method).Pointer()).Name()) {
                        calls = append(calls, call)
                }
        }
index b222e18ea1159e67b9069c086207dbc3585c8e26..f1817d3374ae11e07f4479de77b1b1b67e4b3cf9 100644 (file)
@@ -9,6 +9,19 @@ import (
        "net/http"
 )
 
+// ErrorWithStatus returns an error whose Error() message is err's and
+// which also implements HTTPStatus() int, returning status.
+//
+// err must be non-nil. The result embeds err, so calling Error() on the
+// value returned by ErrorWithStatus(nil, ...) would panic.
+func ErrorWithStatus(err error, status int) error {
+       return errorWithStatus{err, status}
+}
+
+// errorWithStatus pairs an error with an HTTP status code. The error is
+// embedded so that its Error method is promoted.
+type errorWithStatus struct {
+       error
+       Status int
+}
+
+// HTTPStatus returns the status code that was passed to ErrorWithStatus.
+func (ews errorWithStatus) HTTPStatus() int {
+       return ews.Status
+}
+
 type ErrorResponse struct {
        Errors []string `json:"errors"`
 }