1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/service"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/auth"
21 "git.arvados.org/arvados.git/sdk/go/httpserver"
22 "git.arvados.org/arvados.git/sdk/go/keepclient"
23 "github.com/gorilla/mux"
33 func newRouter(keepstore *keepstore, puller *puller, trasher *trasher) service.Handler {
39 adminonly := func(h http.HandlerFunc) http.HandlerFunc {
40 return auth.RequireLiteralToken(keepstore.cluster.SystemRootToken, h).ServeHTTP
44 // Without SkipClean(true), the gorilla/mux package responds
45 // to "PUT //foo" with a 301 redirect to "/foo", which causes
46 // a (Fetch Standard compliant) client to repeat the request
47 // as "GET /foo", which is clearly inappropriate in this case.
48 // It's less confusing if we just return 400.
50 locatorPath := `/{locator:[0-9a-f]{32}.*}`
51 get := r.Methods(http.MethodGet, http.MethodHead).Subrouter()
52 get.HandleFunc(locatorPath, rtr.handleBlockRead)
53 get.HandleFunc("/"+locatorPath, rtr.handleBlockRead) // for compatibility -- see TestBlockRead_DoubleSlash
54 get.HandleFunc(`/index`, adminonly(rtr.handleIndex))
55 get.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
56 get.HandleFunc(`/mounts`, adminonly(rtr.handleMounts))
57 get.HandleFunc(`/mounts/{uuid}/blocks`, adminonly(rtr.handleIndex))
58 get.HandleFunc(`/mounts/{uuid}/blocks/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
59 put := r.Methods(http.MethodPut).Subrouter()
60 put.HandleFunc(locatorPath, rtr.handleBlockWrite)
61 put.HandleFunc(`/pull`, adminonly(rtr.handlePullList))
62 put.HandleFunc(`/trash`, adminonly(rtr.handleTrashList))
63 put.HandleFunc(`/untrash`+locatorPath, adminonly(rtr.handleUntrash))
64 touch := r.Methods("TOUCH").Subrouter()
65 touch.HandleFunc(locatorPath, adminonly(rtr.handleBlockTouch))
66 delete := r.Methods(http.MethodDelete).Subrouter()
67 delete.HandleFunc(locatorPath, adminonly(rtr.handleBlockTrash))
68 options := r.Methods(http.MethodOptions).Subrouter()
69 options.NewRoute().PathPrefix(`/`).HandlerFunc(rtr.handleOptions)
70 r.NotFoundHandler = http.HandlerFunc(rtr.handleBadRequest)
71 r.MethodNotAllowedHandler = http.HandlerFunc(rtr.handleBadRequest)
72 rtr.Handler = corsHandler(auth.LoadToken(r))
76 func (rtr *router) CheckHealth() error {
80 func (rtr *router) Done() <-chan struct{} {
84 func (rtr *router) handleBlockRead(w http.ResponseWriter, req *http.Request) {
85 // Intervening proxies must not return a cached GET response
86 // to a prior request if a X-Keep-Signature request header has
87 // been added or changed.
88 w.Header().Add("Vary", keepclient.XKeepSignature)
89 var localLocator func(string)
90 if strings.SplitN(req.Header.Get(keepclient.XKeepSignature), ",", 2)[0] == "local" {
91 localLocator = func(locator string) {
92 w.Header().Set(keepclient.XKeepLocator, locator)
96 if req.Method == http.MethodHead {
97 out = discardWrite{ResponseWriter: w}
98 } else if li, err := getLocatorInfo(mux.Vars(req)["locator"]); err != nil {
99 rtr.handleError(w, req, err)
101 } else if li.size == 0 && li.hash != "d41d8cd98f00b204e9800998ecf8427e" {
102 // GET {hash} (with no size hint) is not allowed
103 // because we can't report md5 mismatches.
104 rtr.handleError(w, req, errMethodNotAllowed)
107 n, err := rtr.keepstore.BlockRead(req.Context(), arvados.BlockReadOptions{
108 Locator: mux.Vars(req)["locator"],
110 LocalLocator: localLocator,
112 if err != nil && (n == 0 || req.Method == http.MethodHead) {
113 rtr.handleError(w, req, err)
118 func (rtr *router) handleBlockWrite(w http.ResponseWriter, req *http.Request) {
119 dataSize, _ := strconv.Atoi(req.Header.Get("Content-Length"))
120 replicas, _ := strconv.Atoi(req.Header.Get(keepclient.XKeepDesiredReplicas))
121 resp, err := rtr.keepstore.BlockWrite(req.Context(), arvados.BlockWriteOptions{
122 Hash: mux.Vars(req)["locator"],
125 RequestID: req.Header.Get("X-Request-Id"),
126 StorageClasses: trimSplit(req.Header.Get(keepclient.XKeepStorageClasses), ","),
130 rtr.handleError(w, req, err)
133 w.Header().Set(keepclient.XKeepReplicasStored, fmt.Sprintf("%d", resp.Replicas))
135 for k, n := range resp.StorageClasses {
140 scc += fmt.Sprintf("%s=%d", k, n)
143 w.Header().Set(keepclient.XKeepStorageClassesConfirmed, scc)
144 w.WriteHeader(http.StatusOK)
145 fmt.Fprintln(w, resp.Locator)
148 func (rtr *router) handleBlockTouch(w http.ResponseWriter, req *http.Request) {
149 err := rtr.keepstore.BlockTouch(req.Context(), mux.Vars(req)["locator"])
150 rtr.handleError(w, req, err)
153 func (rtr *router) handleBlockTrash(w http.ResponseWriter, req *http.Request) {
154 err := rtr.keepstore.BlockTrash(req.Context(), mux.Vars(req)["locator"])
155 rtr.handleError(w, req, err)
158 func (rtr *router) handleMounts(w http.ResponseWriter, req *http.Request) {
159 json.NewEncoder(w).Encode(rtr.keepstore.Mounts())
162 func (rtr *router) handleIndex(w http.ResponseWriter, req *http.Request) {
163 prefix := req.FormValue("prefix")
165 prefix = mux.Vars(req)["prefix"]
167 cw := &countingWriter{writer: w}
168 err := rtr.keepstore.Index(req.Context(), indexOptions{
169 MountUUID: mux.Vars(req)["uuid"],
173 if err != nil && cw.n.Load() == 0 {
174 // Nothing was written, so it's not too late to report
175 // an error via http response header. (Otherwise, all
176 // we can do is omit the trailing newline below to
177 // indicate something went wrong.)
178 rtr.handleError(w, req, err)
182 // A trailing blank line signals to the caller that
183 // the response is complete.
184 w.Write([]byte("\n"))
188 func (rtr *router) handlePullList(w http.ResponseWriter, req *http.Request) {
189 var pl []PullListItem
190 err := json.NewDecoder(req.Body).Decode(&pl)
192 rtr.handleError(w, req, err)
196 if len(pl) > 0 && len(pl[0].Locator) == 32 {
197 rtr.handleError(w, req, httpserver.ErrorWithStatus(errors.New("rejecting pull list containing a locator without a size hint -- this probably means keep-balance needs to be upgraded"), http.StatusBadRequest))
200 rtr.puller.SetPullList(pl)
203 func (rtr *router) handleTrashList(w http.ResponseWriter, req *http.Request) {
204 var tl []TrashListItem
205 err := json.NewDecoder(req.Body).Decode(&tl)
207 rtr.handleError(w, req, err)
211 rtr.trasher.SetTrashList(tl)
214 func (rtr *router) handleUntrash(w http.ResponseWriter, req *http.Request) {
215 err := rtr.keepstore.BlockUntrash(req.Context(), mux.Vars(req)["locator"])
216 rtr.handleError(w, req, err)
219 func (rtr *router) handleBadRequest(w http.ResponseWriter, req *http.Request) {
220 http.Error(w, "Bad Request", http.StatusBadRequest)
223 func (rtr *router) handleOptions(w http.ResponseWriter, req *http.Request) {
226 func (rtr *router) handleError(w http.ResponseWriter, req *http.Request, err error) {
227 if req.Context().Err() != nil {
233 } else if os.IsNotExist(err) {
234 w.WriteHeader(http.StatusNotFound)
235 } else if statusErr := interface{ HTTPStatus() int }(nil); errors.As(err, &statusErr) {
236 w.WriteHeader(statusErr.HTTPStatus())
238 w.WriteHeader(http.StatusInternalServerError)
240 fmt.Fprintln(w, err.Error())
243 type countingWriter struct {
248 func (cw *countingWriter) Write(p []byte) (int, error) {
249 n, err := cw.writer.Write(p)
254 // Split s by sep, trim whitespace from each part, and drop empty
256 func trimSplit(s, sep string) []string {
258 for _, part := range strings.Split(s, sep) {
259 part = strings.TrimSpace(part)
267 // setSizeOnWrite sets the Content-Length header to the given size on
269 type setSizeOnWrite struct {
275 func (ss *setSizeOnWrite) Write(p []byte) (int, error) {
277 ss.Header().Set("Content-Length", fmt.Sprintf("%d", ss.size))
280 return ss.ResponseWriter.Write(p)
283 type discardWrite struct {
287 func (discardWrite) Write(p []byte) (int, error) {
291 func corsHandler(h http.Handler) http.Handler {
292 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
298 var corsHeaders = map[string]string{
299 "Access-Control-Allow-Methods": "GET, HEAD, PUT, OPTIONS",
300 "Access-Control-Allow-Origin": "*",
301 "Access-Control-Allow-Headers": "Authorization, Content-Length, Content-Type, " + keepclient.XKeepDesiredReplicas + ", " + keepclient.XKeepSignature + ", " + keepclient.XKeepStorageClasses,
302 "Access-Control-Expose-Headers": keepclient.XKeepLocator + ", " + keepclient.XKeepReplicasStored + ", " + keepclient.XKeepStorageClassesConfirmed,
303 "Access-Control-Max-Age": "86486400",
306 func SetCORSHeaders(w http.ResponseWriter) {
307 for k, v := range corsHeaders {