--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "os"
+ "path/filepath"
+)
+
+// IntegrationTestCluster returns the cluster that has been set up by
+// the integration test framework (see /build/run-tests.sh). It panics
+// on error.
+func IntegrationTestCluster() *Cluster {
+ config, err := GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
+ if err != nil {
+ panic(err)
+ }
+ cluster, err := config.GetCluster("")
+ if err != nil {
+ panic(err)
+ }
+ return cluster
+}
const (
SpectatorToken = "zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu"
ActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ ActiveTokenV2 = "v2/zzzzz-gj3su-077z32aux8dg2s1/3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
AdminToken = "4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h"
AnonymousToken = "4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi"
DataManagerToken = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
func RefreshServiceDiscovery() {
svcListCacheMtx.Lock()
defer svcListCacheMtx.Unlock()
+ var wg sync.WaitGroup
for _, ent := range svcListCache {
- ent.clear <- struct{}{}
+ wg.Add(1)
+ go func() {
+ ent.clear <- struct{}{}
+ wg.Done()
+ }()
}
+ wg.Wait()
}
// ClearCacheOnSIGHUP installs a signal handler that calls
var resp *http.Response
if resp, err = this.httpClient().Do(req); err != nil {
DebugPrintf("DEBUG: [%s] Upload failed %v error: %v", reqid, url, err.Error())
- upload_status <- uploadStatus{err, url, 0, 0, ""}
+ upload_status <- uploadStatus{err, url, 0, 0, err.Error()}
return
}
# Set current_user etc. based on the primary session token if a
# valid one is present. Otherwise, use the first valid token in
# reader_tokens.
+ accepted = false
auth = nil
[params["api_token"],
params["oauth_token"],
validate(token: supplied, remote: remote)
if try_auth.andand.user
auth = try_auth
+ accepted = supplied
break
end
end
Thread.current[:api_client_authorization] = auth
Thread.current[:api_client_uuid] = auth.andand.api_client.andand.uuid
Thread.current[:api_client] = auth.andand.api_client
+ Thread.current[:token] = accepted
Thread.current[:user] = auth.andand.user
@app.call env if @app
return nil
end
+ def token
+ v2token
+ end
+
+ def v1token
+ api_token
+ end
+
+ def v2token
+ 'v2/' + uuid + '/' + api_token
+ end
+
protected
def permission_to_create
# Check permissions on the collection manifest.
# If any signature cannot be verified, raise PermissionDeniedError
# which will return 403 Permission denied to the client.
- api_token = current_api_client_authorization.andand.api_token
+ api_token = Thread.current[:token]
signing_opts = {
api_token: api_token,
now: @validation_timestamp.to_i,
elsif is_trashed
return manifest_text
else
- token = current_api_client_authorization.andand.api_token
+ token = Thread.current[:token]
exp = [db_current_time.to_i + Rails.configuration.blob_signature_ttl,
trash_at].compact.map(&:to_i).min
self.class.sign_manifest manifest_text, token, exp
Rails.configuration.permit_create_collection_with_unsigned_manifest = isok
end
- def assert_signed_manifest manifest_text, label=''
+ def assert_signed_manifest manifest_text, label='', token: false
assert_not_nil manifest_text, "#{label} manifest_text was nil"
manifest_text.scan(/ [[:xdigit:]]{32}\S*/) do |tok|
assert_match(PERM_TOKEN_RE, tok,
"Locator in #{label} manifest_text was not signed")
+ if token
+ bare = tok.gsub(/\+A[^\+]*/, '').sub(/^ /, '')
+ exp = tok[/\+A[[:xdigit:]]+@([[:xdigit:]]+)/, 1].to_i(16)
+ sig = Blob.sign_locator(
+ bare,
+ key: Rails.configuration.blob_signing_key,
+ expire: exp,
+ api_token: token)[/\+A[^\+]*/, 0]
+ assert_includes tok, sig
+ end
end
end
refute_includes json_response, 'unsigned_manifest_text'
end
+ ['v1token', 'v2token'].each do |token_method|
+ test "correct signatures are given for #{token_method}" do
+ token = api_client_authorizations(:active).send(token_method)
+ authorize_with_token token
+ get :show, {id: collections(:foo_file).uuid}
+ assert_response :success
+ assert_signed_manifest json_response['manifest_text'], 'foo_file', token: token
+ end
+
+ test "signatures with #{token_method} are accepted" do
+ token = api_client_authorizations(:active).send(token_method)
+ signed = Blob.sign_locator(
+ 'acbd18db4cc2f85cedef654fccc4a4d8+3',
+ key: Rails.configuration.blob_signing_key,
+ api_token: token)
+ authorize_with_token token
+ put :update, {
+ id: collections(:collection_owned_by_active).uuid,
+ collection: {
+ manifest_text: ". #{signed} 0:3:foo.txt\n",
+ },
+ }
+ assert_response :success
+ assert_signed_manifest json_response['manifest_text'], 'updated', token: token
+ end
+ end
+
test "index with manifest_text selected returns signed locators" do
columns = %w(uuid owner_uuid manifest_text)
authorize_with :active
Thread.current[:api_client_authorization] = nil
Thread.current[:api_client_uuid] = nil
Thread.current[:api_client] = nil
+ Thread.current[:token] = nil
Thread.current[:user] = nil
restore_configuration
end
Thread.current[:api_client_authorization] = client_auth
Thread.current[:api_client] = client_auth.api_client
Thread.current[:user] = client_auth.user
+ Thread.current[:token] = client_auth.token
end
def expect_json
Thread.current[:api_client_authorization] = nil
Thread.current[:api_client_uuid] = nil
Thread.current[:api_client] = nil
+ Thread.current[:token] = nil
Thread.current[:user] = nil
end
end
setup do
# The Collection model needs to have a current token, not just a
# current user, to sign & verify manifests:
- Thread.current[:api_client_authorization] =
- api_client_authorizations(:active)
+ Thread.current[:token] = api_client_authorizations(:active).token
end
teardown do
- Thread.current[:api_client_authorization] = nil
+ Thread.current[:token] = nil
end
# "crrud" == "create read render update delete", not a typo
files_per_stream: 100,
blocks_per_file: 20,
bytes_per_block: 2**26,
- api_token: api_token(:active))
+ api_token: api_client_authorizations(:active).token)
end
act_as_user users(:active) do
c = time_block "new (manifest_text is #{bigmanifest.length>>20}MiB)" do
c.as_api_response(nil)
end
loc = Blob.sign_locator(Digest::MD5.hexdigest('foo') + '+3',
- api_token: api_token(:active))
+ api_token: api_client_authorizations(:active).token)
# Note Collection's strip_manifest_text method has now removed
# the signatures from c.manifest_text, so we have to start from
# bigmanifest again here instead of just appending with "+=".
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
)
+var testCluster = &arvados.Cluster{
+ ClusterID: "zzzzz",
+}
+
// A RequestTester represents the parameters for an HTTP request to
// be issued on behalf of a unit test.
type RequestTester struct {
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter()
+ loggingRouter := MakeRESTRouter(testCluster)
loggingRouter.ServeHTTP(response, req)
return response
}
if rt.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter()
+ loggingRouter := MakeRESTRouter(testCluster)
loggingRouter.ServeHTTP(response, req)
return response
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- MakeRESTRouter().ServeHTTP(resp, req)
+ MakeRESTRouter(testCluster).ServeHTTP(resp, req)
ok <- struct{}{}
}()
package main
-// REST handlers for Keep are implemented here.
-//
-// GetBlockHandler (GET /locator)
-// PutBlockHandler (PUT /locator)
-// IndexHandler (GET /index, GET /index/prefix)
-// StatusHandler (GET /status.json)
-
import (
"container/list"
"context"
"github.com/gorilla/mux"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
)
type router struct {
*mux.Router
- limiter httpserver.RequestCounter
+ limiter httpserver.RequestCounter
+ cluster *arvados.Cluster
+ remoteProxy remoteProxy
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter() http.Handler {
- rtr := &router{Router: mux.NewRouter()}
+func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+ rtr := &router{
+ Router: mux.NewRouter(),
+ cluster: cluster,
+ }
rtr.HandleFunc(
- `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
+ `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
rtr.HandleFunc(
`/{hash:[0-9a-f]{32}}+{hints}`,
- GetBlockHandler).Methods("GET", "HEAD")
+ rtr.handleGET).Methods("GET", "HEAD")
- rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+ rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
// List all blocks stored here. Privileged client only.
rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
}
-// GetBlockHandler is a HandleFunc to address Get block requests.
-func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
+ locator := req.URL.Path[1:]
+ if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
+ rtr.remoteProxy.Get(resp, req, rtr.cluster)
+ return
+ }
+
if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
}
}
-// PutBlockHandler is a HandleFunc to address Put block requests.
-func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
ctx, cancel := contextForResponse(context.TODO(), resp)
defer cancel()
return validLocatorRe.MatchString(loc)
}
-var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
+var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
func GetAPIToken(req *http.Request) string {
if auth, ok := req.Header["Authorization"]; ok {
if match := authRe.FindStringSubmatch(auth[0]); match != nil {
- return match[1]
+ return match[2]
}
}
return ""
"syscall"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
}
}
+ var cluster *arvados.Cluster
+ cfg, err := arvados.GetConfig(arvados.DefaultConfigFile)
+ if err != nil && os.IsNotExist(err) {
+ log.Warnf("DEPRECATED: proceeding without cluster configuration file %q (%s)", arvados.DefaultConfigFile, err)
+ cluster = &arvados.Cluster{
+ ClusterID: "xxxxx",
+ }
+ } else if err != nil {
+ log.Fatalf("load config %q: %s", arvados.DefaultConfigFile, err)
+ } else {
+ cluster, err = cfg.GetCluster("")
+ if err != nil {
+ log.Fatalf("config error in %q: %s", arvados.DefaultConfigFile, err)
+ }
+ }
+
log.Println("keepstore starting, pid", os.Getpid())
defer log.Println("keepstore exiting, pid", os.Getpid())
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware/handler stack
- router := MakeRESTRouter()
+ router := MakeRESTRouter(cluster)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.Start()
- s.rtr = MakeRESTRouter()
+ s.rtr = MakeRESTRouter(testCluster)
}
func (s *MountsSuite) TearDownTest(c *check.C) {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "io"
+ "net/http"
+ "strings"
+ "sync"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
+
+type remoteProxy struct {
+ clients map[string]*keepclient.KeepClient
+ mtx sync.Mutex
+}
+
+func (rp *remoteProxy) Get(w http.ResponseWriter, r *http.Request, cluster *arvados.Cluster) {
+ var remoteClient *keepclient.KeepClient
+ var parts []string
+ for i, part := range strings.Split(r.URL.Path[1:], "+") {
+ switch {
+ case i == 0:
+ // don't try to parse hash part as hint
+ case strings.HasPrefix(part, "A"):
+ // drop local permission hint
+ continue
+ case len(part) > 7 && part[0] == 'R' && part[6] == '-':
+ remoteID := part[1:6]
+ remote, ok := cluster.RemoteClusters[remoteID]
+ if !ok {
+ http.Error(w, "remote cluster not configured", http.StatusBadGateway)
+ return
+ }
+ token := GetAPIToken(r)
+ if token == "" {
+ http.Error(w, "no token provided in Authorization header", http.StatusUnauthorized)
+ return
+ }
+ kc, err := rp.remoteClient(remoteID, remote, token)
+ if err == auth.ErrObsoleteToken {
+ http.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ } else if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ remoteClient = kc
+ part = "A" + part[7:]
+ }
+ parts = append(parts, part)
+ }
+ if remoteClient == nil {
+ http.Error(w, "bad request", http.StatusBadRequest)
+ return
+ }
+ locator := strings.Join(parts, "+")
+ rdr, _, _, err := remoteClient.Get(locator)
+ switch err.(type) {
+ case nil:
+ defer rdr.Close()
+ io.Copy(w, rdr)
+ case *keepclient.ErrNotFound:
+ http.Error(w, err.Error(), http.StatusNotFound)
+ default:
+ http.Error(w, err.Error(), http.StatusBadGateway)
+ }
+}
+
+func (rp *remoteProxy) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+ rp.mtx.Lock()
+ kc, ok := rp.clients[remoteID]
+ rp.mtx.Unlock()
+ if !ok {
+ c := &arvados.Client{
+ APIHost: remoteCluster.Host,
+ AuthToken: "xxx",
+ Insecure: remoteCluster.Insecure,
+ }
+ ac, err := arvadosclient.New(c)
+ if err != nil {
+ return nil, err
+ }
+ kc, err = keepclient.MakeKeepClient(ac)
+ if err != nil {
+ return nil, err
+ }
+
+ rp.mtx.Lock()
+ if rp.clients == nil {
+ rp.clients = map[string]*keepclient.KeepClient{remoteID: kc}
+ } else {
+ rp.clients[remoteID] = kc
+ }
+ rp.mtx.Unlock()
+ }
+ accopy := *kc.Arvados
+ accopy.ApiToken = token
+ kccopy := *kc
+ kccopy.Arvados = &accopy
+ token, err := auth.SaltToken(token, remoteID)
+ if err != nil {
+ return nil, err
+ }
+ kccopy.Arvados.ApiToken = token
+ return &kccopy, nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "crypto/md5"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "net/http/httptest"
+ "strconv"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&ProxyRemoteSuite{})
+
+type ProxyRemoteSuite struct {
+ cluster *arvados.Cluster
+ vm VolumeManager
+ rtr http.Handler
+
+ remoteClusterID string
+ remoteBlobSigningKey []byte
+ remoteKeepLocator string
+ remoteKeepData []byte
+ remoteKeepproxy *httptest.Server
+ remoteKeepRequests int64
+ remoteAPI *httptest.Server
+}
+
+func (s *ProxyRemoteSuite) remoteKeepproxyHandler(w http.ResponseWriter, r *http.Request) {
+ expectToken, err := auth.SaltToken(arvadostest.ActiveTokenV2, s.remoteClusterID)
+ if err != nil {
+ panic(err)
+ }
+ atomic.AddInt64(&s.remoteKeepRequests, 1)
+ var token string
+ if auth := strings.Split(r.Header.Get("Authorization"), " "); len(auth) == 2 && (auth[0] == "OAuth2" || auth[0] == "Bearer") {
+ token = auth[1]
+ }
+ if r.Method == "GET" && r.URL.Path == "/"+s.remoteKeepLocator && token == expectToken {
+ w.Write(s.remoteKeepData)
+ return
+ }
+ http.Error(w, "404", 404)
+}
+
+func (s *ProxyRemoteSuite) remoteAPIHandler(w http.ResponseWriter, r *http.Request) {
+ host, port, _ := net.SplitHostPort(strings.Split(s.remoteKeepproxy.URL, "//")[1])
+ portnum, _ := strconv.Atoi(port)
+ if r.URL.Path == "/arvados/v1/discovery/v1/rest" {
+ json.NewEncoder(w).Encode(arvados.DiscoveryDocument{})
+ return
+ }
+ if r.URL.Path == "/arvados/v1/keep_services/accessible" {
+ json.NewEncoder(w).Encode(arvados.KeepServiceList{
+ Items: []arvados.KeepService{
+ {
+ UUID: s.remoteClusterID + "-bi6l4-proxyproxyproxy",
+ ServiceType: "proxy",
+ ServiceHost: host,
+ ServicePort: portnum,
+ ServiceSSLFlag: false,
+ },
+ },
+ })
+ return
+ }
+ http.Error(w, "404", 404)
+}
+
+func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
+ s.remoteClusterID = "z0000"
+ s.remoteBlobSigningKey = []byte("3b6df6fb6518afe12922a5bc8e67bf180a358bc8")
+ s.remoteKeepproxy = httptest.NewServer(http.HandlerFunc(s.remoteKeepproxyHandler))
+ s.remoteAPI = httptest.NewUnstartedServer(http.HandlerFunc(s.remoteAPIHandler))
+ s.remoteAPI.StartTLS()
+ s.cluster = arvados.IntegrationTestCluster()
+ s.cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+ s.remoteClusterID: arvados.RemoteCluster{
+ Host: strings.Split(s.remoteAPI.URL, "//")[1],
+ Proxy: true,
+ Scheme: "http",
+ Insecure: true,
+ },
+ }
+ s.vm = MakeTestVolumeManager(2)
+ KeepVM = s.vm
+ theConfig = DefaultConfig()
+ theConfig.systemAuthToken = arvadostest.DataManagerToken
+ theConfig.Start()
+ s.rtr = MakeRESTRouter(s.cluster)
+}
+
+func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
+ s.vm.Close()
+ KeepVM = nil
+ theConfig = DefaultConfig()
+ theConfig.Start()
+ s.remoteAPI.Close()
+ s.remoteKeepproxy.Close()
+}
+
+func (s *ProxyRemoteSuite) TestProxyRemote(c *check.C) {
+ data := []byte("foo bar")
+ s.remoteKeepData = data
+ locator := fmt.Sprintf("%x+%d", md5.Sum(data), len(data))
+ s.remoteKeepLocator = keepclient.SignLocator(locator, arvadostest.ActiveTokenV2, time.Now().Add(time.Minute), time.Minute, s.remoteBlobSigningKey)
+
+ path := "/" + strings.Replace(s.remoteKeepLocator, "+A", "+R"+s.remoteClusterID+"-", 1)
+
+ var req *http.Request
+ var resp *httptest.ResponseRecorder
+ tryWithToken := func(token string) {
+ req = httptest.NewRequest("GET", path, nil)
+ req.Header.Set("Authorization", "Bearer "+token)
+ resp = httptest.NewRecorder()
+ s.rtr.ServeHTTP(resp, req)
+ }
+
+ // Happy path
+ tryWithToken(arvadostest.ActiveTokenV2)
+ c.Check(s.remoteKeepRequests, check.Equals, int64(1))
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Equals, string(data))
+
+ // Obsolete token
+ tryWithToken(arvadostest.ActiveToken)
+ c.Check(s.remoteKeepRequests, check.Equals, int64(1))
+ c.Check(resp.Code, check.Equals, http.StatusBadRequest)
+ c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+
+ // Bad token
+ tryWithToken(arvadostest.ActiveTokenV2[:len(arvadostest.ActiveTokenV2)-3] + "xxx")
+ c.Check(s.remoteKeepRequests, check.Equals, int64(2))
+ c.Check(resp.Code, check.Equals, http.StatusNotFound)
+ c.Check(resp.Body.String(), check.Not(check.Equals), string(data))
+}