1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/httpserver"
// federatedRequestDelegate is the signature of the hooks stored in
// genericFederatedRequestHandler.delegates. ServeHTTP calls each
// delegate in order, before its own local/remote routing, and tests
// the boolean result.
// NOTE(review): presumably a true result means the delegate has
// fully handled the request and ServeHTTP should stop -- confirm
// against the branch body in ServeHTTP.
20 type federatedRequestDelegate func(
// h is the handler invoking the delegate.
21 h *genericFederatedRequestHandler,
// effectiveMethod is the request method after ServeHTTP has applied
// the POST-as-GET "_method" override.
22 effectiveMethod string,
26 w http.ResponseWriter,
27 req *http.Request) bool
// genericFederatedRequestHandler implements http.Handler (see its
// ServeHTTP method). It inspects the request path and form
// parameters and then either handles the request through a
// multi-cluster query or a delegate, passes it to the next handler,
// or forwards it to a remote cluster.
29 type genericFederatedRequestHandler struct {
// matcher is applied to the request URL path by ServeHTTP, which
// reads submatches 2 and 3 of the result.
32 matcher *regexp.Regexp
// delegates are offered each request, in slice order, by ServeHTTP.
33 delegates []federatedRequestDelegate
// remoteQueryUUIDs asks the cluster identified by clusterID for the
// objects listed in uuids and accumulates the returned items in rp.
//
// Each round sends a POST-as-GET request (form field "_method" set
// to "GET") to the path of the original request, with "count" set
// to "none", a filter of the form [["uuid", "in", [...]]], and the
// caller's "select" parameter when one was supplied. The request
// goes through localClusterRequest when clusterID equals this
// cluster's own ID; remoteClusterRequest is the other path.
//
// Rounds repeat while uuids is non-empty and is shorter than it was
// on the previous round, so the loop cannot run forever when a
// cluster keeps failing to return some of the requested objects.
//
// An error recorded by the response collector is returned as
// (nil, "", rc.error).
36 func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
38 clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
// found is keyed by uuid string.
// NOTE(review): presumably it marks uuids already returned by the
// cluster -- confirm against the loop over rc.responses below.
40 found := make(map[string]bool)
// Start one above the current length so that the "making progress"
// test in the loop condition passes on the first iteration.
41 prev_len_uuids := len(uuids) + 1
43 // (1) there are more uuids to query
44 // (2) we're making progress - on each iteration the set of
45 // uuids we are still waiting for must shrink.
46 for len(uuids) > 0 && len(uuids) < prev_len_uuids {
// Build a fresh request for this round. The header map is shared
// with the original request, not copied.
47 var remoteReq http.Request
48 remoteReq.Header = req.Header
49 remoteReq.Method = "POST"
50 remoteReq.URL = &url.URL{Path: req.URL.Path}
51 remoteParams := make(url.Values)
52 remoteParams.Set("_method", "GET")
53 remoteParams.Set("count", "none")
// Pass the caller's "select" through unchanged when present.
54 if req.Form.Get("select") != "" {
55 remoteParams.Set("select", req.Form.Get("select"))
// The := below declares a loop-scoped err that shadows the named
// result err.
57 content, err := json.Marshal(uuids)
// content is the JSON array of uuids, spliced into the filter.
61 remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
// The parameters travel form-encoded in the request body.
62 enc := remoteParams.Encode()
63 remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
65 rc := multiClusterQueryResponseCollector{clusterID: clusterID}
67 var resp *http.Response
// Queries for this cluster's own ID use the local request path.
68 if clusterID == h.handler.Cluster.ClusterID {
69 resp, err = h.handler.localClusterRequest(&remoteReq)
71 resp, err = h.handler.remoteClusterRequest(clusterID, &remoteReq)
// collectResponse's return values are discarded here; the outcome is
// read back from the collector's fields instead.
73 rc.collectResponse(resp, err)
76 return nil, "", rc.error
81 if len(rc.responses) == 0 {
82 // We got zero responses, no point in doing
// Accumulate this round's items into the overall result.
87 rp = append(rp, rc.responses...)
89 // Go through the responses and determine what was
90 // returned. If there are remaining items, loop
91 // around and do another request with just the
93 for _, i := range rc.responses {
// Items whose "uuid" is absent or not a string give ok == false.
94 uuid, ok := i["uuid"].(string)
101 for _, u := range uuids {
// Remember this round's length for the progress test in the loop
// condition.
106 prev_len_uuids = len(uuids)
// handleMultiClusterQuery inspects the request's "filters" parameter
// for uuid lookups and, when they reference more than one cluster,
// answers the request itself by querying each cluster concurrently
// and merging the results into one JSON item list.
//
// Responses written by this method:
//   - 400 Bad Request when "filters" or "select" is not valid JSON,
//     when "count" is set to something other than none, when
//     "limit", "offset" or "order" is supplied, when expectCount
//     exceeds Cluster.API.MaxItemsPerResponse, or when "select"
//     lacks "uuid".
//   - 502 Bad Gateway carrying the collected error strings when
//     per-cluster queries reported errors.
//   - 200 OK with a JSON object holding "items" and "kind".
//
// NOTE(review): ServeHTTP uses the boolean result as the last term
// of an && condition; presumably true means a response has been
// written and ServeHTTP must not continue -- confirm.
113 func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
114 req *http.Request, clusterId *string) bool {
// "filters" must decode as a JSON list of lists.
116 var filters [][]interface{}
117 err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
119 httpserver.Error(w, err.Error(), http.StatusBadRequest)
123 // Split the list of uuids by prefix
// queryClusters maps the first five characters of a uuid to the
// uuids sharing that prefix. The key is later passed to
// remoteQueryUUIDs as the cluster ID.
124 queryClusters := make(map[string][]string)
126 for _, filter := range filters {
// Filters are inspected as three-element lists with the string
// "uuid" as the first element.
127 if len(filter) != 3 {
131 if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
135 op, ok := filter[1].(string)
// A list operand contributes each of its 27-character strings.
141 if rhs, ok := filter[2].([]interface{}); ok {
142 for _, i := range rhs {
143 if u, ok := i.(string); ok && len(u) == 27 {
145 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
// An "=" filter contributes its single 27-character string operand.
150 } else if op == "=" {
151 if u, ok := filter[2].(string); ok && len(u) == 27 {
153 queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
162 if len(queryClusters) <= 1 {
163 // Query does not search for uuids across multiple
// "count" may be unset, none, or the quoted string "none".
169 count := req.Form.Get("count")
170 if count != "" && count != `none` && count != `"none"` {
171 httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
// Paging and ordering parameters are rejected outright.
174 if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
175 httpserver.Error(w, "Federated multi-object may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
// Refuse requests for more objects than the configured maximum.
178 if max := h.handler.Cluster.API.MaxItemsPerResponse; expectCount > max {
179 httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
180 expectCount, max), http.StatusBadRequest)
// When "select" is given it must be valid JSON and must include
// "uuid". remoteQueryUUIDs reads the "uuid" field of every item it
// gets back.
183 if req.Form.Get("select") != "" {
186 err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
188 httpserver.Error(w, err.Error(), http.StatusBadRequest)
192 for _, r := range selects {
199 httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
204 // Perform concurrent requests to each cluster
// NOTE(review): acquire/release presumably cap the number of
// in-flight goroutines at MaxRequestAmplification -- confirm against
// the semaphore helper.
206 acquire, release := semaphore(h.handler.Cluster.API.MaxRequestAmplification)
207 wg := sync.WaitGroup{}
// remoteQueryUUIDs reuses this header map and sends a form-encoded
// body.
209 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
212 var completeResponses []map[string]interface{}
215 for k, v := range queryClusters {
// k and v are passed as arguments, so each goroutine works on its
// own copies.
222 go func(k string, v []string) {
225 rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
229 completeResponses = append(completeResponses, rp...)
232 errors = append(errors, err)
// Report the collected errors together as 502 Bad Gateway.
240 for _, e := range errors {
241 strerr = append(strerr, e.Error())
243 httpserver.Errors(w, strerr, http.StatusBadGateway)
// Success: emit the merged item list as JSON.
247 w.Header().Set("Content-Type", "application/json")
248 w.WriteHeader(http.StatusOK)
249 itemList := make(map[string]interface{})
250 itemList["items"] = completeResponses
251 itemList["kind"] = kind
// NOTE(review): the error returned by Encode is discarded here.
252 json.NewEncoder(w).Encode(itemList)
// ServeHTTP routes a request matched against h.matcher. In order,
// it:
//  1. loads the form parameters, replying 400 Bad Request on failure;
//  2. lets an explicit "cluster_id" form parameter set the target
//     cluster;
//  3. computes the effective method, honoring "_method" on POST
//     requests;
//  4. offers effective GET requests that carry "filters" to
//     handleMultiClusterQuery;
//  5. offers the request to each delegate in h.delegates;
//  6. serves it through h.next when the target cluster is empty or
//     is this cluster's own ID, and forwards it with
//     remoteClusterRequest, relaying the response through the proxy,
//     on the other path.
257 func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
258 m := h.matcher.FindStringSubmatch(req.URL.Path)
// NOTE(review): submatch 2 presumably carries the cluster prefix
// used as the initial target cluster -- confirm against the matcher
// pattern.
261 if len(m) > 0 && m[2] != "" {
265 // Get form parameters from URL and form body (if POST).
266 if err := loadParamsFromForm(req); err != nil {
267 httpserver.Error(w, err.Error(), http.StatusBadRequest)
271 // Check if the parameters have an explicit cluster_id
272 if req.Form.Get("cluster_id") != "" {
273 clusterId = req.Form.Get("cluster_id")
276 // Handle the POST-as-GET special case (workaround for large
277 // GET requests that potentially exceed maximum URL length,
278 // like multi-object queries where the filter has 100s of
280 effectiveMethod := req.Method
281 if req.Method == "POST" && req.Form.Get("_method") != "" {
282 effectiveMethod = req.Form.Get("_method")
// The multi-cluster handler is the last term of the condition, so
// it only runs when the earlier terms hold.
285 if effectiveMethod == "GET" &&
287 req.Form.Get("filters") != "" &&
288 h.handleMultiClusterQuery(w, req, &clusterId) {
294 // trim leading slash
// Delegates receive a pointer to clusterId along with submatch 3 of
// the path.
297 for _, d := range h.delegates {
298 if d(h, effectiveMethod, &clusterId, uuid, m[3], w, req) {
// No target cluster, or the target is this cluster: serve through
// the next handler in the chain.
303 if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
304 h.next.ServeHTTP(w, req)
// Forward to the remote cluster and relay its response or error.
306 resp, err := h.handler.remoteClusterRequest(clusterId, req)
307 h.handler.proxy.ForwardResponse(w, resp, err)
// multiClusterQueryResponseCollector holds the outcome of a single
// per-cluster request issued by remoteQueryUUIDs. collectResponse
// fills it in, and remoteQueryUUIDs then reads the result back from
// its fields.
311 type multiClusterQueryResponseCollector struct {
// responses is set by collectResponse to the "items" array of the
// decoded response body.
312 responses []map[string]interface{}
// collectResponse records the outcome of one cluster request in the
// collector.
//
// A non-nil requestError is stored in c.error. For a response, the
// body is decoded as a JSON object with "kind", "items" and "errors"
// members. A status other than 200 OK is stored in c.error as an
// error naming the cluster, the status and the decoded "errors"
// list. Decode failures get a similar message carrying the decode
// error. On success the decoded items and kind are stored in
// c.responses and c.kind.
//
// remoteQueryUUIDs discards this method's return values and reads
// the collector's fields instead.
318 func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
319 requestError error) (newResponse *http.Response, err error) {
// The request itself failed; record the error.
320 if requestError != nil {
321 c.error = requestError
// Close the body when this method returns, whichever path is taken.
325 defer resp.Body.Close()
// Only these three members of the response body are decoded.
326 var loadInto struct {
327 Kind string `json:"kind"`
328 Items []map[string]interface{} `json:"items"`
329 Errors []string `json:"errors"`
331 err = json.NewDecoder(resp.Body).Decode(&loadInto)
// Decode failure: the message carries the decode error.
334 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
// The body is decoded before the status check, so the cluster's own
// "errors" list is available for the message.
337 if resp.StatusCode != http.StatusOK {
338 c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
// Success: expose the decoded items and kind to the caller.
342 c.responses = loadInto.Items
343 c.kind = loadInto.Kind