1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.curoverse.com/arvados.git/sdk/go/arvados"
24 "git.curoverse.com/arvados.git/sdk/go/auth"
25 "git.curoverse.com/arvados.git/sdk/go/httpserver"
26 "git.curoverse.com/arvados.git/sdk/go/keepclient"
var (
	// pathPattern matches API request paths for one resource type. It is
	// instantiated with fmt.Sprintf using the resource's path segment
	// (e.g. "workflows") and the type infix of its UUIDs (e.g. "7fd4e").
	// Submatches of the resulting regexp:
	//   [1] "/<uuid>" when the path names a specific object, else ""
	//   [2] the 5-character cluster prefix of that UUID, else ""
	//   [3] the rest of the path following the optional UUID (may be "")
	pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`

	wfRe                = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
	containersRe        = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
	containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
	collectionRe        = regexp.MustCompile(fmt.Sprintf(pathPattern, "collections", "4zz18"))

	// collectionByPDHRe matches a request for exactly one collection
	// identified by portable data hash ("<32 hex digits>+<size>");
	// submatch [1] is that hash. The group is intentionally not
	// repeated: with a trailing "+" quantifier, a path holding several
	// concatenated hashes would still match, and [1] would silently
	// name only the last of them.
	collectionByPDHRe = regexp.MustCompile(`^/arvados/v1/collections/([0-9a-fA-F]{32}\+[0-9]+)$`)
)
36 type genericFederatedRequestHandler struct {
39 matcher *regexp.Regexp
42 type collectionFederatedRequestHandler struct {
47 func (h *Handler) remoteClusterRequest(remoteID string, req *http.Request) (*http.Response, error) {
48 remote, ok := h.Cluster.RemoteClusters[remoteID]
50 return nil, HTTPError{fmt.Sprintf("no proxy available for cluster %v", remoteID), http.StatusNotFound}
52 scheme := remote.Scheme
56 saltedReq, err := h.saltAuthToken(req, remoteID)
63 Path: saltedReq.URL.Path,
64 RawPath: saltedReq.URL.RawPath,
65 RawQuery: saltedReq.URL.RawQuery,
67 client := h.secureClient
69 client = h.insecureClient
71 return h.proxy.ForwardRequest(saltedReq, urlOut, client)
74 // Buffer request body, parse form parameters in request, and then
75 // replace original body with the buffer so it can be re-read by
76 // downstream proxy steps.
77 func loadParamsFromForm(req *http.Request) error {
78 var postBody *bytes.Buffer
79 if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
81 if req.ContentLength > 0 {
82 cl = req.ContentLength
84 postBody = bytes.NewBuffer(make([]byte, 0, cl))
85 originalBody := req.Body
86 defer originalBody.Close()
87 req.Body = ioutil.NopCloser(io.TeeReader(req.Body, postBody))
90 err := req.ParseForm()
95 if req.Body != nil && postBody != nil {
96 req.Body = ioutil.NopCloser(postBody)
101 type multiClusterQueryResponseCollector struct {
102 responses []map[string]interface{}
108 func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
109 requestError error) (newResponse *http.Response, err error) {
110 if requestError != nil {
111 c.error = requestError
115 defer resp.Body.Close()
116 var loadInto struct {
117 Kind string `json:"kind"`
118 Items []map[string]interface{} `json:"items"`
119 Errors []string `json:"errors"`
121 err = json.NewDecoder(resp.Body).Decode(&loadInto)
124 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
127 if resp.StatusCode != http.StatusOK {
128 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
132 c.responses = loadInto.Items
133 c.kind = loadInto.Kind
138 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
140 clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
142 found := make(map[string]bool)
143 prev_len_uuids := len(uuids) + 1
145 // (1) there are more uuids to query
146 // (2) we're making progress - on each iteration the set of
147 // uuids we are expecting for must shrink.
148 for len(uuids) > 0 && len(uuids) < prev_len_uuids {
149 var remoteReq http.Request
150 remoteReq.Header = req.Header
151 remoteReq.Method = "POST"
152 remoteReq.URL = &url.URL{Path: req.URL.Path}
153 remoteParams := make(url.Values)
154 remoteParams.Set("_method", "GET")
155 remoteParams.Set("count", "none")
156 if req.Form.Get("select") != "" {
157 remoteParams.Set("select", req.Form.Get("select"))
159 content, err := json.Marshal(uuids)
163 remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
164 enc := remoteParams.Encode()
165 remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
167 rc := multiClusterQueryResponseCollector{clusterID: clusterID}
169 var resp *http.Response
170 if clusterID == h.handler.Cluster.ClusterID {
171 resp, err = h.handler.localClusterRequest(&remoteReq)
173 resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
175 rc.collectResponse(resp, err)
178 return nil, "", rc.error
183 if len(rc.responses) == 0 {
184 // We got zero responses, no point in doing
189 rp = append(rp, rc.responses...)
191 // Go through the responses and determine what was
192 // returned. If there are remaining items, loop
193 // around and do another request with just the
195 for _, i := range rc.responses {
196 uuid, ok := i["uuid"].(string)
203 for _, u := range uuids {
208 prev_len_uuids = len(uuids)
215 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
216 req *http.Request, clusterId *string) bool {
218 var filters [][]interface{}
219 err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
221 httpserver.Error(w, err.Error(), http.StatusBadRequest)
225 // Split the list of uuids by prefix
226 queryClusters := make(map[string][]string)
228 for _, filter := range filters {
229 if len(filter) != 3 {
233 if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
237 op, ok := filter[1].(string)
243 if rhs, ok := filter[2].([]interface{}); ok {
244 for _, i := range rhs {
245 if u, ok := i.(string); ok {
247 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
252 } else if op == "=" {
253 if u, ok := filter[2].(string); ok {
255 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
264 if len(queryClusters) <= 1 {
265 // Query does not search for uuids across multiple
271 count := req.Form.Get("count")
272 if count != "" && count != `none` && count != `"none"` {
273 httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
276 if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
277 httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
280 if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
281 httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
282 expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
285 if req.Form.Get("select") != "" {
288 err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
290 httpserver.Error(w, err.Error(), http.StatusBadRequest)
294 for _, r := range selects {
301 httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
306 // Perform concurrent requests to each cluster
308 // use channel as a semaphore to limit the number of concurrent
309 // requests at a time
310 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
312 wg := sync.WaitGroup{}
314 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
317 var completeResponses []map[string]interface{}
320 for k, v := range queryClusters {
326 // blocks until it can put a value into the
327 // channel (which has a max queue capacity)
330 go func(k string, v []string) {
331 rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
334 completeResponses = append(completeResponses, rp...)
337 errors = append(errors, err)
348 for _, e := range errors {
349 strerr = append(strerr, e.Error())
351 httpserver.Errors(w, strerr, http.StatusBadGateway)
355 w.Header().Set("Content-Type", "application/json")
356 w.WriteHeader(http.StatusOK)
357 itemList := make(map[string]interface{})
358 itemList["items"] = completeResponses
359 itemList["kind"] = kind
360 json.NewEncoder(w).Encode(itemList)
365 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
366 m := h.matcher.FindStringSubmatch(req.URL.Path)
369 if len(m) > 0 && m[2] != "" {
373 // Get form parameters from URL and form body (if POST).
374 if err := loadParamsFromForm(req); err != nil {
375 httpserver.Error(w, err.Error(), http.StatusBadRequest)
379 // Check if the parameters have an explicit cluster_id
380 if req.Form.Get("cluster_id") != "" {
381 clusterId = req.Form.Get("cluster_id")
384 // Handle the POST-as-GET special case (workaround for large
385 // GET requests that potentially exceed maximum URL length,
386 // like multi-object queries where the filter has 100s of
388 effectiveMethod := req.Method
389 if req.Method == "POST" && req.Form.Get("_method") != "" {
390 effectiveMethod = req.Form.Get("_method")
393 if effectiveMethod == "GET" &&
395 req.Form.Get("filters") != "" &&
396 h.handleMultiClusterQuery(w, req, &clusterId) {
400 if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
401 h.next.ServeHTTP(w, req)
403 resp, err := h.handler.remoteClusterRequest(clusterId, req)
404 h.handler.proxy.ForwardResponse(w, resp, err)
408 func rewriteSignatures(clusterID string, expectHash string,
409 resp *http.Response, requestError error) (newResponse *http.Response, err error) {
411 if requestError != nil {
412 return resp, requestError
415 if resp.StatusCode != 200 {
419 originalBody := resp.Body
420 defer originalBody.Close()
422 var col arvados.Collection
423 err = json.NewDecoder(resp.Body).Decode(&col)
428 // rewriting signatures will make manifest text 5-10% bigger so calculate
429 // capacity accordingly
430 updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
433 mw := io.MultiWriter(hasher, updatedManifest)
436 scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
437 scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
439 line := scanner.Text()
440 tokens := strings.Split(line, " ")
442 return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
445 n, err := mw.Write([]byte(tokens[0]))
447 return nil, fmt.Errorf("Error updating manifest: %v", err)
450 for _, token := range tokens[1:] {
451 n, err = mw.Write([]byte(" "))
453 return nil, fmt.Errorf("Error updating manifest: %v", err)
457 m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
459 // Rewrite the block signature to be a remote signature
460 _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
462 return nil, fmt.Errorf("Error updating manifest: %v", err)
465 // for hash checking, ignore signatures
466 n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
468 return nil, fmt.Errorf("Error updating manifest: %v", err)
472 n, err = mw.Write([]byte(token))
474 return nil, fmt.Errorf("Error updating manifest: %v", err)
479 n, err = mw.Write([]byte("\n"))
481 return nil, fmt.Errorf("Error updating manifest: %v", err)
486 // Check that expected hash is consistent with
487 // portable_data_hash field of the returned record
488 if expectHash == "" {
489 expectHash = col.PortableDataHash
490 } else if expectHash != col.PortableDataHash {
491 return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
494 // Certify that the computed hash of the manifest_text matches our expectation
495 sum := hasher.Sum(nil)
496 computedHash := fmt.Sprintf("%x+%v", sum, sz)
497 if computedHash != expectHash {
498 return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
501 col.ManifestText = updatedManifest.String()
503 newbody, err := json.Marshal(col)
508 buf := bytes.NewBuffer(newbody)
509 resp.Body = ioutil.NopCloser(buf)
510 resp.ContentLength = int64(buf.Len())
511 resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
516 func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
517 if requestError != nil {
518 return resp, requestError
521 if resp.StatusCode == 404 {
522 // Suppress returning this result, because we want to
523 // search the federation.
529 type searchRemoteClusterForPDH struct {
534 sharedContext *context.Context
540 func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
545 // Another request already returned a response
549 if requestError != nil {
550 *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
551 // Record the error and suppress response
555 if resp.StatusCode != 200 {
556 // Suppress returning unsuccessful result. Maybe
557 // another request will find it.
558 // TODO collect and return error responses.
559 *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
560 if resp.StatusCode != 404 {
561 // Got a non-404 error response, convert into BadGateway
562 *s.statusCode = http.StatusBadGateway
569 // This reads the response body. We don't want to hold the
570 // lock while doing this because other remote requests could
571 // also have made it to this point, and we don't want a
572 // slow response holding the lock to block a faster response
573 // that is waiting on the lock.
574 newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
579 // Another request already returned a response
584 // Suppress returning unsuccessful result. Maybe
585 // another request will be successful.
586 *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
590 // We have a successful response. Suppress/cancel all the
591 // other requests/responses.
592 *s.sentResponse = true
595 return newResponse, nil
598 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
599 if req.Method != "GET" {
600 // Only handle GET requests right now
601 h.next.ServeHTTP(w, req)
605 m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
607 // Not a collection PDH GET request
608 m = collectionRe.FindStringSubmatch(req.URL.Path)
615 if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
616 // request for remote collection by uuid
617 resp, err := h.handler.remoteClusterRequest(clusterId, req)
618 newResponse, err := rewriteSignatures(clusterId, "", resp, err)
619 h.handler.proxy.ForwardResponse(w, newResponse, err)
622 // not a collection UUID request, or it is a request
623 // for a local UUID, either way, continue down the
625 h.next.ServeHTTP(w, req)
629 // Request for collection by PDH. Search the federation.
631 // First, query the local cluster.
632 resp, err := h.handler.localClusterRequest(req)
633 newResp, err := filterLocalClusterResponse(resp, err)
634 if newResp != nil || err != nil {
635 h.handler.proxy.ForwardResponse(w, newResp, err)
639 sharedContext, cancelFunc := context.WithCancel(req.Context())
641 req = req.WithContext(sharedContext)
643 // Create a goroutine for each cluster in the
644 // RemoteClusters map. The first valid result gets
645 // returned to the client. When that happens, all
646 // other outstanding requests are cancelled or
648 sentResponse := false
650 wg := sync.WaitGroup{}
652 var errorCode int = 404
654 // use channel as a semaphore to limit the number of concurrent
655 // requests at a time
656 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
658 for remoteID := range h.handler.Cluster.RemoteClusters {
659 if remoteID == h.handler.Cluster.ClusterID {
660 // No need to query local cluster again
663 // blocks until it can put a value into the
664 // channel (which has a max queue capacity)
669 search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
670 &sharedContext, cancelFunc, &errors, &errorCode}
673 resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
674 newResp, err := search.filterRemoteClusterResponse(resp, err)
675 if newResp != nil || err != nil {
676 h.handler.proxy.ForwardResponse(w, newResp, err)
688 // No successful responses, so return the error
689 httpserver.Errors(w, errors, errorCode)
692 func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
693 mux := http.NewServeMux()
694 mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
695 mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
696 mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
697 mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
698 mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
699 mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
700 mux.Handle("/arvados/v1/collections", next)
701 mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
702 mux.Handle("/", next)
704 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
705 parts := strings.Split(req.Header.Get("Authorization"), "/")
706 alreadySalted := (len(parts) == 3 && parts[0] == "Bearer v2" && len(parts[2]) == 40)
709 strings.Index(req.Header.Get("Via"), "arvados-controller") != -1 {
710 // The token is already salted, or this is a
711 // request from another instance of
712 // arvados-controller. In either case, we
713 // don't want to proxy this query, so just
714 // continue down the instance handler stack.
715 next.ServeHTTP(w, req)
719 mux.ServeHTTP(w, req)
725 type CurrentUser struct {
726 Authorization arvados.APIClientAuthorization
730 func (h *Handler) validateAPItoken(req *http.Request, user *CurrentUser) error {
735 return db.QueryRowContext(req.Context(), `SELECT api_client_authorizations.uuid, users.uuid FROM api_client_authorizations JOIN users on api_client_authorizations.user_id=users.id WHERE api_token=$1 AND (expires_at IS NULL OR expires_at > current_timestamp) LIMIT 1`, user.Authorization.APIToken).Scan(&user.Authorization.UUID, &user.UUID)
738 // Extract the auth token supplied in req, and replace it with a
739 // salted token for the remote cluster.
740 func (h *Handler) saltAuthToken(req *http.Request, remote string) (updatedReq *http.Request, err error) {
741 updatedReq = (&http.Request{
746 ContentLength: req.ContentLength,
748 }).WithContext(req.Context())
750 creds := auth.NewCredentials()
751 creds.LoadTokensFromHTTPRequest(updatedReq)
752 if len(creds.Tokens) == 0 && updatedReq.Header.Get("Content-Type") == "application/x-www-form-encoded" {
753 // Override ParseForm's 10MiB limit by ensuring
754 // req.Body is a *http.maxBytesReader.
755 updatedReq.Body = http.MaxBytesReader(nil, updatedReq.Body, 1<<28) // 256MiB. TODO: use MaxRequestSize from discovery doc or config.
756 if err := creds.LoadTokensFromHTTPRequestBody(updatedReq); err != nil {
759 // Replace req.Body with a buffer that re-encodes the
760 // form without api_token, in case we end up
761 // forwarding the request.
762 if updatedReq.PostForm != nil {
763 updatedReq.PostForm.Del("api_token")
765 updatedReq.Body = ioutil.NopCloser(bytes.NewBufferString(updatedReq.PostForm.Encode()))
767 if len(creds.Tokens) == 0 {
768 return updatedReq, nil
771 token, err := auth.SaltToken(creds.Tokens[0], remote)
773 if err == auth.ErrObsoleteToken {
774 // If the token exists in our own database, salt it
775 // for the remote. Otherwise, assume it was issued by
776 // the remote, and pass it through unmodified.
777 currentUser := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: creds.Tokens[0]}}
778 err = h.validateAPItoken(req, ¤tUser)
779 if err == sql.ErrNoRows {
780 // Not ours; pass through unmodified.
781 token = currentUser.Authorization.APIToken
782 } else if err != nil {
785 // Found; make V2 version and salt it.
786 token, err = auth.SaltToken(currentUser.Authorization.TokenV2(), remote)
791 } else if err != nil {
794 updatedReq.Header = http.Header{}
795 for k, v := range req.Header {
796 if k != "Authorization" {
797 updatedReq.Header[k] = v
800 updatedReq.Header.Set("Authorization", "Bearer "+token)
802 // Remove api_token=... from the the query string, in case we
803 // end up forwarding the request.
804 if values, err := url.ParseQuery(updatedReq.URL.RawQuery); err != nil {
806 } else if _, ok := values["api_token"]; ok {
807 delete(values, "api_token")
808 updatedReq.URL = &url.URL{
809 Scheme: req.URL.Scheme,
812 RawPath: req.URL.RawPath,
813 RawQuery: values.Encode(),
816 return updatedReq, nil