"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/url"
"regexp"
defer c.mtx.Unlock()
if err == nil {
- c.responses = append(c.responses, loadInto["items"].([]interface{})...)
+ if resp.StatusCode != http.StatusOK {
+ c.errors = append(c.errors, fmt.Errorf("error %v", loadInto["errors"]))
+ } else {
+ c.responses = append(c.responses, loadInto["items"].([]interface{})...)
+ }
} else {
c.errors = append(c.errors, err)
}
wg.Done()
<-sem
}()
- remoteReq := *req
+ var remoteReq http.Request
+ remoteReq.Header = req.Header
remoteReq.Method = "POST"
- remoteReq.URL = &url.URL{
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- }
+ remoteReq.URL = &url.URL{Path: req.URL.Path}
remoteParams := make(url.Values)
remoteParams["_method"] = []string{"GET"}
content, err := json.Marshal(v)
if err != nil {
rc.mtx.Lock()
+ defer rc.mtx.Unlock()
rc.errors = append(rc.errors, err)
- rc.mtx.Unlock()
return
}
remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
if k == h.handler.Cluster.ClusterID {
- h.handler.proxy.Do(w, &remoteReq, remoteReq.URL,
- h.handler.secureClient, rc.collectResponse)
+ h.handler.localClusterRequest(w, &remoteReq,
+ rc.collectResponse)
} else {
h.handler.remoteClusterRequest(k, w, &remoteReq,
rc.collectResponse)
for _, e := range rc.errors {
strerr = append(strerr, e.Error())
}
- httpserver.Errors(w, strerr, http.StatusBadRequest)
+ httpserver.Errors(w, strerr, http.StatusBadGateway)
} else {
- log.Printf("Sending status ok %+v", rc)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
itemList := make(map[string]interface{})
itemList["items"] = rc.responses
- //x, _ := json.Marshal(itemList)
- //log.Printf("Sending response %v", string(x))
json.NewEncoder(w).Encode(itemList)
- log.Printf("Sent?")
}
return true
return
}
}
- log.Printf("Clusterid is %q", clusterId)
+ //log.Printf("Clusterid is %q", clusterId)
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
h.next.ServeHTTP(w, req)
return resp, nil
}
-type searchLocalClusterForPDH struct {
- sentResponse bool
-}
-
-func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
if requestError != nil {
return resp, requestError
}
if resp.StatusCode == 404 {
// Suppress returning this result, because we want to
// search the federation.
- s.sentResponse = false
return nil, nil
}
- s.sentResponse = true
return resp, nil
}
// Request for collection by PDH. Search the federation.
// First, query the local cluster.
- urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-
- urlOut = &url.URL{
- Scheme: urlOut.Scheme,
- Host: urlOut.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
- }
- client := h.handler.secureClient
- if insecure {
- client = h.handler.insecureClient
- }
- sf := &searchLocalClusterForPDH{}
- h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if sf.sentResponse {
+ if h.handler.localClusterRequest(w, req, filterLocalClusterResponse) {
return
}
"encoding/json"
"fmt"
"io/ioutil"
- "log"
"net/http"
"net/http/httptest"
"net/url"
c.Check(jresp.Errors[0], check.Matches, re)
}
-func (s *FederationSuite) localServiceReturns404(c *check.C) *httpserver.Server {
+func (s *FederationSuite) localServiceHandler(c *check.C, h http.Handler) *httpserver.Server {
srv := &httpserver.Server{
Server: http.Server{
- Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- w.WriteHeader(404)
- }),
+ Handler: h,
},
}
return srv
}
+func (s *FederationSuite) localServiceReturns404(c *check.C) *httpserver.Server {
+ return s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(404)
+ }))
+}
+
func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
np := arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: ":"},
}
func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
- defer s.localServiceReturns404(c).Close()
+ defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ bd, _ := ioutil.ReadAll(req.Body)
+ c.Check(string(bd), check.Equals, `_method=GET&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
+ w.WriteHeader(200)
+ w.Write([]byte(`{"items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr"}]}`))
+ })).Close()
req := httptest.NewRequest("GET", "/arvados/v1/containers?filters="+
url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`, arvadostest.QueuedContainerUUID)), nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
resp := s.testRequest(req)
- log.Printf("got %+v", resp)
- c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
var cn arvados.ContainerList
c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
- c.Check(cn.Items[0].UUID, check.Equals, arvadostest.QueuedContainerUUID)
+ if cn.Items[0].UUID == arvadostest.QueuedContainerUUID {
+ c.Check(cn.Items[1].UUID, check.Equals, "zhome-xvhdp-cr5queuedcontnr")
+ } else {
+ c.Check(cn.Items[1].UUID, check.Equals, arvadostest.QueuedContainerUUID)
+ c.Check(cn.Items[0].UUID, check.Equals, "zhome-xvhdp-cr5queuedcontnr")
+ }
}
})
}
-func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
+// localClusterRequest sets up a request so it can be proxied to the
+// local API server using proxy.Do(). Returns true if a response was
+// written, false if not.
+func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request, filter ResponseFilter) bool {
urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
+ return true
}
urlOut = &url.URL{
Scheme: urlOut.Scheme,
if insecure {
client = h.insecureClient
}
- h.proxy.Do(w, req, urlOut, client, nil)
+ return h.proxy.Do(w, req, urlOut, client, filter)
+}
+
+func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
+ if !h.localClusterRequest(w, req, nil) && next != nil {
+ next.ServeHTTP(w, req)
+ }
}
// For now, findRailsAPI always uses the rails API running on this
type ResponseFilter func(*http.Response, error) (*http.Response, error)
+// Do sends a request, passes the result to the filter (if provided)
+// and then, if the result is not suppressed by the filter, writes the
+// response to the ResponseWriter. Returns true if a response was written,
+// false if not.
func (p *proxy) Do(w http.ResponseWriter,
reqIn *http.Request,
urlOut *url.URL,
client *http.Client,
- filter ResponseFilter) {
+ filter ResponseFilter) bool {
// Copy headers from incoming request, then add/replace proxy
// headers like Via and X-Forwarded-For.
resp, err := client.Do(reqOut)
if filter == nil && err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return
+ return true
}
// make sure original response body gets closed
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return
+ return true
}
if resp == nil {
// filter() returned a nil response, this means suppress
// writing a response, for the case where there might
// be multiple response writers.
- return
+ return false
}
// the filter gave us a new response body, make sure that gets closed too.
if err != nil {
httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
}
+ return true
}