1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "git.curoverse.com/arvados.git/sdk/go/httpserver"
22 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// collectionFederatedRequestHandler is the receiver type of the ServeHTTP
// method in this file, which routes collection GET requests either to the
// next handler, to a single remote cluster, or to a search of the
// federation. The struct's fields are on lines not shown here; ServeHTTP
// reads h.next and h.handler.
25 type collectionFederatedRequestHandler struct {
// rewriteSignatures post-processes a collection response received from the
// cluster identified by clusterID.
//
// It decodes the response body as an arvados.Collection, walks the
// manifest text line by line, rewrites signed block locators into the
// remote-signature form ("+R<clusterID>-..."), and feeds a signature-free
// copy of the manifest to a hasher. The resulting "<hex digest>+<size>"
// value must equal expectHash; when expectHash is empty, the record's own
// portable_data_hash is used as the expectation. On success the response
// body, ContentLength and Content-Length header are replaced with the
// re-marshaled collection.
//
// A non-nil requestError is returned unchanged together with resp. Every
// validation or write failure visible here returns a nil response and an
// error.
//
// NOTE(review): several statements of this function (error guards, the
// scan loop header, the declarations of hasher and sz, and the final
// return) are on lines not shown in this excerpt.
30 func rewriteSignatures(clusterID string, expectHash string,
31 resp *http.Response, requestError error) (newResponse *http.Response, err error) {
// Pass transport-level failures straight through to the caller.
33 if requestError != nil {
34 return resp, requestError
// NOTE(review): the body of this non-200 branch is not visible here.
37 if resp.StatusCode != 200 {
// Hold on to the original body so it is still closed on return even
// though resp.Body is reassigned further down.
41 originalBody := resp.Body
42 defer originalBody.Close()
44 var col arvados.Collection
45 err = json.NewDecoder(resp.Body).Decode(&col)
50 // rewriting signatures will make manifest text 5-10% bigger so calculate
51 // capacity accordingly
52 updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
// Anything written to mw goes to both the hasher and the rewritten
// manifest; writes that must differ between the two bypass mw.
55 mw := io.MultiWriter(hasher, updatedManifest)
// Start with a 1 MiB scan buffer and allow a single line to be as long
// as the whole manifest.
58 scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
59 scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
61 line := scanner.Text()
62 tokens := strings.Split(line, " ")
64 return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
// The first token of the line is copied unchanged to both outputs.
67 n, err := mw.Write([]byte(tokens[0]))
69 return nil, fmt.Errorf("Error updating manifest: %v", err)
72 for _, token := range tokens[1:] {
73 n, err = mw.Write([]byte(" "))
75 return nil, fmt.Errorf("Error updating manifest: %v", err)
// NOTE(review): the meaning of the capture groups used below depends on
// keepclient.SignedLocatorRe, which is defined outside this file; the
// [2:] slice drops the first two characters of group 5 -- confirm that
// is the intended prefix.
79 m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
81 // Rewrite the block signature to be a remote signature
82 _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
84 return nil, fmt.Errorf("Error updating manifest: %v", err)
87 // for hash checking, ignore signatures
88 n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
90 return nil, fmt.Errorf("Error updating manifest: %v", err)
// Pass-through write of the token to both outputs. The condition that
// selects between this and the rewrite above is on a line not shown.
94 n, err = mw.Write([]byte(token))
96 return nil, fmt.Errorf("Error updating manifest: %v", err)
101 n, err = mw.Write([]byte("\n"))
103 return nil, fmt.Errorf("Error updating manifest: %v", err)
108 // Check that expected hash is consistent with
109 // portable_data_hash field of the returned record
110 if expectHash == "" {
111 expectHash = col.PortableDataHash
112 } else if expectHash != col.PortableDataHash {
// NOTE(review): the arguments look swapped relative to the message
// text -- the first %q is labelled as the record's portable_data_hash
// but receives expectHash, and the second is labelled as the expected
// hash but receives col.PortableDataHash. The message also ends with
// a stray space.
113 return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
116 // Certify that the computed hash of the manifest_text matches our expectation
117 sum := hasher.Sum(nil)
// sz is declared and updated on lines not shown here; presumably it is
// the number of bytes fed to the hasher -- confirm.
118 computedHash := fmt.Sprintf("%x+%v", sum, sz)
119 if computedHash != expectHash {
120 return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
123 col.ManifestText = updatedManifest.String()
125 newbody, err := json.Marshal(col)
// Swap in the rewritten body and keep both length indicators in sync
// with it.
130 buf := bytes.NewBuffer(newbody)
131 resp.Body = ioutil.NopCloser(buf)
132 resp.ContentLength = int64(buf.Len())
133 resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
// filterLocalClusterResponse inspects the result of the local cluster
// request made by ServeHTTP for a collection-by-PDH lookup.
//
// A non-nil requestError is returned unchanged together with resp. A 404
// response is treated specially so that the federation gets searched
// instead.
//
// NOTE(review): the return statements for the 404 case and for all other
// status codes are on lines not shown here. ServeHTTP forwards the result
// whenever the returned response or error is non-nil, so suppression
// presumably means returning (nil, nil) -- confirm.
138 func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
139 if requestError != nil {
140 return resp, requestError
143 if resp.StatusCode == 404 {
144 // Suppress returning this result, because we want to
145 // search the federation.
// searchRemoteClusterForPDH carries the state for querying one remote
// cluster during a collection-by-PDH search. ServeHTTP builds one value per
// remote cluster and hands each of them pointers to the same shared
// variables.
//
// Only sharedContext is visible in this excerpt. The method
// filterRemoteClusterResponse additionally reads or writes s.remoteID,
// s.pdh, s.errors, s.statusCode and s.sentResponse.
151 type searchRemoteClusterForPDH struct {
// NOTE(review): context.Context is an interface type, so a pointer to it
// is unusual -- confirm whether the indirection is needed.
156 sharedContext *context.Context
// filterRemoteClusterResponse decides whether the response from the remote
// cluster s.remoteID should be returned to the client.
//
// Request errors and non-200 responses are recorded in *s.errors rather
// than returned; a non-200 status other than 404 also sets *s.statusCode
// to http.StatusBadGateway. A 200 response is passed to rewriteSignatures
// with s.pdh as the expected hash; if that fails, the failure is recorded
// in *s.errors. On success *s.sentResponse is set to true and the
// rewritten response is returned with a nil error.
//
// NOTE(review): the lock/unlock calls, the checks guarding the "another
// request already returned" paths, and the return statements of the
// suppressing branches are on lines not shown here.
162 func (s *searchRemoteClusterForPDH) filterRemoteClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
167 // Another request already returned a response
171 if requestError != nil {
172 *s.errors = append(*s.errors, fmt.Sprintf("Request error contacting %q: %v", s.remoteID, requestError))
173 // Record the error and suppress response
177 if resp.StatusCode != 200 {
178 // Suppress returning unsuccessful result. Maybe
179 // another request will find it.
180 // TODO collect and return error responses.
181 *s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
182 if resp.StatusCode != 404 {
183 // Got a non-404 error response, convert into BadGateway
184 *s.statusCode = http.StatusBadGateway
191 // This reads the response body. We don't want to hold the
192 // lock while doing this because other remote requests could
193 // also have made it to this point, and we don't want a
194 // slow response holding the lock to block a faster response
195 // that is waiting on the lock.
// requestError has already been handled above, so nil is passed for it.
196 newResponse, err = rewriteSignatures(s.remoteID, s.pdh, resp, nil)
201 // Another request already returned a response
206 // Suppress returning unsuccessful result. Maybe
207 // another request will be successful.
208 *s.errors = append(*s.errors, fmt.Sprintf("Error parsing response from %q: %v", s.remoteID, err))
212 // We have a successful response. Suppress/cancel all the
213 // other requests/responses.
214 *s.sentResponse = true
217 return newResponse, nil
// ServeHTTP handles collection requests for the federation.
//
// Requests other than GET go to h.next. A path that does not match
// collectionByPDHRe is tried against collectionRe: if it names a cluster
// other than the local one, the request is sent to that cluster and the
// response is passed through rewriteSignatures (with no expected hash)
// before being forwarded; otherwise the request goes to h.next.
//
// For a request by PDH, the local cluster is queried first. If
// filterLocalClusterResponse yields a response or an error, that is
// forwarded. Otherwise each entry in Cluster.RemoteClusters, other than
// the local cluster, is queried using a shared cancellable context, and
// the result of filterRemoteClusterResponse is forwarded when it yields a
// response or an error. If nothing was sent, the collected error messages
// are returned with errorCode, which starts at 404.
//
// NOTE(review): the early returns, the derivation of clusterId, the
// declarations of mtx and errors, the goroutine launch, the semaphore
// send/receive, and the wait/cancel calls are on lines not shown here.
220 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
221 if req.Method != "GET" {
222 // Only handle GET requests right now
223 h.next.ServeHTTP(w, req)
227 m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
229 // Not a collection PDH GET request
230 m = collectionRe.FindStringSubmatch(req.URL.Path)
237 if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
238 // request for remote collection by uuid
239 resp, err := h.handler.remoteClusterRequest(clusterId, req)
// An empty expected hash makes rewriteSignatures validate against the
// portable_data_hash of the returned record itself.
240 newResponse, err := rewriteSignatures(clusterId, "", resp, err)
241 h.handler.proxy.ForwardResponse(w, newResponse, err)
244 // not a collection UUID request, or it is a request
245 // for a local UUID, either way, continue down the
247 h.next.ServeHTTP(w, req)
251 // Request for collection by PDH. Search the federation.
253 // First, query the local cluster.
254 resp, err := h.handler.localClusterRequest(req)
255 newResp, err := filterLocalClusterResponse(resp, err)
256 if newResp != nil || err != nil {
257 h.handler.proxy.ForwardResponse(w, newResp, err)
// All remote requests share one cancellable context derived from the
// incoming request's context.
261 sharedContext, cancelFunc := context.WithCancel(req.Context())
263 req = req.WithContext(sharedContext)
265 // Create a goroutine for each cluster in the
266 // RemoteClusters map. The first valid result gets
267 // returned to the client. When that happens, all
268 // other outstanding requests are cancelled or
// The variables below are shared by every remote search through the
// pointers placed in each searchRemoteClusterForPDH value.
270 sentResponse := false
272 wg := sync.WaitGroup{}
// Status used for the final error response if no remote cluster
// succeeds.
274 var errorCode int = 404
276 // use channel as a semaphore to limit the number of concurrent
277 // requests at a time
278 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
280 for remoteID := range h.handler.Cluster.RemoteClusters {
281 if remoteID == h.handler.Cluster.ClusterID {
282 // No need to query local cluster again
285 // blocks until it can put a value into the
286 // channel (which has a max queue capacity)
// Positional struct literal: the values must stay in the same order as
// the fields of searchRemoteClusterForPDH. m[1] is the first capture
// group of the PDH path match (presumably the PDH itself -- confirm).
291 search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
292 &sharedContext, cancelFunc, &errors, &errorCode}
295 resp, err := h.handler.remoteClusterRequest(search.remoteID, req)
296 newResp, err := search.filterRemoteClusterResponse(resp, err)
297 if newResp != nil || err != nil {
298 h.handler.proxy.ForwardResponse(w, newResp, err)
310 // No successful responses, so return the error
311 httpserver.Errors(w, errors, errorCode)