1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "git.curoverse.com/arvados.git/sdk/go/httpserver"
22 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// rewriteSignatures decodes the JSON collection record carried in resp,
// rewrites every signed block locator in its manifest text into a remote
// ("+R<clusterID>-...") signature, and installs the re-encoded record as
// the new response body.
//
// While rewriting, it also feeds the manifest with signatures removed
// into a hasher, and checks the resulting "<hex digest>+<size>" value
// against the expected hash: expectHash if the caller supplied one
// (it must then also equal the record's portable_data_hash), otherwise
// the record's own portable_data_hash.
//
// A non-nil requestError is returned as-is together with resp. The
// response status is compared against http.StatusOK before the body is
// decoded.
25 func rewriteSignatures(clusterID string, expectHash string,
26 resp *http.Response, requestError error) (newResponse *http.Response, err error) {
// The upstream request itself failed: hand the error straight back.
28 if requestError != nil {
29 return resp, requestError
32 if resp.StatusCode != http.StatusOK {
// The original body is closed when this function returns; a replacement
// body is attached to resp at the end of the function.
36 originalBody := resp.Body
37 defer originalBody.Close()
39 var col arvados.Collection
40 err = json.NewDecoder(resp.Body).Decode(&col)
45 // rewriting signatures will make manifest text 5-10% bigger so calculate
46 // capacity accordingly
47 updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
// Anything written through mw goes to both the hasher and the rewritten
// manifest. Signed locators bypass mw (see below) because the two
// destinations need different text for them.
50 mw := io.MultiWriter(hasher, updatedManifest)
// Scan the manifest one line (stream) at a time. The scanner starts with
// a 1 MiB buffer and its maximum token size is raised to the manifest
// length, so a single line may be as long as the whole manifest.
53 scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
54 scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
56 line := scanner.Text()
57 tokens := strings.Split(line, " ")
59 return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
// The first token of the line is copied verbatim to both destinations.
62 n, err := mw.Write([]byte(tokens[0]))
64 return nil, fmt.Errorf("Error updating manifest: %v", err)
67 for _, token := range tokens[1:] {
// Re-insert the separator that strings.Split removed.
68 n, err = mw.Write([]byte(" "))
70 return nil, fmt.Errorf("Error updating manifest: %v", err)
// m holds the capture groups when token is a signed block locator.
74 m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
76 // Rewrite the block signature to be a remote signature
// Written to updatedManifest only, not to the hasher.
// NOTE(review): m[5][2:] drops the first two characters of capture
// group 5 — presumably the local signature-hint prefix; confirm against
// the definition of keepclient.SignedLocatorRe.
77 _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
79 return nil, fmt.Errorf("Error updating manifest: %v", err)
82 // for hash checking, ignore signatures
// Only capture groups 1 and 2 of the locator are hashed.
83 n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
85 return nil, fmt.Errorf("Error updating manifest: %v", err)
// Not a signed locator: copy the token unchanged to both destinations.
89 n, err = mw.Write([]byte(token))
91 return nil, fmt.Errorf("Error updating manifest: %v", err)
// scanner.Text() strips the line terminator, so put it back.
96 n, err = mw.Write([]byte("\n"))
98 return nil, fmt.Errorf("Error updating manifest: %v", err)
103 // Check that expected hash is consistent with
104 // portable_data_hash field of the returned record
105 if expectHash == "" {
106 expectHash = col.PortableDataHash
107 } else if expectHash != col.PortableDataHash {
// NOTE(review): the format arguments look swapped relative to the
// message text (the first %q is labelled as the record's
// portable_data_hash but receives expectHash), and the message ends
// with a stray space — confirm and fix.
108 return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", expectHash, col.PortableDataHash)
111 // Certify that the computed hash of the manifest_text matches our expectation
// sz is declared outside the lines visible here; presumably it is the
// number of bytes fed to the hasher — TODO confirm.
112 sum := hasher.Sum(nil)
113 computedHash := fmt.Sprintf("%x+%v", sum, sz)
114 if computedHash != expectHash {
115 return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
// Swap the rewritten manifest into the record and re-encode it.
118 col.ManifestText = updatedManifest.String()
120 newbody, err := json.Marshal(col)
// Replace the response body and keep both Content-Length
// representations in sync with the new payload size.
125 buf := bytes.NewBuffer(newbody)
126 resp.Body = ioutil.NopCloser(buf)
127 resp.ContentLength = int64(buf.Len())
128 resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
// filterLocalClusterResponse post-processes the local cluster's answer
// to a request. A non-nil requestError is returned unchanged together
// with resp. A 404 Not Found response is suppressed so that the caller
// goes on to search the federation.
//
// NOTE(review): the return statements for the 404 and default paths are
// not visible here; presumably the 404 path returns (nil, nil) — confirm.
133 func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
134 if requestError != nil {
135 return resp, requestError
138 if resp.StatusCode == http.StatusNotFound {
139 // Suppress returning this result, because we want to
140 // search the federation.
// searchRemoteClusterForPDH groups state for a search of remote clusters
// by portable data hash. Only the sharedContext field is visible here.
146 type searchRemoteClusterForPDH struct {
// NOTE(review): context.Context is an interface and is normally passed
// and stored by value; a pointer to it is unusual — confirm it is needed.
151 sharedContext *context.Context
// fetchRemoteCollectionByUUID handles a GET of a collection by UUID when
// the UUID belongs to a remote cluster. The first five characters of
// uuid are stored in *clusterId; if that ID is non-empty and differs
// from the local cluster ID, the request is sent to that remote
// cluster, block signatures in the returned manifest are rewritten as
// remote signatures (with no expected hash), and the result is
// forwarded to w.
//
// NOTE(review): the meaning of the bool result is not visible here;
// presumably it reports whether the request was handled — confirm.
157 func fetchRemoteCollectionByUUID(
158 h *genericFederatedRequestHandler,
159 effectiveMethod string,
163 w http.ResponseWriter,
164 req *http.Request) bool {
166 if effectiveMethod != "GET" {
167 // Only handle GET requests right now
172 // Collection UUID GET request
// NOTE(review): this slice assumes len(uuid) >= 5; confirm that a guard
// on the lines not shown (or the caller) guarantees it.
173 *clusterId = uuid[0:5]
174 if *clusterId != "" && *clusterId != h.handler.Cluster.ClusterID {
175 // request for remote collection by uuid
176 resp, err := h.handler.remoteClusterRequest(*clusterId, req)
// An empty expectHash makes rewriteSignatures validate the manifest
// against the record's own portable_data_hash.
177 newResponse, err := rewriteSignatures(*clusterId, "", resp, err)
178 h.handler.proxy.ForwardResponse(w, newResponse, err)
// fetchRemoteCollectionByPDH handles a GET of a collection by portable
// data hash. The request path is matched against collectionsByPDHRe.
// The local cluster is queried first; if filterLocalClusterResponse
// yields a response or an error, that is forwarded to w. Otherwise one
// goroutine per entry in RemoteClusters (skipping the local cluster ID)
// queries that remote, and the first response that passes
// rewriteSignatures with the requested hash is forwarded to w. If no
// remote succeeds, the collected errors are reported with 404 Not
// Found, or 502 Bad Gateway if any collected HTTPError had a code other
// than 404.
//
// NOTE(review): the meaning of the bool result is not visible here;
// presumably it reports whether the request was handled — confirm.
186 func fetchRemoteCollectionByPDH(
187 h *genericFederatedRequestHandler,
188 effectiveMethod string,
192 w http.ResponseWriter,
193 req *http.Request) bool {
195 if effectiveMethod != "GET" {
196 // Only handle GET requests right now
200 m := collectionsByPDHRe.FindStringSubmatch(req.URL.Path)
205 // Request for collection by PDH. Search the federation.
207 // First, query the local cluster.
208 resp, err := h.handler.localClusterRequest(req)
209 newResp, err := filterLocalClusterResponse(resp, err)
// Anything other than a suppressed result is forwarded as the answer.
210 if newResp != nil || err != nil {
211 h.handler.proxy.ForwardResponse(w, newResp, err)
215 // Create a goroutine for each cluster in the
216 // RemoteClusters map. The first valid result gets
217 // returned to the client. When that happens, all
218 // other outstanding requests are cancelled
219 sharedContext, cancelFunc := context.WithCancel(req.Context())
// Remote requests are issued with a copy of req bound to sharedContext.
222 req = req.WithContext(sharedContext)
223 wg := sync.WaitGroup{}
// success is unbuffered; errorChan has room for one entry per configured
// remote cluster.
225 success := make(chan *http.Response)
226 errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
// semaphore is defined elsewhere; presumably acquire/release bound the
// number of concurrent remote requests to MaxRequestAmplification —
// TODO confirm.
228 acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
230 for remoteID := range h.handler.Cluster.RemoteClusters {
231 if remoteID == h.handler.Cluster.ClusterID {
232 // No need to query local cluster again
// The cluster ID is passed as an argument so each goroutine has its own
// copy of the loop variable.
237 go func(remote string) {
242 case <-sharedContext.Done():
247 resp, err := h.handler.remoteClusterRequest(remote, req)
250 if resp != nil && !wasSuccess {
// A non-200 reply from this remote is recorded as an HTTPError.
258 if resp.StatusCode != http.StatusOK {
259 errorChan <- HTTPError{resp.Status, resp.StatusCode}
263 case <-sharedContext.Done():
// Passing pdh as the expected hash makes rewriteSignatures reject a
// record whose content does not match the requested hash.
268 newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
// Send the result unless the shared context is already done.
274 case <-sharedContext.Done():
275 case success <- newResponse:
// Default to 404 unless an error below indicates otherwise.
285 errorCode := http.StatusNotFound
289 case newResp = <-success:
290 h.handler.proxy.ForwardResponse(w, newResp, nil)
292 case <-sharedContext.Done():
// Drain the errors currently buffered in errorChan.
294 for len(errorChan) > 0 {
296 if httperr, ok := err.(HTTPError); ok {
// Any remote HTTP failure other than 404 turns the overall result into
// 502 Bad Gateway.
297 if httperr.Code != http.StatusNotFound {
298 errorCode = http.StatusBadGateway
301 errors = append(errors, err.Error())
303 httpserver.Errors(w, errors, errorCode)
308 // shouldn't ever get here