-
- // We have a successful response. Suppress/cancel all the
- // other requests/responses.
- *s.sentResponse = true
- s.cancelFunc()
-
- return newResponse, nil
-}
-
-func (h *collectionFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- m := collectionByPDHRe.FindStringSubmatch(req.URL.Path)
- if len(m) == 2 && len(h.handler.Cluster.RemoteClusters) > 0 {
- bearer := req.Header.Get("Authorization")
- if strings.HasPrefix(bearer, "Bearer v2/") &&
- len(bearer) > 10 &&
- bearer[10:15] != h.handler.Cluster.ClusterID {
- // Salted token from another cluster, just
- // fall back to query local cluster only.
- h.next.ServeHTTP(w, req)
- return
- }
-
- urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-
- urlOut = &url.URL{
- Scheme: urlOut.Scheme,
- Host: urlOut.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
- }
- client := h.handler.secureClient
- if insecure {
- client = h.handler.insecureClient
- }
- sf := &searchLocalClusterForPDH{}
- h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if sf.sentResponse {
- // a response was sent, nothing more to do
- return
- }
-
- sharedContext, cancelFunc := context.WithCancel(req.Context())
- defer cancelFunc()
- req = req.WithContext(sharedContext)
-
- // Create a goroutine that will contact each cluster
- // in the RemoteClusters map. The first one to return
- // a valid result gets returned to the client. When
- // that happens, all other outstanding requests are
- // cancelled or suppressed.
- sentResponse := false
- mtx := sync.Mutex{}
- wg := sync.WaitGroup{}
- var errors []string
- var errorCode int = 0
- for remoteID := range h.handler.Cluster.RemoteClusters {
- search := &searchRemoteClusterForPDH{remoteID, &mtx, &sentResponse,
- &sharedContext, cancelFunc, &errors, &errorCode}
- wg.Add(1)
- go func() {
- h.handler.remoteClusterRequest(search.remoteID, w, req, search.filterRemoteClusterResponse)
- wg.Done()
- }()
- }
- wg.Wait()
- if sentResponse {
- return
- }
-
- if errorCode == 0 {
- errorCode = http.StatusBadGateway
- }
-
- // No successful responses, so return an error
- httpserver.Errors(w, errors, errorCode)
- return