1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/httpserver"
// federatedRequestDelegate is the signature of the hooks stored in
// genericFederatedRequestHandler.delegates. ServeHTTP calls each
// delegate in slice order and tests the boolean result.
// NOTE(review): presumably a true result means the delegate has
// fully handled the request and no further processing should
// happen -- confirm against ServeHTTP.
20 type federatedRequestDelegate func(
// h is the handler that is dispatching the request.
21 h *genericFederatedRequestHandler,
// effectiveMethod is the request method after ServeHTTP has applied
// the POST-with-"_method" override.
22 effectiveMethod string,
// w and req are the response writer and request being served.
26 w http.ResponseWriter,
27 req *http.Request) bool
// genericFederatedRequestHandler is an http.Handler (see ServeHTTP)
// that inspects a request's path and form parameters and then
// either answers a multi-cluster query itself, hands the request to
// one of its delegates, passes it to the next local handler, or
// proxies it to a remote cluster.
29 type genericFederatedRequestHandler struct {
// matcher is applied to the request URL path by ServeHTTP, which
// reads submatches 2 and 3 of the result.
32 matcher *regexp.Regexp
// delegates are called in order by ServeHTTP with the effective
// method, cluster ID, uuid and remaining path of the request.
33 delegates []federatedRequestDelegate
// remoteQueryUUIDs fetches the objects named in uuids from the single
// cluster identified by clusterID.
//
// Each round sends a POST request to the same URL path as the
// incoming request, with a form-encoded body containing
// "_method=GET", "count=none", a `[["uuid", "in", [...]]]` filter
// listing the outstanding uuids and, when the caller supplied one,
// the original "select" parameter. The request is dispatched with
// localClusterRequest when clusterID is this cluster's own ID and
// with remoteClusterRequest otherwise. Rounds repeat while uuids
// remain outstanding and the previous round reduced their number.
//
// The items collected over all rounds are accumulated in rp. When
// the response collector reports an error the function returns
// (nil, "", that error).
36 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
38 clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
// NOTE(review): presumably records which uuids have already been
// returned so they can be removed from the next round -- confirm
// against the loop body.
40 found := make(map[string]bool)
// Start one above the real length so that the "making progress"
// test in the loop condition holds on the first iteration.
41 prev_len_uuids := len(uuids) + 1
// Keep querying while:
43 // (1) there are more uuids to query
44 // (2) we're making progress - on each iteration the set of
45 // uuids we are still waiting for must shrink.
46 for len(uuids) > 0 && len(uuids) < prev_len_uuids {
// Build the outgoing request from scratch. The header map is the
// caller's own map (assigned, not copied), and only the path of the
// original URL is carried over; all parameters travel in the body.
47 var remoteReq http.Request
48 remoteReq.Header = req.Header
49 remoteReq.Method = "POST"
50 remoteReq.URL = &url.URL{Path: req.URL.Path}
51 remoteParams := make(url.Values)
// POST-as-GET: the body asks the receiver to treat this as a GET.
52 remoteParams.Set("_method", "GET")
53 remoteParams.Set("count", "none")
54 if req.Form.Get("select") != "" {
55 remoteParams.Set("select", req.Form.Get("select"))
// NOTE(review): ":=" inside the loop body declares a new err that
// shadows the named result -- confirm this is intended.
57 content, err := json.Marshal(uuids)
// content is already JSON, so it is spliced into the filter
// expression verbatim.
61 remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
62 enc := remoteParams.Encode()
63 remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
// A fresh collector per round; it decodes the response into
// rc.responses or records a failure in rc.error.
65 rc := multiClusterQueryResponseCollector{clusterID: clusterID}
67 var resp *http.Response
// Requests for this cluster's own ID go through the local request
// path; any other cluster ID goes through the remote one.
68 if clusterID == h.handler.Cluster.ClusterID {
69 resp, err = h.handler.localClusterRequest(&remoteReq)
71 resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
73 rc.collectResponse(resp, err)
76 return nil, "", rc.error
81 if len(rc.responses) == 0 {
82 // We got zero responses, no point in doing
// Accumulate this round's items into the overall result.
87 rp = append(rp, rc.responses...)
89 // Go through the responses and determine what was
90 // returned. If there are remaining items, loop
91 // around and do another request with just the
93 for _, i := range rc.responses {
// Items whose "uuid" is not a string fail this type assertion.
94 uuid, ok := i["uuid"].(string)
101 for _, u := range uuids {
// Remember the size of the set just queried so the loop condition
// can detect whether the next round made progress.
106 prev_len_uuids = len(uuids)
// handleMultiClusterQuery inspects the "filters" form parameter for
// uuid filters and groups the requested uuids by the cluster they
// belong to, taking the first five characters of each uuid as the
// cluster ID.
//
// When the uuids span more than one cluster, the request is
// validated ("count" must be empty or none; "limit", "offset" and
// "order" must be absent; the number of requested objects must not
// exceed the maximum page size; a "select" list must include uuid)
// and then one remoteQueryUUIDs call per cluster is run in its own
// goroutine. On success the combined items are written as a JSON
// object with "items" and "kind" keys and status 200; collected
// errors are reported with status 502 (Bad Gateway), and validation
// failures with status 400 (Bad Request).
//
// NOTE(review): the boolean result is used by ServeHTTP as part of
// an if condition; presumably true means a response has already
// been written -- confirm. The role of clusterId is not evident
// from the statements shown here -- confirm against the full body.
113 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
114 req *http.Request, clusterId *string) bool {
// "filters" is expected to be a JSON list of lists; anything that
// fails to decode is reported to the client as a bad request.
116 var filters [][]interface{}
117 err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
119 httpserver.Error(w, err.Error(), http.StatusBadRequest)
123 // Split the list of uuids by prefix
124 queryClusters := make(map[string][]string)
126 for _, filter := range filters {
// Each filter is examined as a three-element
// [attribute, operator, operand] list whose attribute is "uuid".
127 if len(filter) != 3 {
131 if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
135 op, ok := filter[1].(string)
// List operand: collect every element that is a 27-character
// string, keyed by its first five characters.
141 if rhs, ok := filter[2].([]interface{}); ok {
142 for _, i := range rhs {
143 if u, ok := i.(string); ok && len(u) == 27 {
145 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
// Equality operand: a single 27-character string is grouped the
// same way.
150 } else if op == "=" {
151 if u, ok := filter[2].(string); ok && len(u) == 27 {
153 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
162 if len(queryClusters) <= 1 {
163 // Query does not search for uuids across multiple
// "count" may be absent, none, or the JSON-quoted "none"; any other
// value is rejected.
169 count := req.Form.Get("count")
170 if count != "" && count != `none` && count != `"none"` {
171 httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
// Paging and ordering parameters are rejected outright.
174 if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
175 httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
// The whole result must fit within one response page.
178 if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
179 httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
180 expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
// A "select" list, when given, must decode as JSON and must include
// uuid; remoteQueryUUIDs reads the "uuid" key of every returned item.
183 if req.Form.Get("select") != "" {
186 err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
188 httpserver.Error(w, err.Error(), http.StatusBadRequest)
192 for _, r := range selects {
199 httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
204 // Perform concurrent requests to each cluster
206 // use channel as a semaphore to limit the number of concurrent
207 // requests at a time
208 sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
210 wg := sync.WaitGroup{}
// remoteQueryUUIDs reuses req.Header and sends a form-encoded body,
// so the content type is set once here on the shared header.
212 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
215 var completeResponses []map[string]interface{}
// One goroutine per cluster: k is the cluster ID, v its uuids.
218 for k, v := range queryClusters {
224 // blocks until it can put a value into the
225 // channel (which has a max queue capacity)
// k and v are passed as arguments so each goroutine works on its
// own copies.
228 go func(k string, v []string) {
229 rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
// NOTE(review): completeResponses and errors are shared between
// goroutines -- confirm these appends are guarded by a lock.
232 completeResponses = append(completeResponses, rp...)
235 errors = append(errors, err)
// Report every collected error message in one 502 response.
246 for _, e := range errors {
247 strerr = append(strerr, e.Error())
249 httpserver.Errors(w, strerr, http.StatusBadGateway)
// Success: the status is written before encoding, and the error
// returned by Encode is not checked.
253 w.Header().Set("Content-Type", "application/json")
254 w.WriteHeader(http.StatusOK)
255 itemList := make(map[string]interface{})
256 itemList["items"] = completeResponses
257 itemList["kind"] = kind
258 json.NewEncoder(w).Encode(itemList)
// ServeHTTP routes a request. It matches the URL path against
// h.matcher, loads the form parameters, lets an explicit
// "cluster_id" parameter choose the target cluster, and works out
// the effective method (a POST carrying "_method" is treated as
// that method). An effective GET with a "filters" parameter is
// offered to handleMultiClusterQuery; after that each delegate is
// tried in order. If the request is still being processed, it goes
// to h.next when the target cluster is empty or is this cluster,
// and is otherwise sent to the remote cluster with the response
// forwarded to the client.
263 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
264 m := h.matcher.FindStringSubmatch(req.URL.Path)
// NOTE(review): presumably submatch 2 carries a uuid from which the
// cluster ID is derived -- confirm against the branch body.
267 if len(m) > 0 && m[2] != "" {
271 // Get form parameters from URL and form body (if POST).
272 if err := loadParamsFromForm(req); err != nil {
273 httpserver.Error(w, err.Error(), http.StatusBadRequest)
277 // Check if the parameters have an explicit cluster_id
278 if req.Form.Get("cluster_id") != "" {
279 clusterId = req.Form.Get("cluster_id")
282 // Handle the POST-as-GET special case (workaround for large
283 // GET requests that potentially exceed maximum URL length,
284 // like multi-object queries where the filter has 100s of
286 effectiveMethod := req.Method
287 if req.Method == "POST" && req.Form.Get("_method") != "" {
288 effectiveMethod = req.Form.Get("_method")
// The conditions are joined with &&, so handleMultiClusterQuery is
// only called for an effective GET that carries filters.
291 if effectiveMethod == "GET" &&
293 req.Form.Get("filters") != "" &&
294 h.handleMultiClusterQuery(w, req, &clusterId) {
300 // trim leading slash
// Each delegate receives a pointer to clusterId, so it can change
// the cluster tested below; m[3] is the remainder of the path.
303 for _, d := range h.delegates {
304 if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
// No target cluster, or the target is this cluster: hand the
// request to the next handler.
309 if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
310 h.next.ServeHTTP(w, req)
// Otherwise send it to the remote cluster and relay the result,
// including any error, to the client.
312 resp, err := h.handler.remoteClusterRequest(clusterId, req)
313 h.handler.proxy.ForwardResponse(w, resp, err)
// multiClusterQueryResponseCollector holds the decoded outcome of
// one list request made to a single cluster; collectResponse fills
// it in.
317 type multiClusterQueryResponseCollector struct {
// responses is set by collectResponse to the "items" list of a
// successfully decoded response.
318 responses []map[string]interface{}
// collectResponse records the outcome of one cluster request in c.
//
// A non-nil requestError is stored in c.error. Otherwise the
// response body is decoded as a JSON object with "kind", "items"
// and "errors" keys; a decoding failure or a status other than
// 200 OK is stored in c.error as a message naming the cluster and
// the response status. On success the items and kind are stored in
// c.responses and c.kind.
324 func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
325 requestError error) (newResponse *http.Response, err error) {
// The request itself failed: keep the error for the caller to read
// from c.error.
326 if requestError != nil {
327 c.error = requestError
// Close the body once decoding is finished.
331 defer resp.Body.Close()
// Only the fields needed from the list response are decoded.
332 var loadInto struct {
333 Kind string `json:"kind"`
334 Items []map[string]interface{} `json:"items"`
335 Errors []string `json:"errors"`
337 err = json.NewDecoder(resp.Body).Decode(&loadInto)
// Decoding failure: report it together with the response status.
340 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
// Non-200 status: report the error messages from the response body.
343 if resp.StatusCode != http.StatusOK {
344 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
// Success: expose the decoded items and kind to the caller.
348 c.responses = loadInto.Items
349 c.kind = loadInto.Kind