Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / controller / federation.go
index c5089fa23512907b26fb499a3fbe17c61e2c6762..93b8315a63be588a0b3e2e1b3182337e68defeff 100644 (file)
@@ -5,44 +5,31 @@
 package controller
 
 import (
-       "bufio"
        "bytes"
-       "context"
-       "crypto/md5"
        "database/sql"
        "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
+       "mime"
        "net/http"
        "net/url"
        "regexp"
        "strings"
-       "sync"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmcvetta/randutil"
 )
 
 var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
 var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
 var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
 var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
-var collectionRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
-var collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
-
-type genericFederatedRequestHandler struct {
-       next    http.Handler
-       handler *Handler
-       matcher *regexp.Regexp
-}
-
-type collectionFederatedRequestHandler struct {
-       next    http.Handler
-       handler *Handler
-}
+var collectionsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))
+var collectionsByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)+$`)
+var linksRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "links", "o0j2j"))
 
 func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
        remote, ok := h.Cluster.RemoteClusters[remoteID]
@@ -68,7 +55,7 @@ func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*htt
        if remote.Insecure {
                client = h.insecureClient
        }
-       return h.proxy.ForwardRequest(saltedReq, urlOut, client)
+       return h.proxy.Do(saltedReq, urlOut, client)
 }
 
 // Buffer request body, parse form parameters in request, and then
@@ -76,7 +63,11 @@ func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*htt
 // downstream proxy steps.
 func loadParamsFromForm(req *http.Request) error {
        var postBody *bytes.Buffer
-       if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+       if ct := req.Header.Get("Content-Type"); ct == "" {
+               // Assume application/octet-stream, i.e., no form to parse.
+       } else if ct, _, err := mime.ParseMediaType(ct); err != nil {
+               return err
+       } else if ct == "application/x-www-form-urlencoded" && req.Body != nil {
                var cl int64
                if req.ContentLength > 0 {
                        cl = req.ContentLength
@@ -98,607 +89,19 @@ func loadParamsFromForm(req *http.Request) error {
        return nil
 }
 
-type multiClusterQueryResponseCollector struct {
-       responses []map[string]interface{}
-       error     error
-       kind      string
-       clusterID string
-}
-
-func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
-       requestError error) (newResponse *http.Response, err error) {
-       if requestError != nil {
-               c.error = requestError
-               return nil, nil
-       }
-
-       defer resp.Body.Close()
-       var loadInto struct {
-               Kind   string                   `json:"kind"`
-               Items  []map[string]interface{} `json:"items"`
-               Errors []string                 `json:"errors"`
-       }
-       err = json.NewDecoder(resp.Body).Decode(&loadInto)
-
-       if err != nil {
-               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
-               return nil, nil
-       }
-       if resp.StatusCode != http.StatusOK {
-               c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
-               return nil, nil
-       }
-
-       c.responses = loadInto.Items
-       c.kind = loadInto.Kind
-
-       return nil, nil
-}
-
-func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
-       req *http.Request,
-       clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
-
-       found := make(map[string]bool)
-       prev_len_uuids := len(uuids) + 1
-       // Loop while
-       // (1) there are more uuids to query
-       // (2) we're making progress - on each iteration the set of
-       // uuids we are expecting for must shrink.
-       for len(uuids) > 0 && len(uuids) < prev_len_uuids {
-               var remoteReq http.Request
-               remoteReq.Header = req.Header
-               remoteReq.Method = "POST"
-               remoteReq.URL = &url.URL{Path: req.URL.Path}
-               remoteParams := make(url.Values)
-               remoteParams.Set("_method", "GET")
-               remoteParams.Set("count", "none")
-               if req.Form.Get("select") != "" {
-                       remoteParams.Set("select", req.Form.Get("select"))
-               }
-               content, err := json.Marshal(uuids)
-               if err != nil {
-                       return nil, "", err
-               }
-               remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
-               enc := remoteParams.Encode()
-               remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
-
-               rc := multiClusterQueryResponseCollector{clusterID: clusterID}
-
-               var resp *http.Response
-               if clusterID == h.handler.Cluster.ClusterID {
-                       resp, err = h.handler.localClusterRequest(&remoteReq)
-               } else {
-                       resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
-               }
-               rc.collectResponse(resp, err)
-
-               if rc.error != nil {
-                       return nil, "", rc.error
-               }
-
-               kind = rc.kind
-
-               if len(rc.responses) == 0 {
-                       // We got zero responses, no point in doing
-                       // another query.
-                       return rp, kind, nil
-               }
-
-               rp = append(rp, rc.responses...)
-
-               // Go through the responses and determine what was
-               // returned.  If there are remaining items, loop
-               // around and do another request with just the
-               // stragglers.
-               for _, i := range rc.responses {
-                       uuid, ok := i["uuid"].(string)
-                       if ok {
-                               found[uuid] = true
-                       }
-               }
-
-               l := []string{}
-               for _, u := range uuids {
-                       if !found[u] {
-                               l = append(l, u)
-                       }
-               }
-               prev_len_uuids = len(uuids)
-               uuids = l
-       }
-
-       return rp, kind, nil
-}
-
-func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
-       req *http.Request, clusterId *string) bool {
-
-       var filters [][]interface{}
-       err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
-       if err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               return true
-       }
-
-       // Split the list of uuids by prefix
-       queryClusters := make(map[string][]string)
-       expectCount := 0
-       for _, filter := range filters {
-               if len(filter) != 3 {
-                       return false
-               }
-
-               if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
-                       return false
-               }
-
-               op, ok := filter[1].(string)
-               if !ok {
-                       return false
-               }
-
-               if op == "in" {
-                       if rhs, ok := filter[2].([]interface{}); ok {
-                               for _, i := range rhs {
-                                       if u, ok := i.(string); ok {
-                                               *clusterId = u[0:5]
-                                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                                               expectCount += 1
-                                       }
-                               }
-                       }
-               } else if op == "=" {
-                       if u, ok := filter[2].(string); ok {
-                               *clusterId = u[0:5]
-                               queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
-                               expectCount += 1
-                       }
-               } else {
-                       return false
-               }
-
-       }
-
-       if len(queryClusters) <= 1 {
-               // Query does not search for uuids across multiple
-               // clusters.
-               return false
-       }
-
-       // Validations
-       count := req.Form.Get("count")
-       if count != "" && count != `none` && count != `"none"` {
-               httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
-               return true
-       }
-       if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
-               httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
-               return true
-       }
-       if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
-               httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
-                       expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
-               return true
-       }
-       if req.Form.Get("select") != "" {
-               foundUUID := false
-               var selects []string
-               err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
-               if err != nil {
-                       httpserver.Error(w, err.Error(), http.StatusBadRequest)
-                       return true
-               }
-
-               for _, r := range selects {
-                       if r == "uuid" {
-                               foundUUID = true
-                               break
-                       }
-               }
-               if !foundUUID {
-                       httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
-                       return true
-               }
-       }
-
-       // Perform concurrent requests to each cluster
-
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
-       wg := sync.WaitGroup{}
-
-       req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-       mtx := sync.Mutex{}
-       errors := []error{}
-       var completeResponses []map[string]interface{}
-       var kind string
-
-       for k, v := range queryClusters {
-               if len(v) == 0 {
-                       // Nothing to query
-                       continue
-               }
-
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
-               wg.Add(1)
-               go func(k string, v []string) {
-                       rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
-                       mtx.Lock()
-                       if err == nil {
-                               completeResponses = append(completeResponses, rp...)
-                               kind = kn
-                       } else {
-                               errors = append(errors, err)
-                       }
-                       mtx.Unlock()
-                       wg.Done()
-                       <-sem
-               }(k, v)
-       }
-       wg.Wait()
-
-       if len(errors) > 0 {
-               var strerr []string
-               for _, e := range errors {
-                       strerr = append(strerr, e.Error())
-               }
-               httpserver.Errors(w, strerr, http.StatusBadGateway)
-               return true
-       }
-
-       w.Header().Set("Content-Type", "application/json")
-       w.WriteHeader(http.StatusOK)
-       itemList := make(map[string]interface{})
-       itemList["items"] = completeResponses
-       itemList["kind"] = kind
-       json.NewEncoder(w).Encode(itemList)
-
-       return true
-}
-
-func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       m := h.matcher.FindStringSubmatch(req.URL.Path)
-       clusterId := ""
-
-       if len(m) > 0 && m[2] != "" {
-               clusterId = m[2]
-       }
-
-       // Get form parameters from URL and form body (if POST).
-       if err := loadParamsFromForm(req); err != nil {
-               httpserver.Error(w, err.Error(), http.StatusBadRequest)
-               return
-       }
-
-       // Check if the parameters have an explicit cluster_id
-       if req.Form.Get("cluster_id") != "" {
-               clusterId = req.Form.Get("cluster_id")
-       }
-
-       // Handle the POST-as-GET special case (workaround for large
-       // GET requests that potentially exceed maximum URL length,
-       // like multi-object queries where the filter has 100s of
-       // items)
-       effectiveMethod := req.Method
-       if req.Method == "POST" && req.Form.Get("_method") != "" {
-               effectiveMethod = req.Form.Get("_method")
-       }
-
-       if effectiveMethod == "GET" &&
-               clusterId == "" &&
-               req.Form.Get("filters") != "" &&
-               h.handleMultiClusterQuery(w, req, &clusterId) {
-               return
-       }
-
-       if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
-               h.next.ServeHTTP(w, req)
-       } else {
-               resp, err := h.handler.remoteClusterRequest(clusterId, req)
-               h.handler.proxy.ForwardResponse(w, resp, err)
-       }
-}
-
-func rewriteSignatures(clusterID string, expectHash string,
-       resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-
-       if requestError != nil {
-               return resp, requestError
-       }
-
-       if resp.StatusCode != 200 {
-               return resp, nil
-       }
-
-       originalBody := resp.Body
-       defer originalBody.Close()
-
-       var col arvados.Collection
-       err = json.NewDecoder(resp.Body).Decode(&col)
-       if err != nil {
-               return nil, err
-       }
-
-       // rewriting signatures will make manifest text 5-10% bigger so calculate
-       // capacity accordingly
-       updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
-
-       hasher := md5.New()
-       mw := io.MultiWriter(hasher, updatedManifest)
-       sz := 0
-
-       scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
-       scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
-       for scanner.Scan() {
-               line := scanner.Text()
-               tokens := strings.Split(line, " ")
-               if len(tokens) < 3 {
-                       return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
-               }
-
-               n, err := mw.Write([]byte(tokens[0]))
-               if err != nil {
-                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-               }
-               sz += n
-               for _, token := range tokens[1:] {
-                       n, err = mw.Write([]byte(" "))
-                       if err != nil {
-                               return nil, fmt.Errorf("Error updating manifest: %v", err)
-                       }
-                       sz += n
-
-                       m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
-                       if m != nil {
-                               // Rewrite the block signature to be a remote signature
-                               _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-
-                               // for hash checking, ignore signatures
-                               n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-                               sz += n
-                       } else {
-                               n, err = mw.Write([]byte(token))
-                               if err != nil {
-                                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-                               }
-                               sz += n
-                       }
-               }
-               n, err = mw.Write([]byte("\n"))
-               if err != nil {
-                       return nil, fmt.Errorf("Error updating manifest: %v", err)
-               }
-               sz += n
-       }
-
-       // Check that expected hash is consistent with
-       // portable_data_hash field of the returned record
-       if expectHash == "" {
-               expectHash = col.PortableDataHash
-       } else if expectHash != col.PortableDataHash {
-               return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
-       }
-
-       // Certify that the computed hash of the manifest_text matches our expectation
-       sum := hasher.Sum(nil)
-       computedHash := fmt.Sprintf("%x+%v", sum, sz)
-       if computedHash != expectHash {
-               return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
-       }
-
-       col.ManifestText = updatedManifest.String()
-
-       newbody, err := json.Marshal(col)
-       if err != nil {
-               return nil, err
-       }
-
-       buf := bytes.NewBuffer(newbody)
-       resp.Body = ioutil.NopCloser(buf)
-       resp.ContentLength = int64(buf.Len())
-       resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
-
-       return resp, nil
-}
-
-func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-       if requestError != nil {
-               return resp, requestError
-       }
-
-       if resp.StatusCode == 404 {
-               // Suppress returning this result, because we want to
-               // search the federation.
-               return nil, nil
-       }
-       return resp, nil
-}
-
-type searchRemoteClusterForPDH struct {
-       pdh           string
-       remoteID      string
-       mtx           *sync.Mutex
-       sentResponse  *bool
-       sharedContext *context.Context
-       cancelFunc    func()
-       errors        *[]string
-       statusCode    *int
-}
-
-func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
-       s.mtx.Lock()
-       defer s.mtx.Unlock()
-
-       if *s.sentResponse {
-               // Another request already returned a response
-               return nil, nil
-       }
-
-       if requestError != nil {
-               *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
-               // Record the error and suppress response
-               return nil, nil
-       }
-
-       if resp.StatusCode != 200 {
-               // Suppress returning unsuccessful result.  Maybe
-               // another request will find it.
-               // TODO collect and return error responses.
-               *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
-               if resp.StatusCode != 404 {
-                       // Got a non-404 error response, convert into BadGateway
-                       *s.statusCode = http.StatusBadGateway
-               }
-               return nil, nil
-       }
-
-       s.mtx.Unlock()
-
-       // This reads the response body.  We don't want to hold the
-       // lock while doing this because other remote requests could
-       // also have made it to this point, and we don't want a
-       // slow response holding the lock to block a faster response
-       // that is waiting on the lock.
-       newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
-
-       s.mtx.Lock()
-
-       if *s.sentResponse {
-               // Another request already returned a response
-               return nil, nil
-       }
-
-       if err != nil {
-               // Suppress returning unsuccessful result.  Maybe
-               // another request will be successful.
-               *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
-               return nil, nil
-       }
-
-       // We have a successful response.  Suppress/cancel all the
-       // other requests/responses.
-       *s.sentResponse = true
-       s.cancelFunc()
-
-       return newResponse, nil
-}
-
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
-       if req.Method != "GET" {
-               // Only handle GET requests right now
-               h.next.ServeHTTP(w, req)
-               return
-       }
-
-       m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
-       if len(m) != 2 {
-               // Not a collection PDH GET request
-               m = collectionRe.FindStringSubmatch(req.URL.Path)
-               clusterId := ""
-
-               if len(m) > 0 {
-                       clusterId = m[2]
-               }
-
-               if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
-                       // request for remote collection by uuid
-                       resp, err := h.handler.remoteClusterRequest(clusterId, req)
-                       newResponse, err := rewriteSignatures(clusterId, "", resp, err)
-                       h.handler.proxy.ForwardResponse(w, newResponse, err)
-                       return
-               }
-               // not a collection UUID request, or it is a request
-               // for a local UUID, either way, continue down the
-               // handler stack.
-               h.next.ServeHTTP(w, req)
-               return
-       }
-
-       // Request for collection by PDH.  Search the federation.
-
-       // First, query the local cluster.
-       resp, err := h.handler.localClusterRequest(req)
-       newResp, err := filterLocalClusterResponse(resp, err)
-       if newResp != nil || err != nil {
-               h.handler.proxy.ForwardResponse(w, newResp, err)
-               return
-       }
-
-       sharedContext, cancelFunc := context.WithCancel(req.Context())
-       defer cancelFunc()
-       req = req.WithContext(sharedContext)
-
-       // Create a goroutine for each cluster in the
-       // RemoteClusters map.  The first valid result gets
-       // returned to the client.  When that happens, all
-       // other outstanding requests are cancelled or
-       // suppressed.
-       sentResponse := false
-       mtx := sync.Mutex{}
-       wg := sync.WaitGroup{}
-       var errors []string
-       var errorCode int = 404
-
-       // use channel as a semaphore to limit the number of concurrent
-       // requests at a time
-       sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
-       defer close(sem)
-       for remoteID := range h.handler.Cluster.RemoteClusters {
-               if remoteID == h.handler.Cluster.ClusterID {
-                       // No need to query local cluster again
-                       continue
-               }
-               // blocks until it can put a value into the
-               // channel (which has a max queue capacity)
-               sem <- true
-               if sentResponse {
-                       break
-               }
-               search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
-                       &sharedContext, cancelFunc, &errors, &errorCode}
-               wg.Add(1)
-               go func() {
-                       resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
-                       newResp, err := search.filterRemoteClusterResponse(resp, err)
-                       if newResp != nil || err != nil {
-                               h.handler.proxy.ForwardResponse(w, newResp, err)
-                       }
-                       wg.Done()
-                       <-sem
-               }()
-       }
-       wg.Wait()
-
-       if sentResponse {
-               return
-       }
-
-       // No successful responses, so return the error
-       httpserver.Errors(w, errors, errorCode)
-}
-
 func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
        mux := http.NewServeMux()
-       mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
-       mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
-       mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
-       mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
-       mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
-       mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
-       mux.Handle("/arvados/v1/collections", next)
-       mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
+
+       wfHandler := &genericFederatedRequestHandler{next, h, wfRe, nil}
+       containersHandler := &genericFederatedRequestHandler{next, h, containersRe, nil}
+       linksRequestsHandler := &genericFederatedRequestHandler{next, h, linksRe, nil}
+
+       mux.Handle("/arvados/v1/workflows", wfHandler)
+       mux.Handle("/arvados/v1/workflows/", wfHandler)
+       mux.Handle("/arvados/v1/containers", containersHandler)
+       mux.Handle("/arvados/v1/containers/", containersHandler)
+       mux.Handle("/arvados/v1/links", linksRequestsHandler)
+       mux.Handle("/arvados/v1/links/", linksRequestsHandler)
        mux.Handle("/", next)
 
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@@ -718,8 +121,6 @@ func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
 
                mux.ServeHTTP(w, req)
        })
-
-       return mux
 }
 
 type CurrentUser struct {
@@ -727,12 +128,95 @@ type CurrentUser struct {
        UUID          string
 }
 
-func (h *Handler) validateAPItoken(req *http.Request, user *CurrentUser) error {
-       db, err := h.db(req)
+// validateAPItoken extracts the token from the provided http request,
+// checks it again api_client_authorizations table in the database,
+// and fills in the token scope and user UUID.  Does not handle remote
+// tokens unless they are already in the database and not expired.
+//
+// Return values are:
+//
+// nil, false, non-nil -- if there was an internal error
+//
+// nil, false, nil -- if the token is invalid
+//
+// non-nil, true, nil -- if the token is valid
+func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
+       user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
+       db, err := h.dbConnector.GetDB(req.Context())
        if err != nil {
-               return err
+               ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
+               return nil, false, err
+       }
+
+       var uuid string
+       if strings.HasPrefix(token, "v2/") {
+               sp := strings.Split(token, "/")
+               uuid = sp[1]
+               token = sp[2]
+       }
+       user.Authorization.APIToken = token
+       var scopes string
+       err = db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, api_client_authorizations.scopes, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp AT TIME ZONE 'UTC') LIMIT 1`, token).Scan(&user.Authorization.UUID, &scopes, &user.UUID)
+       if err == sql.ErrNoRows {
+               ctxlog.FromContext(req.Context()).Debugf("validateAPItoken(%s): not found in database", token)
+               return nil, false, nil
+       } else if err != nil {
+               ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
+               return nil, false, err
+       }
+       if uuid != "" && user.Authorization.UUID != uuid {
+               // secret part matches, but UUID doesn't -- somewhat surprising
+               ctxlog.FromContext(req.Context()).Debugf("validateAPItoken(%s): secret part found, but with different UUID: %s", token, user.Authorization.UUID)
+               return nil, false, nil
        }
