"bufio"
"bytes"
"context"
+ "crypto/md5"
"database/sql"
"encoding/json"
"fmt"
+ "io"
"io/ioutil"
- "log"
"net/http"
"net/url"
"regexp"
if remote.Insecure {
client = h.insecureClient
}
- log.Printf("Remote cluster request to %v %v", remoteID, urlOut)
h.proxy.Do(w, req, urlOut, client, filter)
}
h.handler.remoteClusterRequest(m[1], w, req, nil)
}
-type rewriteSignaturesClusterId string
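+// rewriteSignaturesClusterId carries the remote cluster ID used to
+// rewrite block signatures into +R remote hints, and the expected
+// portable data hash of the returned collection ("" means use the
+// collection's own reported PDH).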
+type rewriteSignaturesClusterId struct {
+ clusterID string
+ expectHash string
+}
-func (clusterId rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func (rw rewriteSignaturesClusterId) rewriteSignatures(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
if requestError != nil {
return resp, requestError
}
// capacity accordingly
updatedManifest := bytes.NewBuffer(make([]byte, 0, int(float64(len(col.ManifestText))*1.1)))
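+ // mw feeds both the md5 hasher and the updated manifest, so the
+ // portable data hash of the rewritten manifest can be verified in
+ // the same pass; sz counts the bytes that went into the hash.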
+ hasher := md5.New()
+ mw := io.MultiWriter(hasher, updatedManifest)
+ sz := 0
+
scanner := bufio.NewScanner(strings.NewReader(col.ManifestText))
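+ // Manifest streams are single lines that can exceed bufio.Scanner's
+ // 64 KiB default limit, so let the buffer grow up to the full
+ // manifest size.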
scanner.Buffer(make([]byte, 1048576), len(col.ManifestText))
for scanner.Scan() {
return nil, fmt.Errorf("Invalid stream (<3 tokens): %q", line)
}
- updatedManifest.WriteString(tokens[0])
+ n, err := mw.Write([]byte(tokens[0]))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
for _, token := range tokens[1:] {
- updatedManifest.WriteString(" ")
+ n, err = mw.Write([]byte(" "))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+
m := keepclient.SignedLocatorRe.FindStringSubmatch(token)
if m != nil {
// Rewrite the block signature to be a remote signature
- fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], clusterId, m[5][2:], m[8])
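+ // e.g. "<hash>+<size>+A<sig>@<expiry>" becomes
+ // "<hash>+<size>+R<clusterID>-<sig>@<expiry>"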
+ _, err = fmt.Fprintf(updatedManifest, "%s%s%s+R%s-%s%s", m[1], m[2], m[3], rw.clusterID, m[5][2:], m[8])
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+
+ // for hash checking, ignore signatures
+ n, err = fmt.Fprintf(hasher, "%s%s", m[1], m[2])
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
} else {
- updatedManifest.WriteString(token)
+ n, err = mw.Write([]byte(token))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
}
-
}
- updatedManifest.WriteString("\n")
+ n, err = mw.Write([]byte("\n"))
+ if err != nil {
+ return nil, fmt.Errorf("Error updating manifest: %v", err)
+ }
+ sz += n
+ }
+
+ // Check that the computed hash of the manifest matches our expectation
+ if rw.expectHash == "" {
+ rw.expectHash = col.PortableDataHash
+ }
+
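+ // A portable data hash has the form "<md5 hex>+<size>", where the
+ // md5 is computed over the manifest text with signatures stripped.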
+ sum := hasher.Sum(nil)
+ computedHash := fmt.Sprintf("%x+%v", sum, sz)
+ if computedHash != rw.expectHash {
+ return nil, fmt.Errorf("Computed hash %q did not match expected hash %q", computedHash, rw.expectHash)
}
col.ManifestText = updatedManifest.String()
}
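+// searchRemoteClusterForPDH holds the state for one remote lookup in a
+// federated search for a collection by portable data hash. mtx,
+// sentResponse, and the error accumulators are shared by all of the
+// concurrent remote requests.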
type searchRemoteClusterForPDH struct {
+ pdh string
remoteID string
mtx *sync.Mutex
sentResponse *bool
// another request will find it.
// TODO collect and return error responses.
*s.errors = append(*s.errors, fmt.Sprintf("Response from %q: %v", s.remoteID, resp.Status))
- if resp.StatusCode == 404 && (*s.statusCode == 0 || *s.statusCode == 404) {
- // Only return 404 if every response is 404
- *s.statusCode = http.StatusNotFound
- } else {
+ if resp.StatusCode != 404 {
// Got a non-404 error response, convert into BadGateway
*s.statusCode = http.StatusBadGateway
}
// This reads the response body. We don't want to hold the
// lock while doing this because other remote requests could
- // also have made it to this point, and we want don't want a
+ // also have made it to this point, and we don't want a
// slow response holding the lock to block a faster response
// that is waiting on the lock.
- newResponse, err = rewriteSignaturesClusterId(s.remoteID).rewriteSignatures(resp, nil)
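+ // Pass the requested PDH so rewriteSignatures can check that the
+ // remote cluster returned the collection we asked for.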
+ newResponse, err = rewriteSignaturesClusterId{s.remoteID, s.pdh}.rewriteSignatures(resp, nil)
s.mtx.Lock()
func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
- if len(m) == 2 && len(h.handler.Cluster.RemoteClusters) > 0 {
- bearer := req.Header.Get("Authorization")
- if strings.HasPrefix(bearer, "Bearer v2/") &&
- len(bearer) > 10 &&
- bearer[10:15] != h.handler.Cluster.ClusterID {
- // Salted token from another cluster, just
- // fall back to query local cluster only.
- h.next.ServeHTTP(w, req)
+ if len(m) != 2 {
+ // Not a collection PDH request
+ m = collectionRe.FindStringSubmatch(req.URL.Path)
+ if len(m) == 2 && m[1] != h.handler.Cluster.ClusterID {
+ // request for remote collection by uuid
+ h.handler.remoteClusterRequest(m[1], w, req,
+ rewriteSignaturesClusterId{m[1], ""}.rewriteSignatures)
return
}
+ // Not a collection UUID request, or it is a request for a
+ // local UUID; either way, continue down the handler stack.
+ h.next.ServeHTTP(w, req)
+ return
+ }
- urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
+ // Request for collection by PDH. Search the federation.
- urlOut = &url.URL{
- Scheme: urlOut.Scheme,
- Host: urlOut.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
- }
- client := h.handler.secureClient
- if insecure {
- client = h.handler.insecureClient
- }
- sf := &searchLocalClusterForPDH{}
- h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if sf.sentResponse {
- // a response was sent, nothing more to do
- return
- }
+ // First, query the local cluster.
+ urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
- sharedContext, cancelFunc := context.WithCancel(req.Context())
- defer cancelFunc()
- req = req.WithContext(sharedContext)
-
- // Create a goroutine that will contact each cluster
- // in the RemoteClusters map. The first one to return
- // a valid result gets returned to the client. When
- // that happens, all other outstanding requests are
- // cancelled or suppressed.
- sentResponse := false
- mtx := sync.Mutex{}
- wg := sync.WaitGroup{}
- var errors []string
- var errorCode int = 0
- for remoteID := range h.handler.Cluster.RemoteClusters {
- search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
- &sharedContext, cancelFunc, &errors, &errorCode}
- wg.Add(1)
- go func() {
- h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
- wg.Done()
- }()
- }
- wg.Wait()
- if sentResponse {
- return
- }
+ urlOut = &url.URL{
+ Scheme: urlOut.Scheme,
+ Host: urlOut.Host,
+ Path: req.URL.Path,
+ RawPath: req.URL.RawPath,
+ RawQuery: req.URL.RawQuery,
+ }
+ client := h.handler.secureClient
+ if insecure {
+ client = h.handler.insecureClient
+ }
+ sf := &searchLocalClusterForPDH{}
+ h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
+ if sf.sentResponse {
+ return
+ }
- if errorCode == 0 {
- errorCode = http.StatusBadGateway
+ sharedContext, cancelFunc := context.WithCancel(req.Context())
+ defer cancelFunc()
+ req = req.WithContext(sharedContext)
+
+ // Create a goroutine for each cluster in the
+ // RemoteClusters map. The first valid result gets
+ // returned to the client. When that happens, all
+ // other outstanding requests are cancelled or
+ // suppressed.
+ sentResponse := false
+ mtx := sync.Mutex{}
+ wg := sync.WaitGroup{}
+ var errors []string
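+ // Return 404 only if every remote response is 404; any other error
+ // response is converted to 502 Bad Gateway in
+ // filterRemoteClusterResponse.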
+ errorCode := http.StatusNotFound
+
+ // Use a buffered channel as a semaphore to limit the number
+ // of parallel remote requests to 4 at a time.
+ sem := make(chan bool, 4)
+ defer close(sem)
+ for remoteID := range h.handler.Cluster.RemoteClusters {
+ // Blocks until a slot opens up in the channel's buffer,
+ // i.e. fewer than 4 requests are in flight.
+ sem <- true
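+ // Fast-path check without the lock; the response filter re-checks
+ // sentResponse under mtx before replying.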
+ if sentResponse {
+ break
}
-
- // No successful responses, so return an error
- httpserver.Errors(w, errors, errorCode)
- return
+ search := &searchRemoteClusterForPDH{m[1], remoteID, &mtx, &sentResponse,
+ &sharedContext, cancelFunc, &errors, &errorCode}
+ wg.Add(1)
+ go func() {
+ h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
+ wg.Done()
+ <-sem
+ }()
}
+ wg.Wait()
- m = collectionRe.FindStringSubmatch(req.URL.Path)
- if len(m) < 2 || m[1] == h.handler.Cluster.ClusterID {
- h.next.ServeHTTP(w, req)
+ if sentResponse {
return
}
- h.handler.remoteClusterRequest(m[1], w, req,
- rewriteSignaturesClusterId(m[1]).rewriteSignatures)
+
+ // No successful responses, so return the error
+ httpserver.Errors(w, errors, errorCode)
}
func (h *Handler) setupProxyRemoteCluster(next http.Handler) http.Handler {
mux.Handle("/arvados/v1/collections/", &collectionFederatedRequestHandler{next, h})
mux.Handle("/", next)
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
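+ // A salted token has the form "v2/<token UUID>/<40-character
+ // secret>", so a salted Authorization header splits on "/" into
+ // exactly three parts.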
+ parts := strings.Split(req.Header.Get("Authorization"), "/")
+ alreadySalted := (len(parts) == 3 && parts[0] == "Bearer v2" && len(parts[2]) == 40)
+
+ if alreadySalted ||
+ strings.Contains(req.Header.Get("Via"), "arvados-controller") {
+ // The token is already salted, or this is a
+ // request from another instance of
+ // arvados-controller. In either case, we
+ // don't want to proxy this query, so just
+ // continue down the instance handler stack.
+ next.ServeHTTP(w, req)
+ return
+ }
+
+ mux.ServeHTTP(w, req)
+ })
- return mux
}