13752: Merge branch 'master' into 13752-migrate-index-data
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 18 Sep 2018 19:43:28 +0000 (15:43 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 18 Sep 2018 19:43:28 +0000 (15:43 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

16 files changed:
sdk/go/arvados/integration_test_cluster.go [new file with mode: 0644]
sdk/go/arvadostest/fixtures.go
sdk/go/keepclient/discover.go
sdk/go/keepclient/support.go
services/api/app/middlewares/arvados_api_token.rb
services/api/app/models/api_client_authorization.rb
services/api/app/models/collection.rb
services/api/test/functional/arvados/v1/collections_controller_test.rb
services/api/test/test_helper.rb
services/api/test/unit/collection_performance_test.rb
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/mounts_test.go
services/keepstore/proxy_remote.go [new file with mode: 0644]
services/keepstore/proxy_remote_test.go [new file with mode: 0644]

diff --git a/sdk/go/arvados/integration_test_cluster.go b/sdk/go/arvados/integration_test_cluster.go
new file mode 100644 (file)
index 0000000..ebf93f8
--- /dev/null
@@ -0,0 +1,25 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+       "os"
+       "path/filepath"
+)
+
+// IntegrationTestCluster returns the cluster that has been set up by
+// the integration test framework (see /build/run-tests.sh). It panics
+// on error.
+func IntegrationTestCluster() *Cluster {
+       config, err := GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
+       if err != nil {
+               panic(err)
+       }
+       cluster, err := config.GetCluster("")
+       if err != nil {
+               panic(err)
+       }
+       return cluster
+}
index 6a4b6232aceb82754dbee606504a8608ba96d054..e6984601f40819759ee43c8dc463afa2655f64eb 100644 (file)
@@ -8,6 +8,7 @@ package arvadostest
 const (
        SpectatorToken          = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
        ActiveToken             = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+       ActiveTokenV2           = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
        AdminToken              = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
        AnonymousToken          = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
        DataManagerToken        = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
index 2140dceabbdfb320addb6193b0efcc4969b7cc0f..4377c1951528c4eab88a3525934669c9457f954c 100644 (file)
@@ -22,9 +22,15 @@ import (
 func RefreshServiceDiscovery() {
        svcListCacheMtx.Lock()
        defer svcListCacheMtx.Unlock()
+       var wg sync.WaitGroup
        for _, ent := range svcListCache {
-               ent.clear <- struct{}{}
+               wg.Add(1)
+               go func() {
+                       ent.clear <- struct{}{}
+                       wg.Done()
+               }()
        }
+       wg.Wait()
 }
 
 // ClearCacheOnSIGHUP installs a signal handler that calls
index 542827f5e0d83c5d074942ef4546955e59b46ba5..e589593fa8c7e9a8a6e9af7297896f48339ca780 100644 (file)
@@ -87,7 +87,7 @@ func (this *KeepClient) uploadToKeepServer(host string, hash string, body io.Rea
        var resp *http.Response
        if resp, err = this.httpClient().Do(req); err != nil {
                DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error())
-               upload_status <- uploadStatus{err, url, 0, 0, ""}
+               upload_status <- uploadStatus{err, url, 0, 0, err.Error()}
                return
        }
 
index 4098fd72ca436bdf3ee806b5a0dfb938fe7a5a9b..acdc4858118fcb4c3fd5be1a1a65208ed72ff530 100644 (file)
@@ -39,6 +39,7 @@ class ArvadosApiToken
     # Set current_user etc. based on the primary session token if a
     # valid one is present. Otherwise, use the first valid token in
     # reader_tokens.
+    accepted = false
     auth = nil
     [params["api_token"],
      params["oauth_token"],
@@ -50,6 +51,7 @@ class ArvadosApiToken
                  validate(token: supplied, remote: remote)
       if try_auth.andand.user
         auth = try_auth
+        accepted = supplied
         break
       end
     end
@@ -58,6 +60,7 @@ class ArvadosApiToken
     Thread.current[:api_client_authorization] = auth
     Thread.current[:api_client_uuid] = auth.andand.api_client.andand.uuid
     Thread.current[:api_client] = auth.andand.api_client
+    Thread.current[:token] = accepted
     Thread.current[:user] = auth.andand.user
 
     @app.call env if @app
index 8ea9f7bd885a396541b2e1db9f6c9c55688ba870..12ef8eb3eb5a2abede54c35919a8b72c815a357c 100644 (file)
@@ -204,6 +204,18 @@ class ApiClientAuthorization < ArvadosModel
     return nil
   end
 
+  def token
+    v2token
+  end
+
+  def v1token
+    api_token
+  end
+
+  def v2token
+    'v2/' + uuid + '/' + api_token
+  end
+
   protected
 
   def permission_to_create
index 7e561712abb732aff7945132fb15bd77a704405e..525a80b9eef45871e6313861cb2accb0ae8a6680 100644 (file)
@@ -94,7 +94,7 @@ class Collection < ArvadosModel
       # Check permissions on the collection manifest.
       # If any signature cannot be verified, raise PermissionDeniedError
       # which will return 403 Permission denied to the client.
-      api_token = current_api_client_authorization.andand.api_token
+      api_token = Thread.current[:token]
       signing_opts = {
         api_token: api_token,
         now: @validation_timestamp.to_i,
@@ -249,7 +249,7 @@ class Collection < ArvadosModel
     elsif is_trashed
       return manifest_text
     else
-      token = current_api_client_authorization.andand.api_token
+      token = Thread.current[:token]
       exp = [db_current_time.to_i + Rails.configuration.blob_signature_ttl,
              trash_at].compact.map(&:to_i).min
       self.class.sign_manifest manifest_text, token, exp
index e6ecea219b9da1ea4c71fee07dddf025aa78aab3..98c4bd11e44db845e99a881d1239c6d9f0ddb87e 100644 (file)
@@ -14,11 +14,21 @@ class Arvados::V1::CollectionsControllerTest < ActionController::TestCase
     Rails.configuration.permit_create_collection_with_unsigned_manifest = isok
   end
 
-  def assert_signed_manifest manifest_text, label=''
+  def assert_signed_manifest manifest_text, label='', token: false
     assert_not_nil manifest_text, "#{label} manifest_text was nil"
     manifest_text.scan(/ [[:xdigit:]]{32}\S*/) do |tok|
       assert_match(PERM_TOKEN_RE, tok,
                    "Locator in #{label} manifest_text was not signed")
+      if token
+        bare = tok.gsub(/\+A[^\+]*/, '').sub(/^ /, '')
+        exp = tok[/\+A[[:xdigit:]]+@([[:xdigit:]]+)/, 1].to_i(16)
+        sig = Blob.sign_locator(
+          bare,
+          key: Rails.configuration.blob_signing_key,
+          expire: exp,
+          api_token: token)[/\+A[^\+]*/, 0]
+        assert_includes tok, sig
+      end
     end
   end
 
@@ -52,6 +62,33 @@ class Arvados::V1::CollectionsControllerTest < ActionController::TestCase
     refute_includes json_response, 'unsigned_manifest_text'
   end
 
+  ['v1token', 'v2token'].each do |token_method|
+    test "correct signatures are given for #{token_method}" do
+      token = api_client_authorizations(:active).send(token_method)
+      authorize_with_token token
+      get :show, {id: collections(:foo_file).uuid}
+      assert_response :success
+      assert_signed_manifest json_response['manifest_text'], 'foo_file', token: token
+    end
+
+    test "signatures with #{token_method} are accepted" do
+      token = api_client_authorizations(:active).send(token_method)
+      signed = Blob.sign_locator(
+        'acbd18db4cc2f85cedef654fccc4a4d8+3',
+        key: Rails.configuration.blob_signing_key,
+        api_token: token)
+      authorize_with_token token
+      put :update, {
+            id: collections(:collection_owned_by_active).uuid,
+            collection: {
+              manifest_text: ". #{signed} 0:3:foo.txt\n",
+            },
+          }
+      assert_response :success
+      assert_signed_manifest json_response['manifest_text'], 'updated', token: token
+    end
+  end
+
   test "index with manifest_text selected returns signed locators" do
     columns = %w(uuid owner_uuid manifest_text)
     authorize_with :active
index 6dbaa7550f55a8e49b035e6092c331304c6e4edb..73b45f95ec71a7b28564c8a5767eb48503ec5465 100644 (file)
@@ -65,6 +65,7 @@ class ActiveSupport::TestCase
     Thread.current[:api_client_authorization] = nil
     Thread.current[:api_client_uuid] = nil
     Thread.current[:api_client] = nil
+    Thread.current[:token] = nil
     Thread.current[:user] = nil
     restore_configuration
   end
@@ -110,6 +111,7 @@ class ActiveSupport::TestCase
     Thread.current[:api_client_authorization] = client_auth
     Thread.current[:api_client] = client_auth.api_client
     Thread.current[:user] = client_auth.user
+    Thread.current[:token] = client_auth.token
   end
 
   def expect_json
@@ -188,6 +190,7 @@ class ActionDispatch::IntegrationTest
     Thread.current[:api_client_authorization] = nil
     Thread.current[:api_client_uuid] = nil
     Thread.current[:api_client] = nil
+    Thread.current[:token] = nil
     Thread.current[:user] = nil
   end
 end
index 475202058e8987d06ad091f888087cfcbfa133d1..4efc947dd251d131b6746ca0059be95ac459db8a 100644 (file)
@@ -12,12 +12,11 @@ class CollectionModelPerformanceTest < ActiveSupport::TestCase
   setup do
     # The Collection model needs to have a current token, not just a
     # current user, to sign & verify manifests:
-    Thread.current[:api_client_authorization] =
-      api_client_authorizations(:active)
+    Thread.current[:token] = api_client_authorizations(:active).token
   end
 
   teardown do
-    Thread.current[:api_client_authorization] = nil
+    Thread.current[:token] = nil
   end
 
   # "crrud" == "create read render update delete", not a typo
@@ -27,7 +26,7 @@ class CollectionModelPerformanceTest < ActiveSupport::TestCase
                     files_per_stream: 100,
                     blocks_per_file: 20,
                     bytes_per_block: 2**26,
-                    api_token: api_token(:active))
+                    api_token: api_client_authorizations(:active).token)
     end
     act_as_user users(:active) do
       c = time_block "new (manifest_text is #{bigmanifest.length>>20}MiB)" do
@@ -50,7 +49,7 @@ class CollectionModelPerformanceTest < ActiveSupport::TestCase
         c.as_api_response(nil)
       end
       loc = Blob.sign_locator(Digest::MD5.hexdigest('foo') + '+3',
-                              api_token: api_token(:active))
+                              api_token: api_client_authorizations(:active).token)
       # Note Collection's strip_manifest_text method has now removed
       # the signatures from c.manifest_text, so we have to start from
       # bigmanifest again here instead of just appending with "+=".
index f012ea3902217c55502d7aebe12b072d004985ec..c37a4d112fb8b86aaa076431f08524930ce83d0b 100644 (file)
@@ -30,6 +30,10 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
+var testCluster = &arvados.Cluster{
+       ClusterID: "zzzzz",
+}
+
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
@@ -823,7 +827,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter()
+       loggingRouter := MakeRESTRouter(testCluster)
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -835,7 +839,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter()
+       loggingRouter := MakeRESTRouter(testCluster)
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -975,7 +979,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               MakeRESTRouter().ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index c31ab9c2e38fde497f451e95c2a99437735e4455..2426c9cbdacd4044a4c2fab06f4ee51edef2b4e9 100644 (file)
@@ -4,13 +4,6 @@
 
 package main
 
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler    (GET /index, GET /index/prefix)
-// StatusHandler   (GET /status.json)
-
 import (
        "container/list"
        "context"
@@ -29,27 +22,33 @@ import (
 
        "github.com/gorilla/mux"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
 )
 
 type router struct {
        *mux.Router
-       limiter httpserver.RequestCounter
+       limiter     httpserver.RequestCounter
+       cluster     *arvados.Cluster
+       remoteProxy remoteProxy
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter() http.Handler {
-       rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+       rtr := &router{
+               Router:  mux.NewRouter(),
+               cluster: cluster,
+       }
 
        rtr.HandleFunc(
-               `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+               `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
        rtr.HandleFunc(
                `/{hash:[0-9a-f]{32}}+{hints}`,
-               GetBlockHandler).Methods("GET", "HEAD")
+               rtr.handleGET).Methods("GET", "HEAD")
 
-       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
        rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
        // List all blocks stored here. Privileged client only.
        rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
@@ -98,11 +97,16 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
        http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
 }
 
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
 
+       locator := req.URL.Path[1:]
+       if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+               rtr.remoteProxy.Get(resp, req, rtr.cluster)
+               return
+       }
+
        if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
                if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
@@ -177,8 +181,7 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
        }
 }
 
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
 
@@ -826,7 +829,7 @@ func IsValidLocator(loc string) bool {
        return validLocatorRe.MatchString(loc)
 }
 
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
 
 // GetAPIToken returns the OAuth2 token from the Authorization
 // header of a HTTP request, or an empty string if no matching
@@ -834,7 +837,7 @@ var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
 func GetAPIToken(req *http.Request) string {
        if auth, ok := req.Header["Authorization"]; ok {
                if match := authRe.FindStringSubmatch(auth[0]); match != nil {
-                       return match[1]
+                       return match[2]
                }
        }
        return ""
index 79e3017d55a8f7e108ee8d6b2d7effc3950a7260..6ae414bf931ce9164f7beefcc0d9be294da6e9c5 100644 (file)
@@ -13,6 +13,7 @@ import (
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -149,6 +150,22 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
@@ -156,7 +173,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter()
+       router := MakeRESTRouter(cluster)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
index 0f7b6e97351a1a6049cb1ef49db69dcb1f29a6d7..9fa0090aa739be1d640b0d2ba3a693a659087284 100644 (file)
@@ -28,7 +28,7 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.Start()
-       s.rtr = MakeRESTRouter()
+       s.rtr = MakeRESTRouter(testCluster)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
new file mode 100644 (file)
index 0000000..2e3d663
--- /dev/null
@@ -0,0 +1,113 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "io"
+       "net/http"
+       "strings"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// remoteProxy serves GET requests for blocks stored on remote
+// clusters. It keeps one base KeepClient per remote cluster ID so
+// the client is built only once per remote. The zero value is ready
+// to use.
+type remoteProxy struct {
+       // mtx guards clients.
+       mtx sync.Mutex
+       // clients maps remote cluster ID to its base KeepClient; it
+       // is allocated lazily by remoteClient.
+       clients map[string]*keepclient.KeepClient
+}
+
+// Get fetches the block named by the request path from a remote
+// cluster and copies it to w.
+//
+// The path is split on "+". Local permission hints ("A...") are
+// dropped, and a remote hint of the form "Rzzzzz-<sig>" selects the
+// remote cluster "zzzzz" and is rewritten to "A<sig>" before the
+// locator is passed to that cluster's KeepClient.
+//
+// Responses: 502 if the remote cluster is not configured or the
+// remote fetch fails, 401 if the request carries no token, 400 for
+// an obsolete token or a locator without a remote hint, 404 if the
+// remote reports the block as not found, and 500 if the remote
+// client cannot be set up.
+func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+       var (
+               kc    *keepclient.KeepClient
+               hints []string
+       )
+       for idx, hint := range strings.Split(r.URL.Path[1:], "+") {
+               if idx == 0 {
+                       // The first element is the hash, not a hint.
+                       hints = append(hints, hint)
+                       continue
+               }
+               if strings.HasPrefix(hint, "A") {
+                       // Local signatures mean nothing to the remote.
+                       continue
+               }
+               if len(hint) > 7 && hint[0] == 'R' && hint[6] == '-' {
+                       clusterID := hint[1:6]
+                       remote, found := cluster.RemoteClusters[clusterID]
+                       if !found {
+                               http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+                               return
+                       }
+                       token := GetAPIToken(r)
+                       if token == "" {
+                               http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+                               return
+                       }
+                       client, err := rp.remoteClient(clusterID, remote, token)
+                       if err != nil {
+                               status := http.StatusInternalServerError
+                               if err == auth.ErrObsoleteToken {
+                                       status = http.StatusBadRequest
+                               }
+                               http.Error(w, err.Error(), status)
+                               return
+                       }
+                       kc = client
+                       // The remote signature becomes an ordinary
+                       // permission hint on the remote cluster.
+                       hint = "A" + hint[7:]
+               }
+               hints = append(hints, hint)
+       }
+       if kc == nil {
+               http.Error(w, "bad request", http.StatusBadRequest)
+               return
+       }
+       rdr, _, _, err := kc.Get(strings.Join(hints, "+"))
+       if err == nil {
+               defer rdr.Close()
+               io.Copy(w, rdr)
+               return
+       }
+       if _, notFound := err.(*keepclient.ErrNotFound); notFound {
+               http.Error(w, err.Error(), http.StatusNotFound)
+       } else {
+               http.Error(w, err.Error(), http.StatusBadGateway)
+       }
+}
+
+func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+       rp.mtx.Lock()
+       kc, ok := rp.clients[remoteID]
+       rp.mtx.Unlock()
+       if !ok {
+               c := &arvados.Client{
+                       APIHost:   remoteCluster.Host,
+                       AuthToken: "xxx",
+                       Insecure:  remoteCluster.Insecure,
+               }
+               ac, err := arvadosclient.New(c)
+               if err != nil {
+                       return nil, err
+               }
+               kc, err = keepclient.MakeKeepClient(ac)
+               if err != nil {
+                       return nil, err
+               }
+
+               rp.mtx.Lock()
+               if rp.clients == nil {
+                       rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
+               } else {
+                       rp.clients[remoteID] = kc
+               }
+               rp.mtx.Unlock()
+       }
+       accopy := *kc.Arvados
+       accopy.ApiToken = token
+       kccopy := *kc
+       kccopy.Arvados = &accopy
+       token, err := auth.SaltToken(token, remoteID)
+       if err != nil {
+               return nil, err
+       }
+       kccopy.Arvados.ApiToken = token
+       return &kccopy, nil
+}
diff --git a/services/keepstore/proxy_remote_test.go b/services/keepstore/proxy_remote_test.go
new file mode 100644 (file)
index 0000000..4c50513
--- /dev/null
@@ -0,0 +1,149 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "net"
+       "net/http"
+       "net/http/httptest"
+       "strconv"
+       "strings"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ProxyRemoteSuite{})
+
+// ProxyRemoteSuite tests keepstore's remote-block proxying against
+// stub remote API and keepproxy servers.
+type ProxyRemoteSuite struct {
+       // remoteKeepRequests counts requests received by the stub
+       // remote keepproxy. It is updated with sync/atomic, so it is
+       // kept as the first field: only the first word of an
+       // allocated struct is guaranteed to be 64-bit aligned on
+       // 32-bit platforms.
+       remoteKeepRequests int64
+
+       // Local keepstore under test.
+       cluster *arvados.Cluster
+       vm      VolumeManager
+       rtr     http.Handler
+
+       // Stub remote cluster: its ID and signing key, the single
+       // block (locator and data) its keepproxy serves, and the two
+       // stub HTTP servers.
+       remoteClusterID      string
+       remoteBlobSigningKey []byte
+       remoteKeepLocator    string
+       remoteKeepData       []byte
+       remoteKeepproxy      *httptest.Server
+       remoteAPI            *httptest.Server
+}
+
+// remoteKeepproxyHandler is the stub remote keepproxy. It counts
+// every request, and serves s.remoteKeepData only for a GET of
+// s.remoteKeepLocator that carries the salted form of
+// arvadostest.ActiveTokenV2 as an OAuth2 or Bearer token. Anything
+// else gets a 404.
+func (s *ProxyRemoteSuite) remoteKeepproxyHandler(w http.ResponseWriter, r *http.Request) {
+       expectToken, err := auth.SaltToken(arvadostest.ActiveTokenV2, s.remoteClusterID)
+       if err != nil {
+               panic(err)
+       }
+       atomic.AddInt64(&s.remoteKeepRequests, 1)
+
+       var token string
+       fields := strings.Split(r.Header.Get("Authorization"), " ")
+       if len(fields) == 2 {
+               switch fields[0] {
+               case "OAuth2", "Bearer":
+                       token = fields[1]
+               }
+       }
+
+       authorized := r.Method == "GET" && r.URL.Path == "/"+s.remoteKeepLocator && token == expectToken
+       if !authorized {
+               http.Error(w, "404", 404)
+               return
+       }
+       w.Write(s.remoteKeepData)
+}
+
+func (s *ProxyRemoteSuite) remoteAPIHandler(w http.ResponseWriter, r *http.Request) {
+       host, port, _ := net.SplitHostPort(strings.Split(s.remoteKeepproxy.URL, "//")[1])
+       portnum, _ := strconv.Atoi(port)
+       if r.URL.Path == "/arvados/v1/discovery/v1/rest" {
+               json.NewEncoder(w).Encode(arvados.DiscoveryDocument{})
+               return
+       }
+       if r.URL.Path == "/arvados/v1/keep_services/accessible" {
+               json.NewEncoder(w).Encode(arvados.KeepServiceList{
+                       Items: []arvados.KeepService{
+                               {
+                                       UUID:           s.remoteClusterID + "-bi6l4-proxyproxyproxy",
+                                       ServiceType:    "proxy",
+                                       ServiceHost:    host,
+                                       ServicePort:    portnum,
+                                       ServiceSSLFlag: false,
+                               },
+                       },
+               })
+               return
+       }
+       http.Error(w, "404", 404)
+}
+
+func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
+       s.remoteClusterID = "z0000"
+       s.remoteBlobSigningKey = []byte("3b6df6fb6518afe12922a5bc8e67bf180a358bc8")
+       s.remoteKeepproxy = httptest.NewServer(http.HandlerFunc(s.remoteKeepproxyHandler))
+       s.remoteAPI = httptest.NewUnstartedServer(http.HandlerFunc(s.remoteAPIHandler))
+       s.remoteAPI.StartTLS()
+       s.cluster = arvados.IntegrationTestCluster()
+       s.cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+               s.remoteClusterID: arvados.RemoteCluster{
+                       Host:     strings.Split(s.remoteAPI.URL, "//")[1],
+                       Proxy:    true,
+                       Scheme:   "http",
+                       Insecure: true,
+               },
+       }
+       s.vm = MakeTestVolumeManager(2)
+       KeepVM = s.vm
+       theConfig = DefaultConfig()
+       theConfig.systemAuthToken = arvadostest.DataManagerToken
+       theConfig.Start()
+       s.rtr = MakeRESTRouter(s.cluster)
+}
+
+func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
+       s.vm.Close()
+       KeepVM = nil
+       theConfig = DefaultConfig()
+       theConfig.Start()
+       s.remoteAPI.Close()
+       s.remoteKeepproxy.Close()
+}
+
+func (s *ProxyRemoteSuite) TestProxyRemote(c *check.C) {
+       data := []byte("foo bar")
+       s.remoteKeepData = data
+       locator := fmt.Sprintf("%x+%d", md5.Sum(data), len(data))
+       s.remoteKeepLocator = keepclient.SignLocator(locator, arvadostest.ActiveTokenV2, time.Now().Add(time.Minute), time.Minute, s.remoteBlobSigningKey)
+
+       path := "/" + strings.Replace(s.remoteKeepLocator, "+A", "+R"+s.remoteClusterID+"-", 1)
+
+       var req *http.Request
+       var resp *httptest.ResponseRecorder
+       tryWithToken := func(token string) {
+               req = httptest.NewRequest("GET", path, nil)
+               req.Header.Set("Authorization", "Bearer "+token)
+               resp = httptest.NewRecorder()
+               s.rtr.ServeHTTP(resp, req)
+       }
+
+       // Happy path
+       tryWithToken(arvadostest.ActiveTokenV2)
+       c.Check(s.remoteKeepRequests, check.Equals, int64(1))
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Equals, string(data))
+
+       // Obsolete token
+       tryWithToken(arvadostest.ActiveToken)
+       c.Check(s.remoteKeepRequests, check.Equals, int64(1))
+       c.Check(resp.Code, check.Equals, http.StatusBadRequest)
+       c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+
+       // Bad token
+       tryWithToken(arvadostest.ActiveTokenV2[:len(arvadostest.ActiveTokenV2)-3] + "xxx")
+       c.Check(s.remoteKeepRequests, check.Equals, int64(2))
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+}