-       return db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, user.Authorization.APIToken).Scan(&user.Authorization.UUID, &user.UUID)
+       err = json.Unmarshal([]byte(scopes), &user.Authorization.Scopes)
+       if err != nil {
+               ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): error parsing scopes from db", token)
+               return nil, false, err
+       }
+       ctxlog.FromContext(req.Context()).Debugf("validateAPItoken(%s): ok", token)
+       return &user, true, nil
+}
+
+func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
+       db, err := h.dbConnector.GetDB(req.Context())
+       if err != nil {
+               return nil, err
+       }
+       rd, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
+       if err != nil {
+               return nil, err
+       }
+       uuid := fmt.Sprintf("%v-gj3su-%v", h.Cluster.ClusterID, rd)
+       token, err := randutil.String(50, "abcdefghijklmnopqrstuvwxyz0123456789")
+       if err != nil {
+               return nil, err
+       }
+       if len(scopes) == 0 {
+               scopes = append(scopes, "all")
+       }
+       scopesjson, err := json.Marshal(scopes)
+       if err != nil {
+               return nil, err
+       }
+       _, err = db.ExecContext(req.Context(),
+               `INSERT INTO api_client_authorizations
+(uuid, api_token, expires_at, scopes,
+user_id,
+api_client_id, created_at, updated_at)
+VALUES ($1, $2, CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + INTERVAL '2 weeks', $3,
+(SELECT id FROM users WHERE users.uuid=$4 LIMIT 1),
+0, CURRENT_TIMESTAMP AT TIME ZONE 'UTC', CURRENT_TIMESTAMP AT TIME ZONE 'UTC')`,
+               uuid, token, string(scopesjson), userUUID)
+
+       if err != nil {
+               return nil, err
+       }
+
+       return &arvados.APIClientAuthorization{
+               UUID:     uuid,
+               APIToken: token,
+               Scopes:   scopes}, nil
 }
 
 // Extract the auth token supplied in req, and replace it with a
