1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/service"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/auth"
21 "git.arvados.org/arvados.git/sdk/go/httpserver"
22 "git.arvados.org/arvados.git/sdk/go/keepclient"
23 "github.com/gorilla/mux"
// newRouter registers the keepstore HTTP routes, grouped by request
// method, and installs the finished router (wrapped with token loading
// and CORS handling) as rtr.Handler.
//
// NOTE(review): the construction of r and rtr and the return statement
// are not visible in this excerpt; r is presumably a gorilla/mux router.
33 func newRouter(keepstore *keepstore, puller *puller, trasher *trasher) service.Handler {
// adminonly wraps a handler with auth.RequireLiteralToken, using the
// cluster's SystemRootToken as the required token.
39 adminonly := func(h http.HandlerFunc) http.HandlerFunc {
40 return auth.RequireLiteralToken(keepstore.cluster.SystemRootToken, h).ServeHTTP
44 // Without SkipClean(true), the gorilla/mux package responds
45 // to "PUT //foo" with a 301 redirect to "/foo", which causes
46 // a (Fetch Standard compliant) client to repeat the request
47 // as "GET /foo", which is clearly inappropriate in this case.
48 // It's less confusing if we just return 400.
// A locator path is "/" followed by 32 lowercase hex digits and then
// any trailing characters; the match is exposed as the "locator" var.
50 locatorPath := `/{locator:[0-9a-f]{32}.*}`
// GET and HEAD share one subrouter. Block reads are open to any
// caller that reaches the handler; index and mount listings are
// wrapped with adminonly.
51 get := r.Methods(http.MethodGet, http.MethodHead).Subrouter()
52 get.HandleFunc(locatorPath, rtr.handleBlockRead)
53 get.HandleFunc("/"+locatorPath, rtr.handleBlockRead) // for compatibility -- see TestBlockRead_DoubleSlash
54 get.HandleFunc(`/index`, adminonly(rtr.handleIndex))
55 get.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
56 get.HandleFunc(`/mounts`, adminonly(rtr.handleMounts))
// The per-mount block listings reuse handleIndex; the {uuid} path
// variable is what distinguishes them from the /index routes.
57 get.HandleFunc(`/mounts/{uuid}/blocks`, adminonly(rtr.handleIndex))
58 get.HandleFunc(`/mounts/{uuid}/blocks/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
// PUT routes: block writes are not wrapped with adminonly; pull,
// trash and untrash requests are.
59 put := r.Methods(http.MethodPut).Subrouter()
60 put.HandleFunc(locatorPath, rtr.handleBlockWrite)
61 put.HandleFunc(`/pull`, adminonly(rtr.handlePullList))
62 put.HandleFunc(`/trash`, adminonly(rtr.handleTrashList))
63 put.HandleFunc(`/untrash`+locatorPath, adminonly(rtr.handleUntrash))
// "TOUCH" is a non-standard HTTP method name, matched literally.
64 touch := r.Methods("TOUCH").Subrouter()
65 touch.HandleFunc(locatorPath, adminonly(rtr.handleBlockTouch))
// NOTE(review): this local variable shadows the builtin delete for the
// rest of the function.
66 delete := r.Methods(http.MethodDelete).Subrouter()
67 delete.HandleFunc(locatorPath, adminonly(rtr.handleBlockTrash))
// OPTIONS on any path goes to handleOptions.
68 options := r.Methods(http.MethodOptions).Subrouter()
69 options.NewRoute().PathPrefix(`/`).HandlerFunc(rtr.handleOptions)
// Unmatched paths and disallowed methods are both answered by
// handleBadRequest rather than mux's default responses.
70 r.NotFoundHandler = http.HandlerFunc(rtr.handleBadRequest)
71 r.MethodNotAllowedHandler = http.HandlerFunc(rtr.handleBadRequest)
// Wrapping order: corsHandler is outermost, then auth.LoadToken, then
// the router itself.
72 rtr.Handler = corsHandler(auth.LoadToken(r))
76 func (rtr *router) CheckHealth() error {
80 func (rtr *router) Done() <-chan struct{} {
// handleBlockRead serves a block read request by passing the "locator"
// path variable to keepstore.BlockRead. It is registered for both GET
// and HEAD.
//
// NOTE(review): several lines of this function (the declaration of out,
// some returns and closing braces) are not visible in this excerpt.
84 func (rtr *router) handleBlockRead(w http.ResponseWriter, req *http.Request) {
85 // Intervening proxies must not return a cached GET response
86 // to a prior request if a X-Keep-Signature request header has
87 // been added or changed.
88 w.Header().Add("Vary", keepclient.XKeepSignature)
// localLocator stays nil unless the first comma-separated field of
// the X-Keep-Signature request header is "local"; in that case it is a
// callback that copies the locator it receives into the X-Keep-Locator
// response header.
89 var localLocator func(string)
90 if strings.SplitN(req.Header.Get(keepclient.XKeepSignature), ",", 2)[0] == "local" {
91 localLocator = func(locator string) {
92 w.Header().Set(keepclient.XKeepLocator, locator)
// HEAD requests get a writer that wraps w in discardWrite; the locator
// checks in the else-if branches below apply only to non-HEAD requests.
96 if req.Method == http.MethodHead {
97 out = discardWrite{ResponseWriter: w}
98 } else if li, err := getLocatorInfo(mux.Vars(req)["locator"]); err != nil {
99 rtr.handleError(w, req, err)
// d41d8cd98f00b204e9800998ecf8427e is the MD5 digest of zero bytes of
// data, so a size of 0 is accepted for that hash only.
101 } else if li.size == 0 && li.hash != "d41d8cd98f00b204e9800998ecf8427e" {
102 // GET {hash} (with no size hint) is not allowed
103 // because we can't report md5 mismatches.
104 rtr.handleError(w, req, errMethodNotAllowed)
107 n, err := rtr.keepstore.BlockRead(req.Context(), arvados.BlockReadOptions{
108 Locator: mux.Vars(req)["locator"],
110 LocalLocator: localLocator,
// An error is reported through handleError only when n == 0 or the
// request is HEAD. NOTE(review): n presumably counts bytes already sent
// to the client, making a status change too late otherwise -- confirm
// against BlockRead.
112 if err != nil && (n == 0 || req.Method == http.MethodHead) {
113 rtr.handleError(w, req, err)
// handleBlockWrite serves a PUT for a block locator by calling
// keepstore.BlockWrite with options taken from the path and request
// headers. On success it reports the stored replica count and the
// confirmed storage classes in response headers, then writes the
// resulting locator followed by a newline as the body.
//
// NOTE(review): some option fields, the error check around handleError
// and the separator logic for scc are not visible in this excerpt.
118 func (rtr *router) handleBlockWrite(w http.ResponseWriter, req *http.Request) {
// Conversion errors are discarded, so a missing or malformed header
// leaves the corresponding value at 0.
119 dataSize, _ := strconv.Atoi(req.Header.Get("Content-Length"))
120 replicas, _ := strconv.Atoi(req.Header.Get(keepclient.XKeepDesiredReplicas))
121 resp, err := rtr.keepstore.BlockWrite(req.Context(), arvados.BlockWriteOptions{
122 Hash: mux.Vars(req)["locator"],
125 RequestID: req.Header.Get("X-Request-Id"),
// The storage classes header is a comma-separated list.
126 StorageClasses: trimSplit(req.Header.Get(keepclient.XKeepStorageClasses), ","),
130 rtr.handleError(w, req, err)
133 w.Header().Set(keepclient.XKeepReplicasStored, fmt.Sprintf("%d", resp.Replicas))
// scc accumulates one "class=count" entry per confirmed storage class.
// NOTE(review): if resp.StorageClasses is a map, entry order in the
// header varies between responses -- confirm callers don't rely on it.
135 for k, n := range resp.StorageClasses {
140 scc += fmt.Sprintf("%s=%d", k, n)
143 w.Header().Set(keepclient.XKeepStorageClassesConfirmed, scc)
144 w.WriteHeader(http.StatusOK)
145 fmt.Fprintln(w, resp.Locator)
// handleBlockTouch calls keepstore.BlockTouch on the "locator" path
// variable and hands the result to handleError unconditionally.
// NOTE(review): this relies on handleError coping with a nil err --
// confirm in handleError.
148 func (rtr *router) handleBlockTouch(w http.ResponseWriter, req *http.Request) {
149 err := rtr.keepstore.BlockTouch(req.Context(), mux.Vars(req)["locator"])
150 rtr.handleError(w, req, err)
// handleBlockTrash calls keepstore.BlockTrash on the "locator" path
// variable and hands the result to handleError unconditionally.
// NOTE(review): this relies on handleError coping with a nil err --
// confirm in handleError.
153 func (rtr *router) handleBlockTrash(w http.ResponseWriter, req *http.Request) {
154 err := rtr.keepstore.BlockTrash(req.Context(), mux.Vars(req)["locator"])
155 rtr.handleError(w, req, err)
// handleMounts writes the value returned by keepstore.Mounts() to the
// response as JSON.
158 func (rtr *router) handleMounts(w http.ResponseWriter, req *http.Request) {
// NOTE(review): the error returned by Encode is not checked.
159 json.NewEncoder(w).Encode(rtr.keepstore.Mounts())
// handleIndex streams a block index to the client by calling
// keepstore.Index. It backs both the /index routes and the
// /mounts/{uuid}/blocks routes.
//
// NOTE(review): the condition guarding the second prefix assignment,
// the remaining indexOptions fields and the control flow around the
// final Write are not visible in this excerpt.
162 func (rtr *router) handleIndex(w http.ResponseWriter, req *http.Request) {
163 httpserver.ExemptFromDeadline(req)
// The prefix can come from the "prefix" form value or from the
// {prefix} path variable.
164 prefix := req.FormValue("prefix")
166 prefix = mux.Vars(req)["prefix"]
// cw wraps w so the handler can tell whether any output has been
// written yet (see the cw.n check below).
168 cw := &countingWriter{writer: w}
169 err := rtr.keepstore.Index(req.Context(), indexOptions{
// The "uuid" path variable is only defined on the /mounts/... routes,
// so this is the empty string for /index requests.
170 MountUUID: mux.Vars(req)["uuid"],
174 if err != nil && cw.n.Load() == 0 {
175 // Nothing was written, so it's not too late to report
176 // an error via http response header. (Otherwise, all
177 // we can do is omit the trailing newline below to
178 // indicate something went wrong.)
179 rtr.handleError(w, req, err)
183 // A trailing blank line signals to the caller that
184 // the response is complete.
185 w.Write([]byte("\n"))
// handlePullList decodes the request body as a JSON array of
// PullListItem and passes it to the puller via SetPullList.
//
// NOTE(review): the error check after Decode and the returns after each
// handleError call are not visible in this excerpt.
189 func (rtr *router) handlePullList(w http.ResponseWriter, req *http.Request) {
190 var pl []PullListItem
191 err := json.NewDecoder(req.Body).Decode(&pl)
193 rtr.handleError(w, req, err)
// A locator of exactly 32 characters is a bare hash with no size hint.
// Only the first item is inspected; later items are not checked here.
197 if len(pl) > 0 && len(pl[0].Locator) == 32 {
198 rtr.handleError(w, req, httpserver.ErrorWithStatus(errors.New("rejecting pull list containing a locator without a size hint -- this probably means keep-balance needs to be upgraded"), http.StatusBadRequest))
201 rtr.puller.SetPullList(pl)
// handleTrashList decodes the request body as a JSON array of
// TrashListItem and passes it to the trasher via SetTrashList.
//
// NOTE(review): the error check after Decode is not visible in this
// excerpt.
204 func (rtr *router) handleTrashList(w http.ResponseWriter, req *http.Request) {
205 var tl []TrashListItem
206 err := json.NewDecoder(req.Body).Decode(&tl)
208 rtr.handleError(w, req, err)
212 rtr.trasher.SetTrashList(tl)
// handleUntrash calls keepstore.BlockUntrash on the "locator" path
// variable and hands the result to handleError unconditionally.
// NOTE(review): this relies on handleError coping with a nil err --
// confirm in handleError.
215 func (rtr *router) handleUntrash(w http.ResponseWriter, req *http.Request) {
216 err := rtr.keepstore.BlockUntrash(req.Context(), mux.Vars(req)["locator"])
217 rtr.handleError(w, req, err)
// handleBadRequest responds with status 400 and the body "Bad Request".
// newRouter installs it as both the not-found and the
// method-not-allowed handler.
220 func (rtr *router) handleBadRequest(w http.ResponseWriter, req *http.Request) {
221 http.Error(w, "Bad Request", http.StatusBadRequest)
224 func (rtr *router) handleOptions(w http.ResponseWriter, req *http.Request) {
// handleError maps err to an HTTP status code and writes the error
// text, followed by a newline, as the response body.
//
// NOTE(review): the branch taken when the request context has ended,
// and any branches between it and the os.IsNotExist check, are not
// visible in this excerpt. Several handlers pass err without checking
// it for nil, so a nil check is presumably among the hidden lines --
// confirm.
227 func (rtr *router) handleError(w http.ResponseWriter, req *http.Request, err error) {
// A non-nil context error means the request was cancelled or timed
// out.
228 if req.Context().Err() != nil {
// Errors satisfying os.IsNotExist map to 404.
234 } else if os.IsNotExist(err) {
235 w.WriteHeader(http.StatusNotFound)
// Any error in the chain that provides HTTPStatus() int chooses its
// own status code.
236 } else if statusErr := interface{ HTTPStatus() int }(nil); errors.As(err, &statusErr) {
237 w.WriteHeader(statusErr.HTTPStatus())
// Everything else maps to 500.
239 w.WriteHeader(http.StatusInternalServerError)
241 fmt.Fprintln(w, err.Error())
244 type countingWriter struct {
// Write forwards p to the wrapped writer.
// NOTE(review): the rest of the body is not visible in this excerpt;
// presumably it adds n to cw.n, which handleIndex reads through
// cw.n.Load() -- confirm.
249 func (cw *countingWriter) Write(p []byte) (int, error) {
250 n, err := cw.writer.Write(p)
255 // Split s by sep, trim whitespace from each part, and drop empty
// NOTE(review): the continuation of the comment above, and the code
// that filters and collects the parts, are not visible in this excerpt.
257 func trimSplit(s, sep string) []string {
259 for _, part := range strings.Split(s, sep) {
// Trim leading and trailing whitespace from each part.
260 part = strings.TrimSpace(part)
268 // setSizeOnWrite sets the Content-Length header to the given size on
270 type setSizeOnWrite struct {
// Write sets the Content-Length response header to ss.size and then
// forwards p to the wrapped ResponseWriter.
// NOTE(review): the header assignment presumably sits behind a
// first-write guard whose lines are not visible in this excerpt --
// confirm.
276 func (ss *setSizeOnWrite) Write(p []byte) (int, error) {
278 ss.Header().Set("Content-Length", fmt.Sprintf("%d", ss.size))
281 return ss.ResponseWriter.Write(p)
// Unwrap returns the wrapped http.ResponseWriter.
// NOTE(review): presumably provided so that http.ResponseController
// can reach the underlying writer -- confirm.
284 func (ss *setSizeOnWrite) Unwrap() http.ResponseWriter {
285 return ss.ResponseWriter
288 type discardWrite struct {
292 func (discardWrite) Write(p []byte) (int, error) {
// Unwrap returns the wrapped http.ResponseWriter.
// NOTE(review): presumably provided so that http.ResponseController
// can reach the underlying writer -- confirm.
296 func (dw discardWrite) Unwrap() http.ResponseWriter {
297 return dw.ResponseWriter
300 func corsHandler(h http.Handler) http.Handler {
301 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// corsHeaders maps CORS response header names to the values sent for
// them. SetCORSHeaders iterates over this map.
307 var corsHeaders = map[string]string{
// NOTE(review): the router also registers TOUCH and DELETE routes,
// which are not listed here; both are wrapped with adminonly, so the
// omission is presumably intentional -- confirm.
308 "Access-Control-Allow-Methods": "GET, HEAD, PUT, OPTIONS",
309 "Access-Control-Allow-Origin": "*",
// Request headers a cross-origin client may send.
310 "Access-Control-Allow-Headers": "Authorization, Content-Length, Content-Type, " + keepclient.XKeepDesiredReplicas + ", " + keepclient.XKeepSignature + ", " + keepclient.XKeepStorageClasses,
// Response headers a cross-origin client may read.
311 "Access-Control-Expose-Headers": keepclient.XKeepLocator + ", " + keepclient.XKeepReplicasStored + ", " + keepclient.XKeepStorageClassesConfirmed,
// Max-Age is expressed in seconds; 86486400 is 1001 days.
// NOTE(review): unusual value -- confirm it is intended; browsers are
// documented to cap how long they cache preflight results.
312 "Access-Control-Max-Age": "86486400",
315 func SetCORSHeaders(w http.ResponseWriter) {
316 for k, v := range corsHeaders {