1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "git.curoverse.com/arvados.git/sdk/go/httpserver"
22 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// collectionFederatedRequestHandler serves collection GET requests that may
// have to be answered by another cluster in the federation; see its
// ServeHTTP method. Requests it does not handle are passed to h.next.
25 type collectionFederatedRequestHandler struct {
// rewriteSignatures decodes the body of resp as an arvados.Collection,
// rewrites every signed block locator in its manifest text into a remote
// signature tagged with clusterID ("+R<clusterID>-..."), checks the
// manifest against the expected portable data hash, and installs the
// re-encoded collection as the new resp.Body (updating ContentLength and
// the Content-Length header to match).
//
// clusterID is embedded in each rewritten locator. expectHash is the
// portable data hash the caller asked for; when it is empty, the
// PortableDataHash of the decoded record is used instead. A non-nil
// requestError is handed straight back together with resp.
//
// Errors are reported for a manifest line flagged as having fewer than
// three tokens, for any write failure while building the new manifest,
// for a record whose PortableDataHash differs from a non-empty
// expectHash, and for a computed manifest hash that does not match.
30 func rewriteSignatures(clusterID string, expectHash string,
31 resp *http.Response, requestError error) (newResponse *http.Response, err error) {
33 if requestError != nil {
34 return resp, requestError
// NOTE(review): presumably non-OK responses are handed back untouched so
// that only 200 OK bodies reach the decoding code below — confirm.
37 if resp.StatusCode != http.StatusOK {
// Keep a handle on the original body so that it is still closed on return
// even though resp.Body is replaced further down.
41 originalBody := resp.Body
42 defer originalBody.Close()
44 var col arvados.Collection
45 err = json.NewDecoder(resp.Body).Decode(&col)
50 // rewriting signatures will make manifest text 5-10% bigger so calculate
51 // capacity accordingly
52 updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
// Anything written to mw goes to both the hasher and the rewritten
// manifest; the two are written separately only where they must differ.
55 mw := io.MultiWriter(hasher, updatedManifest)
// Start with a 1 MiB scan buffer and allow a single line to be as long as
// the whole manifest, instead of bufio.Scanner's default token limit.
58 scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
59 scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
61 line := scanner.Text()
62 tokens := strings.Split(line, " ")
64 return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
// The first token of the line is copied through unchanged.
67 n, err := mw.Write([]byte(tokens[0]))
69 return nil, fmt.Errorf("Error updating manifest: %v", err)
72 for _, token := range tokens[1:] {
73 n, err = mw.Write([]byte(" "))
75 return nil, fmt.Errorf("Error updating manifest: %v", err)
// FindStringSubmatch returns the capture groups when token is a signed
// locator, and nil otherwise.
79 m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
81 // Rewrite the block signature to be a remote signature
// This goes to updatedManifest only, not to the hasher.
// NOTE(review): presumably m[5] is the signature hint and [2:] drops its
// two-character prefix — confirm against keepclient.SignedLocatorRe.
82 _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
84 return nil, fmt.Errorf("Error updating manifest: %v", err)
87 // for hash checking, ignore signatures
// Only capture groups 1 and 2 of the locator are fed to the hasher.
88 n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
90 return nil, fmt.Errorf("Error updating manifest: %v", err)
// Copy the token verbatim to both the hasher and the new manifest.
94 n, err = mw.Write([]byte(token))
96 return nil, fmt.Errorf("Error updating manifest: %v", err)
// Scanner strips the line terminator, so put it back for both outputs.
101 n, err = mw.Write([]byte("\n"))
103 return nil, fmt.Errorf("Error updating manifest: %v", err)
// NOTE(review): no scanner.Err() check is visible after the scan loop; a
// scan failure would look like a short manifest and surface only as a
// hash mismatch below — confirm.
108 // Check that expected hash is consistent with
109 // portable_data_hash field of the returned record
110 if expectHash == "" {
111 expectHash = col.PortableDataHash
112 } else if expectHash != col.PortableDataHash {
// NOTE(review): the arguments look swapped relative to the message text:
// the first %q is described as the hash on the returned record but is
// given expectHash, and the second is described as the expected hash but
// is given col.PortableDataHash. The message also ends in a stray space.
113 return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
116 // Certify that the computed hash of the manifest_text matches our expectation
117 sum := hasher.Sum(nil)
// Compared as "<hex digest>+<sz>".
// NOTE(review): presumably sz is the running count of bytes fed to the
// hasher — confirm.
118 computedHash := fmt.Sprintf("%x+%v", sum, sz)
119 if computedHash != expectHash {
120 return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
123 col.ManifestText = updatedManifest.String()
125 newbody, err := json.Marshal(col)
// Swap in the re-encoded collection and keep both length indicators in
// step with the new body size.
130 buf := bytes.NewBuffer(newbody)
131 resp.Body = ioutil.NopCloser(buf)
132 resp.ContentLength = int64(buf.Len())
133 resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
// filterLocalClusterResponse inspects the local cluster's answer to a
// collection-by-PDH request. A non-nil requestError is passed back
// together with resp. A 404 Not Found from the local cluster is
// suppressed so that the caller goes on to search the federation; the
// caller treats a nil response with a nil error as "keep looking".
138 func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
139 if requestError != nil {
140 return resp, requestError
143 if resp.StatusCode == http.StatusNotFound {
144 // Suppress returning this result, because we want to
145 // search the federation.
// searchRemoteClusterForPDH carries the state for querying one remote
// cluster during a federated search by portable data hash. ServeHTTP
// builds one value per remote cluster and gives each of them pointers to
// the same mutex, sent-response flag, context, error list and status
// code.
151 type searchRemoteClusterForPDH struct {
// sharedContext points at the cancellable context that ServeHTTP creates
// for the whole search.
156 sharedContext *context.Context
// filterRemoteClusterResponse processes the answer from one remote
// cluster (s.remoteID) during a federated search for s.pdh.
//
// Request errors and non-200 responses are recorded in *s.errors and
// suppressed; a non-200 status other than 404 also sets *s.statusCode to
// 502 Bad Gateway. A 200 response is run through rewriteSignatures with
// s.remoteID and s.pdh. If that fails, the error is recorded and the
// response suppressed. Otherwise *s.sentResponse is set and the rewritten
// response is returned with a nil error.
//
// Per the comments in the body, the response body is read without holding
// the lock, and the "already sent" condition is handled both before and
// after that read.
162 func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
167 // Another request already returned a response
171 if requestError != nil {
172 *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
173 // Record the error and suppress response
177 if resp.StatusCode != http.StatusOK {
178 // Suppress returning unsuccessful result. Maybe
179 // another request will find it.
180 *s.errors = append(*s.errors, fmt.Sprintf("Response to %q from %q: %v", resp.Header.Get(httpserver.HeaderRequestID), s.remoteID, resp.Status))
181 if resp.StatusCode != http.StatusNotFound {
182 // Got a non-404 error response, convert into BadGateway
183 *s.statusCode = http.StatusBadGateway
190 // This reads the response body. We don't want to hold the
191 // lock while doing this because other remote requests could
192 // also have made it to this point, and we don't want a
193 // slow response holding the lock to block a faster response
194 // that is waiting on the lock.
// requestError is known to be nil here, so nil is passed explicitly.
195 newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
200 // Another request already returned a response
205 // Suppress returning unsuccessful result. Maybe
206 // another request will be successful.
207 *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
211 // We have a successful response. Suppress/cancel all the
212 // other requests/responses.
213 *s.sentResponse = true
216 return newResponse, nil
// ServeHTTP handles GET requests for collections across the federation.
//
//   - Non-GET requests go straight to h.next.
//   - A request matched by collectionRe whose cluster ID is non-empty and
//     differs from the local ClusterID is sent to that remote cluster, and
//     the reply is passed through rewriteSignatures before being
//     forwarded.
//   - A request matched by collectionByPDHRe is first tried on the local
//     cluster. If filterLocalClusterResponse suppresses that result, every
//     entry in RemoteClusters other than the local cluster is queried, with
//     a channel used as a semaphore to cap the number of concurrent
//     requests. Each reply goes through filterRemoteClusterResponse, and
//     anything it returns is forwarded to the client.
//   - If no remote cluster produces a successful response, the collected
//     error strings are sent with errorCode as the status.
219 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
220 if req.Method != "GET" {
221 // Only handle GET requests right now
222 h.next.ServeHTTP(w, req)
// Try the by-PDH pattern first, then fall back to the by-UUID pattern.
226 m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
228 // Not a collection PDH GET request
229 m = collectionRe.FindStringSubmatch(req.URL.Path)
// NOTE(review): presumably clusterId is the cluster prefix taken from the
// requested UUID — confirm.
236 if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
237 // request for remote collection by uuid
238 resp, cancel, err := h.handler.remoteClusterRequest(clusterId, req)
// An empty expectHash makes rewriteSignatures verify the manifest against
// the portable_data_hash of the returned record itself.
242 newResponse, err := rewriteSignatures(clusterId, "", resp, err)
243 h.handler.proxy.ForwardResponse(w, newResponse, err)
246 // not a collection UUID request, or it is a request
247 // for a local UUID, either way, continue down the
249 h.next.ServeHTTP(w, req)
253 // Request for collection by PDH. Search the federation.
255 // First, query the local cluster.
256 resp, localClusterRequestCancel, err := h.handler.localClusterRequest(req)
257 if localClusterRequestCancel != nil {
258 defer localClusterRequestCancel()
// Anything other than a suppressed result (nil response and nil error) is
// the final answer for this request.
260 newResp, err := filterLocalClusterResponse(resp, err)
261 if newResp != nil || err != nil {
262 h.handler.proxy.ForwardResponse(w, newResp, err)
// All remote requests share one cancellable context derived from the
// incoming request.
266 sharedContext, cancelFunc := context.WithCancel(req.Context())
268 req = req.WithContext(sharedContext)
270 // Create a goroutine for each cluster in the
271 // RemoteClusters map. The first valid result gets
272 // returned to the client. When that happens, all
273 // other outstanding requests are cancelled or
// Shared by pointer with every search; set by filterRemoteClusterResponse
// once a successful response has been produced.
275 sentResponse := false
277 wg := sync.WaitGroup{}
// Status used if the search fails; filterRemoteClusterResponse changes it
// to 502 when a remote cluster returns a non-404 error status.
279 var errorCode int = http.StatusNotFound
281 // use channel as a semaphore to limit the number of concurrent
282 // requests at a time
283 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
285 for remoteID := range h.handler.Cluster.RemoteClusters {
286 if remoteID == h.handler.Cluster.ClusterID {
287 // No need to query local cluster again
290 // blocks until it can put a value into the
291 // channel (which has a max queue capacity)
// Unkeyed composite literal: the argument order must match the field
// order of the searchRemoteClusterForPDH declaration.
296 search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
297 &sharedContext, cancelFunc, &errors, &errorCode}
300 resp, cancel, err := h.handler.remoteClusterRequest(search.remoteID, req)
// Forward only when the filter returned a response or an error; a
// suppressed result (both nil) is skipped.
304 newResp, err := search.filterRemoteClusterResponse(resp, err)
305 if newResp != nil || err != nil {
306 h.handler.proxy.ForwardResponse(w, newResp, err)
318 // No successful responses, so return the error
319 httpserver.Errors(w, errors, errorCode)