@@ -768,19 +252,21 @@ func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *h
                return updatedReq, nil
        }
 
+       ctxlog.FromContext(req.Context()).Debugf("saltAuthToken: cluster %s token %s remote %s", h.Cluster.ClusterID, creds.Tokens[0], remote)
        token, err := auth.SaltToken(creds.Tokens[0], remote)
 
-       if err == auth.ErrObsoleteToken {
-               // If the token exists in our own database, salt it
-               // for the remote. Otherwise, assume it was issued by
-               // the remote, and pass it through unmodified.
-               currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
-               err = h.validateAPItoken(req, &currentUser)
-               if err == sql.ErrNoRows {
-                       // Not ours; pass through unmodified.
-                       token = currentUser.Authorization.APIToken
-               } else if err != nil {
+       if err == auth.ErrObsoleteToken || err == auth.ErrTokenFormat {
+               // If the token exists in our own database for our own
+               // user, salt it for the remote. Otherwise, assume it
+               // was issued by the remote, and pass it through
+               // unmodified.
+               currentUser, ok, err := h.validateAPItoken(req, creds.Tokens[0])
+               if err != nil {
                        return nil, err
+               } else if !ok || strings.HasPrefix(currentUser.UUID, remote) {
+                       // Unknown, or cached + belongs to remote;
+                       // pass through unmodified.
+                       token = creds.Tokens[0]
                } else {
                        // Found; make V2 version and salt it.
                        token, err = auth.SaltToken(currentUser.Authorization.TokenV2(), remote)
@@ -799,7 +285,7 @@ func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *h
        }
        updatedReq.Header.Set("Authorization", "Bearer "+token)
 
-       // Remove api_token=... from the the query string, in case we
+       // Remove api_token=... from the query string, in case we
        // end up forwarding the request.
        if values, err := url.ParseQuery(updatedReq.URL.RawQuery); err != nil {
                return nil, err