1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "git.curoverse.com/arvados.git/sdk/go/httpserver"
22 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// collectionFederatedRequestHandler is the HTTP handler (see its ServeHTTP
// method) for collection requests that may have to be answered by another
// cluster in the federation. Its ServeHTTP method reads two fields: next,
// to which requests it does not handle are passed, and handler, through
// which local and remote cluster requests are issued.
25 type collectionFederatedRequestHandler struct {
// rewriteSignatures post-processes a collection record returned by the
// cluster identified by clusterID.
//
// It decodes the response body as an arvados.Collection, rewrites every
// signed block locator in manifest_text into remote-signature form
// ("+R<clusterID>-..."), checks the manifest against the expected portable
// data hash, and replaces the response body (and its Content-Length) with
// the re-encoded record.
//
// Parameters:
//   - clusterID: ID of the cluster that produced resp; embedded in the
//     rewritten locators.
//   - expectHash: portable data hash the caller expects. If empty, the
//     record's own portable_data_hash field is used as the expectation.
//   - resp, requestError: result of the upstream request. A non-nil
//     requestError is returned unchanged together with resp.
//
// An error is returned when a manifest line has fewer than three tokens,
// when writing the updated manifest fails, when expectHash disagrees with
// the record's portable_data_hash, or when the hash computed from the
// manifest (with signatures ignored) disagrees with the expectation.
30 func rewriteSignatures(clusterID string, expectHash string,
31 resp *http.Response, requestError error) (newResponse *http.Response, err error) {
33 if requestError != nil {
34 return resp, requestError
37 if resp.StatusCode != http.StatusOK {
// Keep a handle on the original body so it is closed on return even
// after resp.Body has been replaced further down.
41 originalBody := resp.Body
42 defer originalBody.Close()
44 var col arvados.Collection
45 err = json.NewDecoder(resp.Body).Decode(&col)
50 // rewriting signatures will make manifest text 5-10% bigger so calculate
51 // capacity accordingly
52 updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
// mw feeds both the hasher and the rewritten manifest, so text that is
// identical in both only has to be written once.
55 mw := io.MultiWriter(hasher, updatedManifest)
// The scanner starts with a 1 MiB buffer and is allowed to grow up to the
// length of the whole manifest, so a single line can never be too long.
58 scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
59 scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
61 line := scanner.Text()
62 tokens := strings.Split(line, " ")
64 return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
// The first token of the line is copied through unchanged.
67 n, err := mw.Write([]byte(tokens[0]))
69 return nil, fmt.Errorf("Error updating manifest: %v", err)
72 for _, token := range tokens[1:] {
73 n, err = mw.Write([]byte(" "))
75 return nil, fmt.Errorf("Error updating manifest: %v", err)
79 m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
81 // Rewrite the block signature to be a remote signature
// m[5][2:] drops the first two characters of group 5 (presumably the
// "+A" hint prefix; confirm against keepclient.SignedLocatorRe).
82 _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
84 return nil, fmt.Errorf("Error updating manifest: %v", err)
87 // for hash checking, ignore signatures
88 n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
90 return nil, fmt.Errorf("Error updating manifest: %v", err)
// Tokens that are not signed locators go to both writers unchanged.
94 n, err = mw.Write([]byte(token))
96 return nil, fmt.Errorf("Error updating manifest: %v", err)
101 n, err = mw.Write([]byte("\n"))
103 return nil, fmt.Errorf("Error updating manifest: %v", err)
108 // Check that expected hash is consistent with
109 // portable_data_hash field of the returned record
110 if expectHash == "" {
111 expectHash = col.PortableDataHash
112 } else if expectHash != col.PortableDataHash {
// Argument order follows the message: the record's value first, then
// the caller's expectation.
113 return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q ", col.PortableDataHash, expectHash)
116 // Certify that the computed hash of the manifest_text matches our expectation
117 sum := hasher.Sum(nil)
// The computed hash has the form "<hex digest>+<sz>".
118 computedHash := fmt.Sprintf("%x+%v", sum, sz)
119 if computedHash != expectHash {
120 return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
123 col.ManifestText = updatedManifest.String()
125 newbody, err := json.Marshal(col)
// Swap in the re-encoded record and keep both length indicators in sync
// with the new body.
130 buf := bytes.NewBuffer(newbody)
131 resp.Body = ioutil.NopCloser(buf)
132 resp.ContentLength = int64(buf.Len())
133 resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
// filterLocalClusterResponse inspects the local cluster's answer to a
// collection request before it is forwarded to the client.
//
// A non-nil requestError is passed straight through together with resp.
// A 404 Not Found response is suppressed so that the caller goes on to
// search the federation; ServeHTTP forwards the result of this function
// only when the returned response or error is non-nil.
138 func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
139 if requestError != nil {
140 return resp, requestError
143 if resp.StatusCode == http.StatusNotFound {
144 // Suppress returning this result, because we want to
145 // search the federation.
// searchRemoteClusterForPDH groups state for a portable-data-hash lookup.
// NOTE(review): judging by the name, this is the per-search state used when
// querying remote clusters; confirm against its users.
151 type searchRemoteClusterForPDH struct {
// sharedContext points at a context.Context; presumably the cancellable
// context shared by all remote requests of one search. TODO confirm.
156 sharedContext *context.Context
// ServeHTTP serves collection requests that may need the federation.
//
//   - Requests that are not GET are handed to h.next.
//   - A request for a collection UUID that belongs to another cluster is
//     sent to that cluster; the response goes through rewriteSignatures
//     and is then forwarded to the client.
//   - A request by portable data hash is tried on the local cluster first.
//     If filterLocalClusterResponse yields neither a response nor an
//     error, every other cluster in RemoteClusters is queried in its own
//     goroutine, with concurrency bounded by a semaphore channel. The
//     first valid result is forwarded; otherwise the collected errors are
//     reported.
//   - Everything else is handed to h.next.
162 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
163 if req.Method != "GET" {
164 // Only handle GET requests right now
165 h.next.ServeHTTP(w, req)
169 m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
171 // Not a collection PDH GET request
172 m = collectionRe.FindStringSubmatch(req.URL.Path)
179 if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
180 // request for remote collection by uuid
181 resp, err := h.handler.remoteClusterRequest(clusterId, req)
// An empty expectHash makes rewriteSignatures use the record's own
// portable_data_hash as the expectation.
182 newResponse, err := rewriteSignatures(clusterId, "", resp, err)
183 h.handler.proxy.ForwardResponse(w, newResponse, err)
186 // not a collection UUID request, or it is a request
187 // for a local UUID, either way, continue down the
189 h.next.ServeHTTP(w, req)
193 // Request for collection by PDH. Search the federation.
195 // First, query the local cluster.
196 resp, err := h.handler.localClusterRequest(req)
197 newResp, err := filterLocalClusterResponse(resp, err)
// Either a usable local response or a local error ends the search here.
198 if newResp != nil || err != nil {
199 h.handler.proxy.ForwardResponse(w, newResp, err)
203 // Create a goroutine for each cluster in the
204 // RemoteClusters map. The first valid result gets
205 // returned to the client. When that happens, all
206 // other outstanding requests are cancelled
207 sharedContext, cancelFunc := context.WithCancel(req.Context())
// Attach the cancellable context to the request that is sent to the
// remote clusters.
208 req = req.WithContext(sharedContext)
209 wg := sync.WaitGroup{}
// Both channels are unbuffered, so a sender blocks until the select loop
// below receives from it.
211 success := make(chan *http.Response)
212 errorChan := make(chan error)
214 // use channel as a semaphore to limit the number of concurrent
215 // requests at a time
216 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
// NOTE(review): errorChan is closed when this function returns. Confirm
// that no goroutine can still be sending on it at that point, since a
// send on a closed channel panics.
218 defer close(errorChan)
223 for remoteID := range h.handler.Cluster.RemoteClusters {
224 if remoteID == h.handler.Cluster.ClusterID {
225 // No need to query local cluster again
230 go func(remote string) {
232 // blocks until it can put a value into the
233 // channel (which has a max queue capacity)
236 case <-sharedContext.Done():
241 resp, err := h.handler.remoteClusterRequest(remote, req)
244 if resp != nil && !wasSuccess {
// A non-200 answer from a remote is reported as an HTTPError carrying
// the remote's status text and code.
252 if resp.StatusCode != http.StatusOK {
253 errorChan <- HTTPError{resp.Status, resp.StatusCode}
257 case <-sharedContext.Done():
// pdh is passed as the expected hash, so the remote's record must match
// the portable data hash that was requested.
262 newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
268 case <-sharedContext.Done():
269 case success <- newResponse:
// Start from 404 and switch to 502 Bad Gateway as soon as any remote
// reports an HTTPError whose code is not 404.
281 errorCode := http.StatusNotFound
285 case newResp = <-success:
286 h.handler.proxy.ForwardResponse(w, newResp, nil)
288 case err := <-errorChan:
289 if httperr, ok := err.(HTTPError); ok {
290 if httperr.Code != http.StatusNotFound {
291 errorCode = http.StatusBadGateway
294 errors = append(errors, err.Error())
// When the shared context is done, answer with the accumulated error
// messages and errorCode.
295 case <-sharedContext.Done():
296 httpserver.Errors(w, errors, errorCode)