13994: Proxy to remote cluster if +R hint given.
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 11 Sep 2018 16:54:41 +0000 (12:54 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Wed, 12 Sep 2018 20:10:51 +0000 (16:10 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvadostest/fixtures.go
sdk/go/arvadostest/integration_test_cluster.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/mounts_test.go
services/keepstore/proxy_remote.go [new file with mode: 0644]
services/keepstore/proxy_remote_test.go [new file with mode: 0644]

index 6a4b6232aceb82754dbee606504a8608ba96d054..e6984601f40819759ee43c8dc463afa2655f64eb 100644 (file)
@@ -8,6 +8,7 @@ package arvadostest
 const (
        SpectatorToken          = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
        ActiveToken             = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+       ActiveTokenV2           = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
        AdminToken              = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
        AnonymousToken          = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
        DataManagerToken        = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
diff --git a/sdk/go/arvadostest/integration_test_cluster.go b/sdk/go/arvadostest/integration_test_cluster.go
new file mode 100644 (file)
index 0000000..ac08ce1
--- /dev/null
@@ -0,0 +1,21 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import (
+       "os"
+       "path/filepath"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+// IntegrationTestCluster loads the config file at
+// $WORKSPACE/tmp/arvados.yml and returns the cluster selected by
+// GetCluster(""). A failure at either step aborts the calling test
+// via c.Assert.
+func IntegrationTestCluster(c *check.C) *arvados.Cluster {
+       cfgPath := filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml")
+       cfg, err := arvados.GetConfig(cfgPath)
+       c.Assert(err, check.IsNil)
+
+       cl, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       return cl
+}
index f012ea3902217c55502d7aebe12b072d004985ec..c37a4d112fb8b86aaa076431f08524930ce83d0b 100644 (file)
@@ -30,6 +30,10 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 )
 
+// testCluster is the cluster configuration handed to MakeRESTRouter
+// by the tests in this package.
+var testCluster = &arvados.Cluster{
+       ClusterID: "zzzzz",
+}
+
 // A RequestTester represents the parameters for an HTTP request to
 // be issued on behalf of a unit test.
 type RequestTester struct {
@@ -823,7 +827,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter()
+       loggingRouter := MakeRESTRouter(testCluster)
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -835,7 +839,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter()
+       loggingRouter := MakeRESTRouter(testCluster)
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -975,7 +979,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               MakeRESTRouter().ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index c31ab9c2e38fde497f451e95c2a99437735e4455..2426c9cbdacd4044a4c2fab06f4ee51edef2b4e9 100644 (file)
@@ -4,13 +4,6 @@
 
 package main
 
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler    (GET /index, GET /index/prefix)
-// StatusHandler   (GET /status.json)
-
 import (
        "container/list"
        "context"
@@ -29,27 +22,33 @@ import (
 
        "github.com/gorilla/mux"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
 )
 
+// router dispatches Keep HTTP requests to their handlers. It carries
+// the cluster configuration and the remote-cluster proxy used by
+// handleGET.
 type router struct {
        *mux.Router
-       limiter httpserver.RequestCounter
+       limiter     httpserver.RequestCounter
+       cluster     *arvados.Cluster // as passed to MakeRESTRouter
+       remoteProxy remoteProxy      // left as its zero value by MakeRESTRouter
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter() http.Handler {
-       rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+       rtr := &router{
+               Router:  mux.NewRouter(),
+               cluster: cluster,
+       }
 
        rtr.HandleFunc(
-               `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+               `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
        rtr.HandleFunc(
                `/{hash:[0-9a-f]{32}}+{hints}`,
-               GetBlockHandler).Methods("GET", "HEAD")
+               rtr.handleGET).Methods("GET", "HEAD")
 
-       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
        rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
        // List all blocks stored here. Privileged client only.
        rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
@@ -98,11 +97,16 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
        http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
 }
 
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
 
+       locator := req.URL.Path[1:]
+       if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+               rtr.remoteProxy.Get(resp, req, rtr.cluster)
+               return
+       }
+
        if theConfig.RequireSignatures {
                locator := req.URL.Path[1:] // strip leading slash
                if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
@@ -177,8 +181,7 @@ func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([
        }
 }
 
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
        ctx, cancel := contextForResponse(context.TODO(), resp)
        defer cancel()
 
@@ -826,7 +829,7 @@ func IsValidLocator(loc string) bool {
        return validLocatorRe.MatchString(loc)
 }
 
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+// authRe matches an Authorization header value that uses the "OAuth2"
+// or "Bearer" scheme. Submatch 1 is the scheme and submatch 2 is the
+// token.
+var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
 
 // GetAPIToken returns the OAuth2 token from the Authorization
 // header of a HTTP request, or an empty string if no matching
@@ -834,7 +837,7 @@ var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
 func GetAPIToken(req *http.Request) string {
        if auth, ok := req.Header["Authorization"]; ok {
                if match := authRe.FindStringSubmatch(auth[0]); match != nil {
-                       return match[1]
+                       return match[2]
                }
        }
        return ""
index 79e3017d55a8f7e108ee8d6b2d7effc3950a7260..6ae414bf931ce9164f7beefcc0d9be294da6e9c5 100644 (file)
@@ -13,6 +13,7 @@ import (
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -149,6 +150,22 @@ func main() {
                }
        }
 
+       var cluster *arvados.Cluster
+       cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if err != nil && os.IsNotExist(err) {
+               log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+               cluster = &arvados.Cluster{
+                       ClusterID: "xxxxx",
+               }
+       } else if err != nil {
+               log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+       } else {
+               cluster, err = cfg.GetCluster("")
+               if err != nil {
+                       log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+               }
+       }
+
        log.Println("keepstore starting, pid", os.Getpid())
        defer log.Println("keepstore exiting, pid", os.Getpid())
 
@@ -156,7 +173,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter()
+       router := MakeRESTRouter(cluster)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
index 0f7b6e97351a1a6049cb1ef49db69dcb1f29a6d7..9fa0090aa739be1d640b0d2ba3a693a659087284 100644 (file)
@@ -28,7 +28,7 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.Start()
-       s.rtr = MakeRESTRouter()
+       s.rtr = MakeRESTRouter(testCluster)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
diff --git a/services/keepstore/proxy_remote.go b/services/keepstore/proxy_remote.go
new file mode 100644 (file)
index 0000000..2e3d663
--- /dev/null
@@ -0,0 +1,113 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "io"
+       "net/http"
+       "strconv"
+       "strings"
+       "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+// remoteProxy forwards block reads to the Keep services of remote
+// clusters. The zero value is usable: clients is allocated on first
+// use.
+type remoteProxy struct {
+       // clients caches one KeepClient per remote cluster ID.
+       clients map[string]*keepclient.KeepClient
+       // mtx guards clients.
+       mtx     sync.Mutex
+}
+
+// Get handles a GET or HEAD request whose locator carries a
+// "+R{clusterID}-{signature}" hint by forwarding it to the named
+// remote cluster's Keep services.
+//
+// Before forwarding, local "+A" permission hints are dropped and the
+// "+R" hint is rewritten as a "+A" hint carrying the same signature.
+// The caller's token is not passed along as-is: remoteClient salts
+// it for the remote cluster.
+//
+// Responses:
+//   - 400 if the locator has no usable +R hint, or the token is
+//     rejected with auth.ErrObsoleteToken
+//   - 401 if the request has no token
+//   - 404 if the remote lookup fails with *keepclient.ErrNotFound
+//   - 500 if a client for the remote cluster cannot be set up
+//   - 502 if the remote cluster is not configured, or the remote
+//     request fails for any other reason
+//
+// A HEAD request is answered with a remote HEAD (Ask) instead of
+// downloading the whole block only to have net/http discard it. The
+// size reported by the remote is forwarded as Content-Length, so a
+// HEAD response carries the block size and a GET that is cut short
+// is detectable by the client.
+func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+       var remoteClient *keepclient.KeepClient
+       var parts []string
+       for i, part := range strings.Split(r.URL.Path[1:], "+") {
+               switch {
+               case i == 0:
+                       // don't try to parse hash part as hint
+               case strings.HasPrefix(part, "A"):
+                       // drop local permission hint
+                       continue
+               case len(part) > 7 && part[0] == 'R' && part[6] == '-':
+                       // "R" + 5-character cluster ID + "-" + signature
+                       remoteID := part[1:6]
+                       remote, ok := cluster.RemoteClusters[remoteID]
+                       if !ok {
+                               http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+                               return
+                       }
+                       token := GetAPIToken(r)
+                       if token == "" {
+                               http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+                               return
+                       }
+                       kc, err := rp.remoteClient(remoteID, remote, token)
+                       if err == auth.ErrObsoleteToken {
+                               http.Error(w, err.Error(), http.StatusBadRequest)
+                               return
+                       } else if err != nil {
+                               http.Error(w, err.Error(), http.StatusInternalServerError)
+                               return
+                       }
+                       remoteClient = kc
+                       // Present the remote signature to the remote
+                       // cluster as an ordinary permission hint.
+                       part = "A" + part[7:]
+               }
+               parts = append(parts, part)
+       }
+       if remoteClient == nil {
+               http.Error(w, "bad request", http.StatusBadRequest)
+               return
+       }
+       locator := strings.Join(parts, "+")
+
+       var (
+               rdr  io.ReadCloser
+               size int64
+               err  error
+       )
+       if r.Method == "HEAD" {
+               // Only the existence and size are needed; don't
+               // transfer the block data.
+               size, _, err = remoteClient.Ask(locator)
+       } else {
+               rdr, size, _, err = remoteClient.Get(locator)
+       }
+       switch err.(type) {
+       case nil:
+               if size >= 0 {
+                       w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
+               }
+               if rdr == nil {
+                       // HEAD: headers only.
+                       return
+               }
+               defer rdr.Close()
+               // The status line is committed as soon as the copy
+               // starts, so a mid-stream failure cannot be reported
+               // as an error response. With Content-Length set, the
+               // client sees the short body instead.
+               io.Copy(w, rdr)
+       case *keepclient.ErrNotFound:
+               http.Error(w, err.Error(), http.StatusNotFound)
+       default:
+               http.Error(w, err.Error(), http.StatusBadGateway)
+       }
+}
+
+// remoteClient returns a KeepClient for the given remote cluster
+// whose API token is the caller's token salted for that cluster.
+//
+// One base client per remote cluster ID is created on demand and
+// cached in rp.clients. Each call returns a private copy of the
+// cached client (and of its arvadosclient), so setting the
+// per-request token never modifies the shared one.
+//
+// The token is salted before anything else, so a token that cannot
+// be salted (e.g. auth.ErrObsoleteToken) is rejected without first
+// setting up a client for the remote cluster.
+func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+       salted, err := auth.SaltToken(token, remoteID)
+       if err != nil {
+               return nil, err
+       }
+
+       rp.mtx.Lock()
+       kc, ok := rp.clients[remoteID]
+       rp.mtx.Unlock()
+       if !ok {
+               c := &arvados.Client{
+                       APIHost: remoteCluster.Host,
+                       // Placeholder: the token actually used is
+                       // set on the per-request copy below.
+                       AuthToken: "xxx",
+                       Insecure:  remoteCluster.Insecure,
+               }
+               ac, err := arvadosclient.New(c)
+               if err != nil {
+                       return nil, err
+               }
+               kc, err = keepclient.MakeKeepClient(ac)
+               if err != nil {
+                       return nil, err
+               }
+
+               rp.mtx.Lock()
+               if rp.clients == nil {
+                       rp.clients = make(map[string]*keepclient.KeepClient)
+               }
+               rp.clients[remoteID] = kc
+               rp.mtx.Unlock()
+       }
+
+       accopy := *kc.Arvados
+       accopy.ApiToken = salted
+       kccopy := *kc
+       kccopy.Arvados = &accopy
+       return &kccopy, nil
+}
diff --git a/services/keepstore/proxy_remote_test.go b/services/keepstore/proxy_remote_test.go
new file mode 100644 (file)
index 0000000..84c84d6
--- /dev/null
@@ -0,0 +1,149 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "net"
+       "net/http"
+       "net/http/httptest"
+       "strconv"
+       "strings"
+       "sync/atomic"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ProxyRemoteSuite{})
+
+// ProxyRemoteSuite tests proxying of block reads that carry a +R
+// (remote cluster) hint, using httptest servers as stand-ins for the
+// remote cluster's API server and keepproxy.
+type ProxyRemoteSuite struct {
+       cluster *arvados.Cluster
+       vm      VolumeManager
+       rtr     http.Handler
+
+       // Stub remote cluster. The keepproxy stub serves
+       // remoteKeepData at remoteKeepLocator and counts every
+       // request it receives in remoteKeepRequests (incremented
+       // atomically by the handler). The API stub advertises the
+       // keepproxy stub as the only accessible keep service.
+       remoteClusterID      string
+       remoteBlobSigningKey []byte
+       remoteKeepLocator    string
+       remoteKeepData       []byte
+       remoteKeepproxy      *httptest.Server
+       remoteKeepRequests   int64
+       remoteAPI            *httptest.Server
+}
+
+// remoteKeepproxyHandler is the stub remote keepproxy. Every request
+// is counted in remoteKeepRequests. A GET for remoteKeepLocator that
+// presents ActiveTokenV2 salted for the remote cluster is answered
+// with remoteKeepData; anything else gets a 404.
+func (s *ProxyRemoteSuite) remoteKeepproxyHandler(w http.ResponseWriter, r *http.Request) {
+       wantToken, err := auth.SaltToken(arvadostest.ActiveTokenV2, s.remoteClusterID)
+       if err != nil {
+               panic(err)
+       }
+       atomic.AddInt64(&s.remoteKeepRequests, 1)
+
+       // Accept "OAuth2 <token>" or "Bearer <token>"; any other
+       // header shape leaves gotToken empty.
+       gotToken := ""
+       fields := strings.Split(r.Header.Get("Authorization"), " ")
+       if len(fields) == 2 {
+               switch fields[0] {
+               case "OAuth2", "Bearer":
+                       gotToken = fields[1]
+               }
+       }
+
+       if r.Method != "GET" || r.URL.Path != "/"+s.remoteKeepLocator || gotToken != wantToken {
+               http.Error(w, "404", 404)
+               return
+       }
+       w.Write(s.remoteKeepData)
+}
+
+// remoteAPIHandler is the stub remote API server. It answers the
+// discovery document request with an empty document, answers the
+// "accessible keep services" request with a single proxy-type
+// service pointing at the stub keepproxy, and returns 404 for any
+// other path.
+func (s *ProxyRemoteSuite) remoteAPIHandler(w http.ResponseWriter, r *http.Request) {
+       // The httptest URL has the form scheme://host:port.
+       hostport := strings.Split(s.remoteKeepproxy.URL, "//")[1]
+       host, port, _ := net.SplitHostPort(hostport)
+       portnum, _ := strconv.Atoi(port)
+
+       switch r.URL.Path {
+       case "/arvados/v1/discovery/v1/rest":
+               json.NewEncoder(w).Encode(arvados.DiscoveryDocument{})
+       case "/arvados/v1/keep_services/accessible":
+               proxy := arvados.KeepService{
+                       UUID:           s.remoteClusterID + "-bi6l4-proxyproxyproxy",
+                       ServiceType:    "proxy",
+                       ServiceHost:    host,
+                       ServicePort:    portnum,
+                       ServiceSSLFlag: false,
+               }
+               json.NewEncoder(w).Encode(arvados.KeepServiceList{
+                       Items: []arvados.KeepService{proxy},
+               })
+       default:
+               http.Error(w, "404", 404)
+       }
+}
+
+// SetUpTest starts the stub remote keepproxy and (TLS) API servers,
+// loads the integration test cluster config and registers the stub
+// API server in it as remote cluster "z0000", then installs a test
+// volume manager and a fresh config in the package globals and
+// builds the router under test.
+func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
+       s.remoteClusterID = "z0000"
+       s.remoteBlobSigningKey = []byte("3b6df6fb6518afe12922a5bc8e67bf180a358bc8")
+       // Tests assert absolute request counts, so each test must
+       // start counting from zero.
+       atomic.StoreInt64(&s.remoteKeepRequests, 0)
+       s.remoteKeepproxy = httptest.NewServer(http.HandlerFunc(s.remoteKeepproxyHandler))
+       s.remoteAPI = httptest.NewUnstartedServer(http.HandlerFunc(s.remoteAPIHandler))
+       s.remoteAPI.StartTLS()
+       s.cluster = arvadostest.IntegrationTestCluster(c)
+       s.cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+               s.remoteClusterID: {
+                       Host:     strings.Split(s.remoteAPI.URL, "//")[1],
+                       Proxy:    true,
+                       Scheme:   "http",
+                       Insecure: true,
+               },
+       }
+       s.vm = MakeTestVolumeManager(2)
+       KeepVM = s.vm
+       theConfig = DefaultConfig()
+       theConfig.systemAuthToken = arvadostest.DataManagerToken
+       theConfig.Start()
+       s.rtr = MakeRESTRouter(s.cluster)
+}
+
+// TearDownTest closes the test volume manager, resets the package
+// globals KeepVM and theConfig, and shuts down both stub servers.
+func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
+       s.vm.Close()
+       KeepVM = nil
+       theConfig = DefaultConfig()
+       theConfig.Start()
+       s.remoteAPI.Close()
+       s.remoteKeepproxy.Close()
+}
+
+// TestProxyRemote requests a block through the router using a
+// locator whose +A hint has been replaced by a +R hint for the stub
+// remote cluster, and checks the response and the number of requests
+// that reached the stub keepproxy for a valid v2 token, an obsolete
+// token, and a v2 token with a corrupted secret.
+func (s *ProxyRemoteSuite) TestProxyRemote(c *check.C) {
+       data := []byte("foo bar")
+       s.remoteKeepData = data
+       locator := fmt.Sprintf("%x+%d", md5.Sum(data), len(data))
+       s.remoteKeepLocator = keepclient.SignLocator(locator, arvadostest.ActiveTokenV2, time.Now().Add(time.Minute), time.Minute, s.remoteBlobSigningKey)
+
+       path := "/" + strings.Replace(s.remoteKeepLocator, "+A", "+R"+s.remoteClusterID+"-", 1)
+
+       var req *http.Request
+       var resp *httptest.ResponseRecorder
+       tryWithToken := func(token string) {
+               req = httptest.NewRequest("GET", path, nil)
+               req.Header.Set("Authorization", "Bearer "+token)
+               resp = httptest.NewRecorder()
+               s.rtr.ServeHTTP(resp, req)
+       }
+       // The counter is incremented atomically from the stub
+       // server's goroutine, so read it atomically too.
+       remoteRequests := func() int64 {
+               return atomic.LoadInt64(&s.remoteKeepRequests)
+       }
+
+       // Happy path
+       tryWithToken(arvadostest.ActiveTokenV2)
+       c.Check(remoteRequests(), check.Equals, int64(1))
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       c.Check(resp.Body.String(), check.Equals, string(data))
+
+       // Obsolete token: rejected locally, never reaches the remote
+       tryWithToken(arvadostest.ActiveToken)
+       c.Check(remoteRequests(), check.Equals, int64(1))
+       c.Check(resp.Code, check.Equals, http.StatusBadRequest)
+       c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+
+       // Bad token: forwarded, and refused by the remote
+       tryWithToken(arvadostest.ActiveTokenV2[:len(arvadostest.ActiveTokenV2)-3] + "xxx")
+       c.Check(remoteRequests(), check.Equals, int64(2))
+       c.Check(resp.Code, check.Equals, http.StatusNotFound)
+       c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+}