import (
"bytes"
"context"
- "crypto/md5"
"encoding/json"
"errors"
"fmt"
local := localdb.NewConn(cluster)
remotes := map[string]backend{}
for id, remote := range cluster.RemoteClusters {
- if !remote.Proxy {
+ if !remote.Proxy || id == cluster.ClusterID {
continue
}
- remotes[id] = rpc.NewConn(id, &url.URL{Scheme: remote.Scheme, Host: remote.Host}, remote.Insecure, saltedTokenProvider(local, id))
+ conn := rpc.NewConn(id, &url.URL{Scheme: remote.Scheme, Host: remote.Host}, remote.Insecure, saltedTokenProvider(local, id))
+ // Older versions of controller rely on the Via header
+ // to detect loops.
+ conn.SendHeader = http.Header{"Via": {"HTTP/1.1 arvados-controller"}}
+ remotes[id] = conn
}
return &Conn{
} else if err != nil {
return nil, err
}
+ if strings.HasPrefix(aca.UUID, remoteID) {
+ // We have it cached here, but
+ // the token belongs to the
+ // remote target itself, so
+ // pass it through unmodified.
+ tokens = append(tokens, token)
+ continue
+ }
salted, err := auth.SaltToken(aca.TokenV2(), remoteID)
if err != nil {
return nil, err
}
}
+func (conn *Conn) localOrLoginCluster() backend {
+ if conn.cluster.Login.LoginCluster != "" {
+ return conn.chooseBackend(conn.cluster.Login.LoginCluster)
+ }
+ return conn.local
+}
+
// Call fn with the local backend; then, if fn returned 404, call fn
// on the available remote backends (possibly concurrently) until one
// succeeds.
// or "" for the local backend.
//
// A non-nil error means all backends failed.
-func (conn *Conn) tryLocalThenRemotes(ctx context.Context, fn func(context.Context, string, backend) error) error {
- if err := fn(ctx, "", conn.local); err == nil || errStatus(err) != http.StatusNotFound {
+func (conn *Conn) tryLocalThenRemotes(ctx context.Context, forwardedFor string, fn func(context.Context, string, backend) error) error {
+ if err := fn(ctx, "", conn.local); err == nil || errStatus(err) != http.StatusNotFound || forwardedFor != "" {
+ // Note: forwardedFor != "" means this request came
+ // from a remote cluster, so we don't take a second
+ // hop. This avoids cycles, redundant calls to a
+ // mutually reachable remote, and use of double-salted
+ // tokens.
return err
}
})
}
-// this could be in sdk/go/arvados
-func portableDataHash(mt string) string {
- h := md5.New()
- blkRe := regexp.MustCompile(`^ [0-9a-f]{32}\+\d+`)
- size := 0
- _ = regexp.MustCompile(` ?[^ ]*`).ReplaceAllFunc([]byte(mt), func(tok []byte) []byte {
- if m := blkRe.Find(tok); m != nil {
- // write hash+size, ignore remaining block hints
- tok = m
- }
- n, err := h.Write(tok)
- if err != nil {
- panic(err)
- }
- size += n
- return nil
- })
- return fmt.Sprintf("%x+%d", h.Sum(nil), size)
-}
-
func (conn *Conn) ConfigGet(ctx context.Context) (json.RawMessage, error) {
var buf bytes.Buffer
err := config.ExportJSON(&buf, conn.cluster)
return arvados.LoginResponse{
RedirectLocation: target.String(),
}, nil
- } else {
- return conn.local.Login(ctx, options)
}
+ return conn.local.Login(ctx, options)
+}
+
+func (conn *Conn) Logout(ctx context.Context, options arvados.LogoutOptions) (arvados.LogoutResponse, error) {
+ // If the logout request comes with an API token from a known
+ // remote cluster, redirect to that cluster's logout handler
+ // so it has an opportunity to clear sessions, expire tokens,
+ // etc. Otherwise use the local endpoint.
+ reqauth, ok := auth.FromContext(ctx)
+ if !ok || len(reqauth.Tokens) == 0 || len(reqauth.Tokens[0]) < 8 || !strings.HasPrefix(reqauth.Tokens[0], "v2/") {
+ return conn.local.Logout(ctx, options)
+ }
+ id := reqauth.Tokens[0][3:8]
+ if id == conn.cluster.ClusterID {
+ return conn.local.Logout(ctx, options)
+ }
+ remote, ok := conn.remotes[id]
+ if !ok {
+ return conn.local.Logout(ctx, options)
+ }
+ baseURL := remote.BaseURL()
+ target, err := baseURL.Parse(arvados.EndpointLogout.Path)
+ if err != nil {
+ return arvados.LogoutResponse{}, fmt.Errorf("internal error getting redirect target: %s", err)
+ }
+ target.RawQuery = url.Values{"return_to": {options.ReturnTo}}.Encode()
+ return arvados.LogoutResponse{RedirectLocation: target.String()}, nil
}
func (conn *Conn) CollectionGet(ctx context.Context, options arvados.GetOptions) (arvados.Collection, error) {
c.ManifestText = rewriteManifest(c.ManifestText, options.UUID[:5])
}
return c, err
- } else {
- // UUID is a PDH
- first := make(chan arvados.Collection, 1)
- err := conn.tryLocalThenRemotes(ctx, func(ctx context.Context, remoteID string, be backend) error {
- c, err := be.CollectionGet(ctx, options)
- if err != nil {
- return err
- }
- // options.UUID is either hash+size or
- // hash+size+hints; only hash+size need to
- // match the computed PDH.
- if pdh := portableDataHash(c.ManifestText); pdh != options.UUID && !strings.HasPrefix(options.UUID, pdh+"+") {
- err = httpErrorf(http.StatusBadGateway, "bad portable data hash %q received from remote %q (expected %q)", pdh, remoteID, options.UUID)
- ctxlog.FromContext(ctx).Warn(err)
- return err
- }
- if remoteID != "" {
- c.ManifestText = rewriteManifest(c.ManifestText, remoteID)
- }
- select {
- case first <- c:
- return nil
- default:
- // lost race, return value doesn't matter
- return nil
- }
- })
+ }
+ // UUID is a PDH
+ first := make(chan arvados.Collection, 1)
+ err := conn.tryLocalThenRemotes(ctx, options.ForwardedFor, func(ctx context.Context, remoteID string, be backend) error {
+ remoteOpts := options
+ remoteOpts.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
+ c, err := be.CollectionGet(ctx, remoteOpts)
if err != nil {
- return arvados.Collection{}, err
+ return err
+ }
+ // options.UUID is either hash+size or
+ // hash+size+hints; only hash+size need to
+ // match the computed PDH.
+ if pdh := arvados.PortableDataHash(c.ManifestText); pdh != options.UUID && !strings.HasPrefix(options.UUID, pdh+"+") {
+ err = httpErrorf(http.StatusBadGateway, "bad portable data hash %q received from remote %q (expected %q)", pdh, remoteID, options.UUID)
+ ctxlog.FromContext(ctx).Warn(err)
+ return err
+ }
+ if remoteID != "" {
+ c.ManifestText = rewriteManifest(c.ManifestText, remoteID)
+ }
+ select {
+ case first <- c:
+ return nil
+ default:
+ // lost race, return value doesn't matter
+ return nil
}
- return <-first, nil
+ })
+ if err != nil {
+ return arvados.Collection{}, err
}
+ return <-first, nil
}
func (conn *Conn) CollectionList(ctx context.Context, options arvados.ListOptions) (arvados.CollectionList, error) {
return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
}
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+ return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options)
+}
+
+func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
+ return conn.generated_ContainerRequestList(ctx, options)
+}
+
+func (conn *Conn) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
+ be := conn.chooseBackend(options.ClusterID)
+ if be == conn.local {
+ return be.ContainerRequestCreate(ctx, options)
+ }
+ if _, ok := options.Attrs["runtime_token"]; !ok {
+ // If runtime_token is not set, create a new token
+ aca, err := conn.local.APIClientAuthorizationCurrent(ctx, arvados.GetOptions{})
+ if err != nil {
+ // This should probably be StatusUnauthorized
+ // (need to update test in
+ // lib/controller/federation_test.go):
+ // When RoR is out of the picture this should be:
+ // return arvados.ContainerRequest{}, httpErrorf(http.StatusUnauthorized, "%w", err)
+ return arvados.ContainerRequest{}, httpErrorf(http.StatusForbidden, "%s", "invalid API token")
+ }
+ user, err := conn.local.UserGetCurrent(ctx, arvados.GetOptions{})
+ if err != nil {
+ return arvados.ContainerRequest{}, err
+ }
+ if len(aca.Scopes) == 0 || aca.Scopes[0] != "all" {
+ return arvados.ContainerRequest{}, httpErrorf(http.StatusForbidden, "token scope is not [all]")
+ }
+ if strings.HasPrefix(aca.UUID, conn.cluster.ClusterID) {
+ // Local user, submitting to a remote cluster.
+ // Create a new time-limited token.
+ local, ok := conn.local.(*localdb.Conn)
+ if !ok {
+ return arvados.ContainerRequest{}, httpErrorf(http.StatusInternalServerError, "bug: local backend is a %T, not a *localdb.Conn", conn.local)
+ }
+ aca, err = local.CreateAPIClientAuthorization(ctx, conn.cluster.SystemRootToken, rpc.UserSessionAuthInfo{UserUUID: user.UUID,
+ ExpiresAt: time.Now().UTC().Add(conn.cluster.Collections.BlobSigningTTL.Duration())})
+ if err != nil {
+ return arvados.ContainerRequest{}, err
+ }
+ options.Attrs["runtime_token"] = aca.TokenV2()
+ } else {
+ // Remote user. Container request will use the
+ // current token, minus the trailing portion
+ // (optional container uuid).
+ options.Attrs["runtime_token"] = aca.TokenV2()
+ }
+ }
+ return be.ContainerRequestCreate(ctx, options)
+}
+
+func (conn *Conn) ContainerRequestUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.ContainerRequest, error) {
+ return conn.chooseBackend(options.UUID).ContainerRequestUpdate(ctx, options)
+}
+
+func (conn *Conn) ContainerRequestGet(ctx context.Context, options arvados.GetOptions) (arvados.ContainerRequest, error) {
+ return conn.chooseBackend(options.UUID).ContainerRequestGet(ctx, options)
+}
+
+func (conn *Conn) ContainerRequestDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.ContainerRequest, error) {
+ return conn.chooseBackend(options.UUID).ContainerRequestDelete(ctx, options)
+}
+
+func (conn *Conn) GroupCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Group, error) {
+ return conn.chooseBackend(options.ClusterID).GroupCreate(ctx, options)
+}
+
+func (conn *Conn) GroupUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Group, error) {
+ return conn.chooseBackend(options.UUID).GroupUpdate(ctx, options)
+}
+
+func (conn *Conn) GroupGet(ctx context.Context, options arvados.GetOptions) (arvados.Group, error) {
+ return conn.chooseBackend(options.UUID).GroupGet(ctx, options)
+}
+
+func (conn *Conn) GroupList(ctx context.Context, options arvados.ListOptions) (arvados.GroupList, error) {
+ return conn.generated_GroupList(ctx, options)
+}
+
+func (conn *Conn) GroupContents(ctx context.Context, options arvados.ContentsOptions) (arvados.ObjectList, error) {
+ return conn.chooseBackend(options.UUID).GroupContents(ctx, options)
+}
+
+func (conn *Conn) GroupShared(ctx context.Context, options arvados.SharedOptions) (arvados.ObjectList, error) {
+ return conn.chooseBackend(options.UUID).GroupShared(ctx, options)
+}
+
+func (conn *Conn) GroupDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Group, error) {
+ return conn.chooseBackend(options.UUID).GroupDelete(ctx, options)
+}
+
+func (conn *Conn) GroupUntrash(ctx context.Context, options arvados.UntrashOptions) (arvados.Group, error) {
+ return conn.chooseBackend(options.UUID).GroupUntrash(ctx, options)
+}
+
func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
return conn.generated_SpecimenList(ctx, options)
}
}
var userAttrsCachedFromLoginCluster = map[string]bool{
- "created_at": true,
- "email": true,
- "first_name": true,
- "is_active": true,
- "is_admin": true,
- "last_name": true,
- "modified_at": true,
- "modified_by_client_uuid": true,
- "modified_by_user_uuid": true,
- "prefs": true,
- "username": true,
-
- "etag": false,
- "full_name": false,
- "identity_url": false,
- "is_invited": false,
- "owner_uuid": false,
- "uuid": false,
- "writable_by": false,
-}
-
-func (conn *Conn) UserList(ctx context.Context, options arvados.ListOptions) (arvados.UserList, error) {
+ "created_at": true,
+ "email": true,
+ "first_name": true,
+ "is_active": true,
+ "is_admin": true,
+ "last_name": true,
+ "modified_at": true,
+ "prefs": true,
+ "username": true,
+
+ "etag": false,
+ "full_name": false,
+ "identity_url": false,
+ "is_invited": false,
+ "modified_by_client_uuid": false,
+ "modified_by_user_uuid": false,
+ "owner_uuid": false,
+ "uuid": false,
+ "writable_by": false,
+}
+
+func (conn *Conn) batchUpdateUsers(ctx context.Context,
+ options arvados.ListOptions,
+ items []arvados.User) (err error) {
+
+ id := conn.cluster.Login.LoginCluster
logger := ctxlog.FromContext(ctx)
- if id := conn.cluster.Login.LoginCluster; id != "" && id != conn.cluster.ClusterID {
- resp, err := conn.chooseBackend(id).UserList(ctx, options)
- if err != nil {
- return resp, err
+ batchOpts := arvados.UserBatchUpdateOptions{Updates: map[string]map[string]interface{}{}}
+ for _, user := range items {
+ if !strings.HasPrefix(user.UUID, id) {
+ continue
+ }
+ logger.Debugf("cache user info for uuid %q", user.UUID)
+
+ // If the remote cluster has null timestamps
+ // (e.g., test server with incomplete
+ // fixtures) use dummy timestamps (instead of
+ // the zero time, which causes a Rails API
+ // error "year too big to marshal: 1 UTC").
+ if user.ModifiedAt.IsZero() {
+ user.ModifiedAt = time.Now()
+ }
+ if user.CreatedAt.IsZero() {
+ user.CreatedAt = time.Now()
}
- batchOpts := arvados.UserBatchUpdateOptions{Updates: map[string]map[string]interface{}{}}
- for _, user := range resp.Items {
- if !strings.HasPrefix(user.UUID, id) {
- continue
- }
- logger.Debugf("cache user info for uuid %q", user.UUID)
-
- // If the remote cluster has null timestamps
- // (e.g., test server with incomplete
- // fixtures) use dummy timestamps (instead of
- // the zero time, which causes a Rails API
- // error "year too big to marshal: 1 UTC").
- if user.ModifiedAt.IsZero() {
- user.ModifiedAt = time.Now()
- }
- if user.CreatedAt.IsZero() {
- user.CreatedAt = time.Now()
- }
- var allFields map[string]interface{}
- buf, err := json.Marshal(user)
- if err != nil {
- return arvados.UserList{}, fmt.Errorf("error encoding user record from remote response: %s", err)
- }
- err = json.Unmarshal(buf, &allFields)
- if err != nil {
- return arvados.UserList{}, fmt.Errorf("error transcoding user record from remote response: %s", err)
- }
- updates := allFields
- if len(options.Select) > 0 {
- updates = map[string]interface{}{}
- for _, k := range options.Select {
- if v, ok := allFields[k]; ok && userAttrsCachedFromLoginCluster[k] {
- updates[k] = v
- }
+ var allFields map[string]interface{}
+ buf, err := json.Marshal(user)
+ if err != nil {
+ return fmt.Errorf("error encoding user record from remote response: %s", err)
+ }
+ err = json.Unmarshal(buf, &allFields)
+ if err != nil {
+ return fmt.Errorf("error transcoding user record from remote response: %s", err)
+ }
+ updates := allFields
+ if len(options.Select) > 0 {
+ updates = map[string]interface{}{}
+ for _, k := range options.Select {
+ if v, ok := allFields[k]; ok && userAttrsCachedFromLoginCluster[k] {
+ updates[k] = v
}
- } else {
- for k := range updates {
- if !userAttrsCachedFromLoginCluster[k] {
- delete(updates, k)
- }
+ }
+ } else {
+ for k := range updates {
+ if !userAttrsCachedFromLoginCluster[k] {
+ delete(updates, k)
}
}
- batchOpts.Updates[user.UUID] = updates
}
- if len(batchOpts.Updates) > 0 {
- ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{conn.cluster.SystemRootToken}})
- _, err = conn.local.UserBatchUpdate(ctxRoot, batchOpts)
- if err != nil {
- return arvados.UserList{}, fmt.Errorf("error updating local user records: %s", err)
- }
+ batchOpts.Updates[user.UUID] = updates
+ }
+ if len(batchOpts.Updates) > 0 {
+ ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{conn.cluster.SystemRootToken}})
+ _, err = conn.local.UserBatchUpdate(ctxRoot, batchOpts)
+ if err != nil {
+ return fmt.Errorf("error updating local user records: %s", err)
+ }
+ }
+ return nil
+}
+
+func (conn *Conn) UserList(ctx context.Context, options arvados.ListOptions) (arvados.UserList, error) {
+ if id := conn.cluster.Login.LoginCluster; id != "" && id != conn.cluster.ClusterID && !options.BypassFederation {
+ resp, err := conn.chooseBackend(id).UserList(ctx, options)
+ if err != nil {
+ return resp, err
+ }
+ err = conn.batchUpdateUsers(ctx, options, resp.Items)
+ if err != nil {
+ return arvados.UserList{}, err
}
return resp, nil
- } else {
- return conn.generated_UserList(ctx, options)
}
+ return conn.generated_UserList(ctx, options)
}
func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
}
func (conn *Conn) UserUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.User, error) {
- return conn.chooseBackend(options.UUID).UserUpdate(ctx, options)
+ if options.BypassFederation {
+ return conn.local.UserUpdate(ctx, options)
+ }
+ resp, err := conn.chooseBackend(options.UUID).UserUpdate(ctx, options)
+ if err != nil {
+ return resp, err
+ }
+ if !strings.HasPrefix(options.UUID, conn.cluster.ClusterID) {
+ // Copy the updated user record to the local cluster
+ err = conn.batchUpdateUsers(ctx, arvados.ListOptions{}, []arvados.User{resp})
+ if err != nil {
+ return arvados.User{}, err
+ }
+ }
+ return resp, err
}
func (conn *Conn) UserUpdateUUID(ctx context.Context, options arvados.UpdateUUIDOptions) (arvados.User, error) {
- return conn.chooseBackend(options.UUID).UserUpdateUUID(ctx, options)
+ return conn.local.UserUpdateUUID(ctx, options)
}
func (conn *Conn) UserMerge(ctx context.Context, options arvados.UserMergeOptions) (arvados.User, error) {
- return conn.chooseBackend(options.OldUserUUID).UserMerge(ctx, options)
+ return conn.local.UserMerge(ctx, options)
}
func (conn *Conn) UserActivate(ctx context.Context, options arvados.UserActivateOptions) (arvados.User, error) {
- return conn.chooseBackend(options.UUID).UserActivate(ctx, options)
+ return conn.localOrLoginCluster().UserActivate(ctx, options)
}
func (conn *Conn) UserSetup(ctx context.Context, options arvados.UserSetupOptions) (map[string]interface{}, error) {
- return conn.chooseBackend(options.UUID).UserSetup(ctx, options)
+ upstream := conn.localOrLoginCluster()
+ if upstream != conn.local {
+ // When LoginCluster is in effect, and we're setting
+ // up a remote user, and we want to give that user
+ // access to a local VM, we can't include the VM in
+ // the setup call, because the remote cluster won't
+ // recognize it.
+
+ // Similarly, if we want to create a git repo,
+ // it should be created on the local cluster,
+ // not the remote one.
+
+ upstreamOptions := options
+ upstreamOptions.VMUUID = ""
+ upstreamOptions.RepoName = ""
+
+ ret, err := upstream.UserSetup(ctx, upstreamOptions)
+ if err != nil {
+ return ret, err
+ }
+ }
+
+ return conn.local.UserSetup(ctx, options)
}
func (conn *Conn) UserUnsetup(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
- return conn.chooseBackend(options.UUID).UserUnsetup(ctx, options)
+ return conn.localOrLoginCluster().UserUnsetup(ctx, options)
}
func (conn *Conn) UserGet(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
- return conn.chooseBackend(options.UUID).UserGet(ctx, options)
+ resp, err := conn.chooseBackend(options.UUID).UserGet(ctx, options)
+ if err != nil {
+ return resp, err
+ }
+ if options.UUID != resp.UUID {
+ return arvados.User{}, httpErrorf(http.StatusBadGateway, "Had requested %v but response was for %v", options.UUID, resp.UUID)
+ }
+ if options.UUID[:5] != conn.cluster.ClusterID {
+ err = conn.batchUpdateUsers(ctx, arvados.ListOptions{Select: options.Select}, []arvados.User{resp})
+ if err != nil {
+ return arvados.User{}, err
+ }
+ }
+ return resp, nil
}
func (conn *Conn) UserGetCurrent(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
- return conn.chooseBackend(options.UUID).UserGetCurrent(ctx, options)
+ return conn.local.UserGetCurrent(ctx, options)
}
func (conn *Conn) UserGetSystem(ctx context.Context, options arvados.GetOptions) (arvados.User, error) {
return conn.local.UserBatchUpdate(ctx, options)
}
+func (conn *Conn) UserAuthenticate(ctx context.Context, options arvados.UserAuthenticateOptions) (arvados.APIClientAuthorization, error) {
+ return conn.local.UserAuthenticate(ctx, options)
+}
+
func (conn *Conn) APIClientAuthorizationCurrent(ctx context.Context, options arvados.GetOptions) (arvados.APIClientAuthorization, error) {
return conn.chooseBackend(options.UUID).APIClientAuthorizationCurrent(ctx, options)
}
func errStatus(err error) int {
if httpErr, ok := err.(interface{ HTTPStatus() int }); ok {
return httpErr.HTTPStatus()
- } else {
- return http.StatusInternalServerError
}
+ return http.StatusInternalServerError
}