1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.curoverse.com/arvados.git/sdk/go/httpserver"
// federatedRequestDelegate is the signature of the hooks stored in
// genericFederatedRequestHandler.delegates. ServeHTTP invokes each one,
// in slice order, as d(h, effectiveMethod, &clusterId, m[1], m[3], w, req)
// and branches on the returned bool.
// NOTE(review): a true result presumably means the delegate has fully
// handled the request and no further processing should happen — confirm
// against the delegate implementations.
21 type federatedRequestDelegate func(
22 h *genericFederatedRequestHandler,
23 effectiveMethod string,
27 w http.ResponseWriter,
28 req *http.Request) bool
// genericFederatedRequestHandler serves HTTP requests (see ServeHTTP) by
// deciding, per request, whether to answer via a multi-cluster query, a
// delegate, the next local handler, or a single remote cluster.
30 type genericFederatedRequestHandler struct {
// matcher is applied to req.URL.Path by ServeHTTP, which reads
// submatches 1, 2 and 3 of the result.
33 matcher *regexp.Regexp
// delegates are tried in slice order by ServeHTTP.
34 delegates []federatedRequestDelegate
// remoteQueryUUIDs asks one cluster (clusterID) for the objects named in
// uuids and accumulates the decoded items in rp.
//
// Every round builds a fresh POST request for req.URL.Path whose
// form-encoded body carries _method=GET, count=none, the caller's "select"
// value (when non-empty) and a [["uuid", "in", [...]]] filter listing the
// uuids still outstanding. The request is issued with localClusterRequest
// when clusterID equals h.handler.Cluster.ClusterID, and with
// remoteClusterRequest for any other cluster. The reply is decoded by a
// multiClusterQueryResponseCollector; an error recorded by the collector
// is returned as (nil, "", rc.error).
37 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
39 clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
41 found := make(map[string]bool)
// Start one above the real length so the progress check in the loop
// condition passes on the first iteration.
42 prev_len_uuids := len(uuids) + 1
44 // (1) there are more uuids to query
45 // (2) we're making progress - on each iteration the set of
46 // uuids we are still waiting for must shrink.
47 for len(uuids) > 0 && len(uuids) < prev_len_uuids {
48 var remoteReq http.Request
// http.Header is a map type, so this shares the caller's header map
// with the outgoing request rather than copying it.
49 remoteReq.Header = req.Header
50 remoteReq.Method = "POST"
51 remoteReq.URL = &url.URL{Path: req.URL.Path}
52 remoteParams := make(url.Values)
53 remoteParams.Set("_method", "GET")
54 remoteParams.Set("count", "none")
55 if req.Form.Get("select") != "" {
56 remoteParams.Set("select", req.Form.Get("select"))
// content is the JSON array encoding of uuids, so it can be embedded
// verbatim as the right-hand side of the "in" filter below.
58 content, err := json.Marshal(uuids)
62 remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
63 enc := remoteParams.Encode()
64 remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
66 rc := multiClusterQueryResponseCollector{clusterID: clusterID}
68 var resp *http.Response
69 var cancel context.CancelFunc
70 if clusterID == h.handler.Cluster.ClusterID {
71 resp, cancel, err = h.handler.localClusterRequest(&remoteReq)
73 resp, cancel, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
// collectResponse's return values are not used here; the outcome is
// read back from rc.error / rc.responses.
75 rc.collectResponse(resp, err)
81 return nil, "", rc.error
86 if len(rc.responses) == 0 {
87 // We got zero responses, no point in doing
92 rp = append(rp, rc.responses...)
94 // Go through the responses and determine what was
95 // returned. If there are remaining items, loop
96 // around and do another request with just the
98 for _, i := range rc.responses {
99 uuid, ok := i["uuid"].(string)
106 for _, u := range uuids {
// Record how many uuids this round asked for; the loop condition
// compares the next round's length against it to detect lack of progress.
111 prev_len_uuids = len(uuids)
// handleMultiClusterQuery services a list request whose "filters" form
// value names uuids belonging to more than one cluster.
//
// It decodes "filters" as JSON, groups the uuids found on the right-hand
// side of "uuid" filters (a list, or a single string for the "=" operator)
// by their first five characters, validates the request, queries every
// cluster concurrently via remoteQueryUUIDs, and writes the merged items
// to w as a JSON object with "items" and "kind" keys.
//
// Validation failures are answered with 400 Bad Request:
//   - "filters" or "select" is not valid JSON;
//   - "count" is set to something other than none / "none";
//   - "limit", "offset" or "order" is supplied;
//   - more objects are requested than GetMaxItemsPerResponse() allows;
//   - "select" is supplied without 'uuid'.
//
// If any cluster query fails, all collected error messages are sent with
// 502 Bad Gateway.
//
// NOTE(review): ServeHTTP uses the bool result as the final term of its
// multi-cluster condition; presumably true means a response has already
// been written to w — confirm.
118 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
119 req *http.Request, clusterId *string) bool {
121 var filters [][]interface{}
122 err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
124 httpserver.Error(w, err.Error(), http.StatusBadRequest)
128 // Split the list of uuids by prefix
129 queryClusters := make(map[string][]string)
131 for _, filter := range filters {
// A filter is expected to be a three-element [lhs, op, rhs] array.
132 if len(filter) != 3 {
136 if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
140 op, ok := filter[1].(string)
146 if rhs, ok := filter[2].([]interface{}); ok {
147 for _, i := range rhs {
148 if u, ok := i.(string); ok {
// NOTE(review): u[0:5] panics when u is shorter than five bytes; no
// length check is visible at this point — confirm one exists upstream.
150 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
155 } else if op == "=" {
156 if u, ok := filter[2].(string); ok {
// NOTE(review): same short-string concern as the list case above.
158 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
167 if len(queryClusters) <= 1 {
168 // Query does not search for uuids across multiple
174 count := req.Form.Get("count")
// Accept an absent count, the bare word none, or the JSON string "none".
175 if count != "" && count != `none` && count != `"none"` {
176 httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
179 if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
180 httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
183 if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
184 httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
185 expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
188 if req.Form.Get("select") != "" {
191 err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
193 httpserver.Error(w, err.Error(), http.StatusBadRequest)
197 for _, r := range selects {
204 httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
209 // Perform concurrent requests to each cluster
211 // use channel as a semaphore to limit the number of concurrent
212 // requests at a time
213 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
215 wg := sync.WaitGroup{}
// remoteQueryUUIDs assigns req.Header to every request it sends, so this
// Content-Type also applies to the form-encoded per-cluster requests.
217 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
220 var completeResponses []map[string]interface{}
223 for k, v := range queryClusters {
229 // blocks until it can put a value into the
230 // channel (which has a max queue capacity)
// k and v are passed as arguments so each goroutine works on its own
// copy of the cluster ID and uuid list.
233 go func(k string, v []string) {
234 rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
// NOTE(review): completeResponses and errors are shared by all
// goroutines; these appends must be serialized. The locking is not
// visible at this point — confirm.
237 completeResponses = append(completeResponses, rp...)
240 errors = append(errors, err)
251 for _, e := range errors {
252 strerr = append(strerr, e.Error())
254 httpserver.Errors(w, strerr, http.StatusBadGateway)
258 w.Header().Set("Content-Type", "application/json")
259 w.WriteHeader(http.StatusOK)
260 itemList := make(map[string]interface{})
261 itemList["items"] = completeResponses
262 itemList["kind"] = kind
// The status line has already been sent, so the Encode error is not
// checked here.
263 json.NewEncoder(w).Encode(itemList)
// ServeHTTP routes a request to the appropriate cluster.
//
// Visible steps, in order:
//  1. Match req.URL.Path against h.matcher.
//  2. Load form parameters (URL and, for POST, body); a failure is
//     answered with 400 Bad Request.
//  3. Let an explicit "cluster_id" form parameter set the target cluster.
//  4. Compute the effective method, honouring "_method" on POST requests.
//  5. For an effective GET with a non-empty "filters" parameter, try
//     handleMultiClusterQuery.
//  6. Offer the request to each delegate in h.delegates.
//  7. If the target cluster is empty or is this cluster, pass the request
//     to h.next; the other visible path sends it with remoteClusterRequest
//     and relays the reply through h.handler.proxy.ForwardResponse.
268 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
269 m := h.matcher.FindStringSubmatch(req.URL.Path)
// NOTE(review): presumably submatch 2 is a cluster ID embedded in the
// path — confirm against the matcher pattern.
272 if len(m) > 0 && m[2] != "" {
276 // Get form parameters from URL and form body (if POST).
277 if err := loadParamsFromForm(req); err != nil {
278 httpserver.Error(w, err.Error(), http.StatusBadRequest)
282 // Check if the parameters have an explicit cluster_id
283 if req.Form.Get("cluster_id") != "" {
284 clusterId = req.Form.Get("cluster_id")
287 // Handle the POST-as-GET special case (workaround for large
288 // GET requests that potentially exceed maximum URL length,
289 // like multi-object queries where the filter has 100s of
291 effectiveMethod := req.Method
292 if req.Method == "POST" && req.Form.Get("_method") != "" {
293 effectiveMethod = req.Form.Get("_method")
296 if effectiveMethod == "GET" &&
298 req.Form.Get("filters") != "" &&
299 h.handleMultiClusterQuery(w, req, &clusterId) {
// NOTE(review): m[1] and m[3] are indexed without a visible length check,
// while the test above allows for len(m) == 0 — confirm this handler is
// only reached for paths the matcher accepts.
303 for _, d := range h.delegates {
304 if d(h, effectiveMethod, &clusterId, m[1], m[3], w, req) {
309 if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
310 h.next.ServeHTTP(w, req)
312 resp, cancel, err := h.handler.remoteClusterRequest(clusterId, req)
316 h.handler.proxy.ForwardResponse(w, resp, err)
// multiClusterQueryResponseCollector holds the decoded outcome of one
// per-cluster list request; it is filled in by collectResponse.
320 type multiClusterQueryResponseCollector struct {
// responses receives the "items" array of a successful (200 OK) reply.
321 responses []map[string]interface{}
// collectResponse records the result of one cluster request on c.
//
// A non-nil requestError is stored in c.error. In the other visible paths
// the response body is decoded as a JSON object with "kind", "items" and
// "errors" keys: a decode failure or a status other than 200 OK is stored
// in c.error (tagged with c.clusterID and resp.Status), and a successful
// reply stores the items in c.responses and the kind in c.kind.
//
// The visible caller, remoteQueryUUIDs, ignores the return values and
// reads the outcome from c's fields.
327 func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
328 requestError error) (newResponse *http.Response, err error) {
329 if requestError != nil {
330 c.error = requestError
334 defer resp.Body.Close()
335 var loadInto struct {
336 Kind string `json:"kind"`
337 Items []map[string]interface{} `json:"items"`
338 Errors []string `json:"errors"`
340 err = json.NewDecoder(resp.Body).Decode(&loadInto)
343 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
// A non-200 reply reports the "errors" array sent by the other cluster.
346 if resp.StatusCode != http.StatusOK {
347 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
351 c.responses = loadInto.Items
352 c.kind = loadInto.Kind