1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.curoverse.com/arvados.git/sdk/go/arvados"
21 "git.curoverse.com/arvados.git/sdk/go/httpserver"
22 "git.curoverse.com/arvados.git/sdk/go/keepclient"
// collectionFederatedRequestHandler is the receiver type of the ServeHTTP
// method in this file that handles collection requests across the federation.
// That method reads two fields from it: next (the handler that requests are
// passed on to when they are not handled here) and handler (which provides
// the cluster configuration, the local/remote request helpers and the
// response proxy).
// NOTE(review): the field declarations are not visible in this excerpt;
// the field names above are taken from their uses in ServeHTTP.
25 type collectionFederatedRequestHandler struct {
// rewriteSignatures post-processes a collection response received from
// another cluster. It decodes the response body as an arvados.Collection,
// rewrites every signed block locator in the manifest text into a remote
// signature ("+R<clusterID>-...") and, while doing so, hashes the manifest
// with the signatures left out so that the result can be checked against the
// collection's portable data hash.
//
// Parameters:
//   - clusterID: ID embedded in each rewritten "+R" signature hint.
//   - expectHash: portable data hash the caller asked for; when empty, the
//     portable_data_hash of the returned record is used instead.
//   - resp, requestError: the outcome of the upstream request.
//
// When requestError is non-nil it is returned unchanged together with resp.
// On success the response body, ContentLength and Content-Length header are
// replaced with the re-encoded collection. An error is returned when the
// manifest is malformed, when a write fails, or when either hash check fails.
// NOTE(review): the handling of non-200 responses, of a JSON decode failure
// and the final return statement are not visible in this excerpt.
30 func rewriteSignatures(clusterID string, expectHash string,
31 resp *http.Response, requestError error) (newResponse *http.Response, err error) {
33 if requestError != nil {
34 return resp, requestError
37 if resp.StatusCode != http.StatusOK {
// Keep a handle on the upstream body so it is closed on return even though
// resp.Body is replaced further down.
41 originalBody := resp.Body
42 defer originalBody.Close()
44 var col arvados.Collection
45 err = json.NewDecoder(resp.Body).Decode(&col)
50 // rewriting signatures will make manifest text 5-10% bigger so calculate
51 // capacity accordingly
52 updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
// Everything written to mw goes to both the hasher and the rewritten manifest.
// NOTE(review): hasher is declared on a line not shown here; confirm which
// hash function it uses.
55 mw := io.MultiWriter(hasher, updatedManifest)
// Walk the manifest one line (stream) at a time. The scanner's maximum token
// size is raised to the manifest length so that a long line is not rejected.
58 scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
59 scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
61 line := scanner.Text()
62 tokens := strings.Split(line, " ")
64 return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
// The first token of the line is copied through unchanged.
67 n, err := mw.Write([]byte(tokens[0]))
69 return nil, fmt.Errorf("Error updating manifest: %v", err)
72 for _, token := range tokens[1:] {
73 n, err = mw.Write([]byte(" "))
75 return nil, fmt.Errorf("Error updating manifest: %v", err)
// Tokens that match the signed-locator pattern get their signature rewritten;
// the last write below (mw.Write of the token) copies a token through as-is.
79 m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
81 // Rewrite the block signature to be a remote signature
// NOTE(review): m[5][2:] drops the first two characters of capture group 5,
// presumably the local signature prefix; verify against SignedLocatorRe.
82 _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterID, m[5][2:], m[8])
84 return nil, fmt.Errorf("Error updating manifest: %v", err)
87 // for hash checking, ignore signatures
88 n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
90 return nil, fmt.Errorf("Error updating manifest: %v", err)
94 n, err = mw.Write([]byte(token))
96 return nil, fmt.Errorf("Error updating manifest: %v", err)
101 n, err = mw.Write([]byte("\n"))
103 return nil, fmt.Errorf("Error updating manifest: %v", err)
108 // Check that expected hash is consistent with
109 // portable_data_hash field of the returned record
110 if expectHash == "" {
111 expectHash = col.PortableDataHash
112 } else if expectHash != col.PortableDataHash {
// Arguments follow the order in which the message names them: the hash found
// on the returned record first, then the hash the caller expected.
113 return nil, fmt.Errorf("portable_data_hash %q on returned record did not match expected hash %q", col.PortableDataHash, expectHash)
116 // Certify that the computed hash of the manifest_text matches our expectation
// NOTE(review): sz is maintained on lines not shown here; presumably it is the
// number of bytes fed to the hasher - confirm.
117 sum := hasher.Sum(nil)
118 computedHash := fmt.Sprintf("%x+%v", sum, sz)
119 if computedHash != expectHash {
120 return nil, fmt.Errorf("Computed manifest_text hash %q did not match expected hash %q", computedHash, expectHash)
// Swap the rewritten manifest into the record and re-encode it as the new
// response body, keeping both length indicators in sync with the new size.
123 col.ManifestText = updatedManifest.String()
125 newbody, err := json.Marshal(col)
130 buf := bytes.NewBuffer(newbody)
131 resp.Body = ioutil.NopCloser(buf)
132 resp.ContentLength = int64(buf.Len())
133 resp.Header.Set("Content-Length", fmt.Sprintf("%v", buf.Len()))
// filterLocalClusterResponse inspects the result of a request to the local
// cluster. When requestError is non-nil, resp and the error are returned
// unchanged. A 404 Not Found response is treated specially so that the
// caller can go on to search the federation instead of reporting it.
// NOTE(review): the return statements for the 404 case and for all other
// statuses are not visible in this excerpt; the caller treats a nil response
// together with a nil error as "keep searching".
138 func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
139 if requestError != nil {
140 return resp, requestError
143 if resp.StatusCode == http.StatusNotFound {
144 // Suppress returning this result, because we want to
145 // search the federation.
// searchRemoteClusterForPDH groups state for a search of remote clusters by
// portable data hash.
// NOTE(review): only one field is visible in this excerpt and no use of the
// type is shown; the description above is inferred from the name - confirm.
151 type searchRemoteClusterForPDH struct {
// sharedContext points at a context.Context.
// NOTE(review): presumably the cancellable context shared by all concurrent
// remote requests - confirm. Go convention is to pass a context.Context by
// value as a parameter rather than storing a pointer to one in a struct.
156 sharedContext *context.Context
// ServeHTTP handles GET requests for collections in a federation.
//
// Requests with any other method are passed to h.next. The request path is
// matched first against collectionByPDHRe and, failing that, against
// collectionRe. A request naming a cluster ID other than the local one is
// sent to that remote cluster, its response is run through rewriteSignatures
// (with no expected hash) and forwarded to the client. A request by portable
// data hash is tried on the local cluster first; if that yields neither a
// response nor an error, the clusters in the RemoteClusters map are queried
// concurrently and the first valid result is returned to the client. If the
// shared context is done before a result arrives, the collected errors are
// reported: 404 when every HTTP error was a 404, 502 Bad Gateway otherwise.
// NOTE(review): several statements (returns, semaphore acquire/release, wait
// group usage, cancelFunc calls and some declarations) are not visible in
// this excerpt.
162 func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
163 if req.Method != "GET" {
164 // Only handle GET requests right now
165 h.next.ServeHTTP(w, req)
169 m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
171 // Not a collection PDH GET request
172 m = collectionRe.FindStringSubmatch(req.URL.Path)
// NOTE(review): clusterId is assigned on a line not shown here, presumably
// from a capture group of collectionRe - confirm.
179 if clusterId != "" && clusterId != h.handler.Cluster.ClusterID {
180 // request for remote collection by uuid
181 resp, err := h.handler.remoteClusterRequest(clusterId, req)
// An empty expected hash makes rewriteSignatures check the manifest against
// the portable_data_hash of the returned record itself.
182 newResponse, err := rewriteSignatures(clusterId, "", resp, err)
183 h.handler.proxy.ForwardResponse(w, newResponse, err)
186 // not a collection UUID request, or it is a request
187 // for a local UUID, either way, continue down the
189 h.next.ServeHTTP(w, req)
193 // Request for collection by PDH. Search the federation.
195 // First, query the local cluster.
196 resp, err := h.handler.localClusterRequest(req)
197 newResp, err := filterLocalClusterResponse(resp, err)
// A non-nil response or error from the local cluster is final and is forwarded.
198 if newResp != nil || err != nil {
199 h.handler.proxy.ForwardResponse(w, newResp, err)
203 // Create a goroutine for each cluster in the
204 // RemoteClusters map. The first valid result gets
205 // returned to the client. When that happens, all
206 // other outstanding requests are cancelled
207 sharedContext, cancelFunc := context.WithCancel(req.Context())
// Rebind req to the cancellable context so that the remote requests made
// with it below share that context.
208 req = req.WithContext(sharedContext)
209 wg := sync.WaitGroup{}
// success is unbuffered, so a send on it completes only when a receiver takes
// the value. errorChan has room for one error per entry in RemoteClusters.
211 success := make(chan *http.Response)
212 errorChan := make(chan error, len(h.handler.Cluster.RemoteClusters))
214 // use channel as a semaphore to limit the number of concurrent
215 // requests at a time
216 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
220 for remoteID := range h.handler.Cluster.RemoteClusters {
221 if remoteID == h.handler.Cluster.ClusterID {
222 // No need to query local cluster again
// The remote ID is passed as an argument so each goroutine has its own copy.
227 go func(remote string) {
229 // blocks until it can put a value into the
230 // channel (which has a max queue capacity)
233 case <-sharedContext.Done():
238 resp, err := h.handler.remoteClusterRequest(remote, req)
// NOTE(review): wasSuccess is declared on a line not shown here.
241 if resp != nil && !wasSuccess {
// A non-200 answer from a remote cluster is recorded as an HTTPError.
249 if resp.StatusCode != http.StatusOK {
250 errorChan <- HTTPError{resp.Status, resp.StatusCode}
254 case <-sharedContext.Done():
// NOTE(review): pdh is assigned on a line not shown here; it is passed as the
// expected hash for the rewritten response.
259 newResponse, err := rewriteSignatures(remote, pdh, resp, nil)
// Hand the response over unless the shared context is already done.
265 case <-sharedContext.Done():
266 case success <- newResponse:
// Default status for the error report; upgraded below when any remote
// reported something other than 404.
277 errorCode := http.StatusNotFound
281 case newResp = <-success:
282 h.handler.proxy.ForwardResponse(w, newResp, nil)
284 case <-sharedContext.Done():
// Drain the errors queued so far.
// NOTE(review): the errors slice is declared on a line not shown here.
286 for len(errorChan) > 0 {
288 if httperr, ok := err.(HTTPError); ok {
289 if httperr.Code != http.StatusNotFound {
290 errorCode = http.StatusBadGateway
293 errors = append(errors, err.Error())
295 httpserver.Errors(w, errors, errorCode)