1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
18 "git.arvados.org/arvados.git/lib/service"
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "git.arvados.org/arvados.git/sdk/go/auth"
21 "git.arvados.org/arvados.git/sdk/go/httpserver"
22 "github.com/gorilla/mux"
// newRouter registers the keepstore HTTP routes and stores the
// resulting handler, wrapped by auth.LoadToken, in rtr.Handler. Its
// declared return type is service.Handler.
//
// NOTE(review): several lines of this function are not visible in this
// listing, including the creation of rtr and r and the return
// statement. r is presumably a *mux.Router -- confirm.
32 func newRouter(keepstore *keepstore, puller *puller, trasher *trasher) service.Handler {
// adminonly wraps h with auth.RequireLiteralToken, using the cluster's
// SystemRootToken as the literal token.
38 adminonly := func(h http.HandlerFunc) http.HandlerFunc {
39 return auth.RequireLiteralToken(keepstore.cluster.SystemRootToken, h).ServeHTTP
// locatorPath matches a path made of "/" followed by 32 lowercase hex
// digits and then any further characters; the match is exposed as the
// "locator" path variable.
43 locatorPath := `/{locator:[0-9a-f]{32}.*}`
// GET and HEAD routes. Block reads are registered without adminonly;
// the index and mounts routes are wrapped in adminonly.
44 get := r.Methods(http.MethodGet, http.MethodHead).Subrouter()
45 get.HandleFunc(locatorPath, rtr.handleBlockRead)
46 get.HandleFunc(`/index`, adminonly(rtr.handleIndex))
47 get.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
48 get.HandleFunc(`/mounts`, adminonly(rtr.handleMounts))
49 get.HandleFunc(`/mounts/{uuid}/blocks`, adminonly(rtr.handleIndex))
50 get.HandleFunc(`/mounts/{uuid}/blocks/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
// PUT routes. Block writes are registered without adminonly; pull,
// trash and untrash are wrapped in adminonly.
51 put := r.Methods(http.MethodPut).Subrouter()
52 put.HandleFunc(locatorPath, rtr.handleBlockWrite)
53 put.HandleFunc(`/pull`, adminonly(rtr.handlePullList))
54 put.HandleFunc(`/trash`, adminonly(rtr.handleTrashList))
55 put.HandleFunc(`/untrash`+locatorPath, adminonly(rtr.handleUntrash))
// TOUCH has no method constant in net/http, so it is given as a string
// literal.
56 touch := r.Methods("TOUCH").Subrouter()
57 touch.HandleFunc(locatorPath, adminonly(rtr.handleBlockTouch))
58 delete := r.Methods(http.MethodDelete).Subrouter()
59 delete.HandleFunc(locatorPath, adminonly(rtr.handleBlockTrash))
// Unmatched paths and unmatched methods are both answered by
// handleBadRequest.
60 r.NotFoundHandler = http.HandlerFunc(rtr.handleBadRequest)
61 r.MethodNotAllowedHandler = http.HandlerFunc(rtr.handleBadRequest)
62 rtr.Handler = auth.LoadToken(r)
// CheckHealth returns an error value describing the router's health.
// NOTE(review): the body is not visible in this listing, so what it
// actually checks (if anything) cannot be stated here.
66 func (rtr *router) CheckHealth() error {
// Done returns a receive-only channel of struct{}.
// NOTE(review): the body is not visible in this listing; whether the
// channel is ever closed (or is nil) must be confirmed in the full file.
70 func (rtr *router) Done() <-chan struct{} {
// handleBlockRead answers a GET or HEAD request for a block locator by
// calling rtr.keepstore.BlockRead with the "locator" path variable.
//
// NOTE(review): several lines of this function (the declaration of out,
// return statements, the last else branch, closing braces) are not
// visible in this listing.
74 func (rtr *router) handleBlockRead(w http.ResponseWriter, req *http.Request) {
75 // Intervening proxies must not return a cached GET response
76 // to a prior request if a X-Keep-Signature request header has
77 // been added or changed.
78 w.Header().Add("Vary", "X-Keep-Signature")
// localLocator stays nil unless the first comma-separated field of the
// X-Keep-Signature request header is "local". In that case it is a
// callback that copies its argument into the X-Keep-Locator response
// header.
79 var localLocator func(string)
80 if strings.SplitN(req.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
81 localLocator = func(locator string) {
82 w.Header().Set("X-Keep-Locator", locator)
// HEAD requests use a discardWrite wrapper around w as their output.
86 if req.Method == http.MethodHead {
87 out = discardWrite{ResponseWriter: w}
88 } else if li, err := getLocatorInfo(mux.Vars(req)["locator"]); err != nil {
// getLocatorInfo rejected the locator; report its error.
89 rtr.handleError(w, req, err)
91 } else if li.size == 0 && li.hash != "d41d8cd98f00b204e9800998ecf8427e" {
// d41d8cd98f00b204e9800998ecf8427e is the MD5 digest of empty input,
// so a size of 0 is accepted for that hash only.
92 // GET {hash} (with no size hint) is not allowed
93 // because we can't report md5 mismatches.
94 rtr.handleError(w, req, errMethodNotAllowed)
97 n, err := rtr.keepstore.BlockRead(req.Context(), arvados.BlockReadOptions{
98 Locator: mux.Vars(req)["locator"],
100 LocalLocator: localLocator,
// An error is passed to handleError only when n is 0 or the request is
// HEAD. NOTE(review): presumably n is the number of bytes already
// written to the response, so a non-zero n on GET means it is too late
// to change the status -- confirm against BlockRead.
102 if err != nil && (n == 0 || req.Method == http.MethodHead) {
103 rtr.handleError(w, req, err)
// handleBlockWrite handles a PUT request for a block locator by calling
// rtr.keepstore.BlockWrite. The visible success path sets the
// X-Keep-Replicas-Stored and X-Keep-Storage-Classes-Confirmed response
// headers, sends status 200, and writes resp.Locator followed by a
// newline as the body.
//
// NOTE(review): some BlockWriteOptions fields, the error check around
// handleError, and parts of the scc-building loop are not visible in
// this listing.
108 func (rtr *router) handleBlockWrite(w http.ResponseWriter, req *http.Request) {
// The Atoi errors are discarded, so a missing or non-numeric header
// value leaves dataSize / replicas at 0.
109 dataSize, _ := strconv.Atoi(req.Header.Get("Content-Length"))
110 replicas, _ := strconv.Atoi(req.Header.Get("X-Arvados-Replicas-Desired"))
111 resp, err := rtr.keepstore.BlockWrite(req.Context(), arvados.BlockWriteOptions{
112 Hash: mux.Vars(req)["locator"],
115 RequestID: req.Header.Get("X-Request-Id"),
// The storage class header is a comma-separated list.
116 StorageClasses: trimSplit(req.Header.Get("X-Keep-Storage-Classes"), ","),
120 rtr.handleError(w, req, err)
123 w.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", resp.Replicas))
// Append one "key=value" pair to scc per entry of resp.StorageClasses.
// NOTE(review): the separator between pairs is added on lines not
// shown here.
125 for k, n := range resp.StorageClasses {
130 scc += fmt.Sprintf("%s=%d", k, n)
133 w.Header().Set("X-Keep-Storage-Classes-Confirmed", scc)
134 w.WriteHeader(http.StatusOK)
135 fmt.Fprintln(w, resp.Locator)
// handleBlockTouch calls rtr.keepstore.BlockTouch with the "locator"
// path variable and hands the result to handleError.
138 func (rtr *router) handleBlockTouch(w http.ResponseWriter, req *http.Request) {
139 err := rtr.keepstore.BlockTouch(req.Context(), mux.Vars(req)["locator"])
// handleError is called unconditionally, i.e. also when err is nil.
140 rtr.handleError(w, req, err)
// handleBlockTrash calls rtr.keepstore.BlockTrash with the "locator"
// path variable and hands the result to handleError.
143 func (rtr *router) handleBlockTrash(w http.ResponseWriter, req *http.Request) {
144 err := rtr.keepstore.BlockTrash(req.Context(), mux.Vars(req)["locator"])
// handleError is called unconditionally, i.e. also when err is nil.
145 rtr.handleError(w, req, err)
// handleMounts writes the value returned by rtr.keepstore.Mounts() to
// the response as JSON.
148 func (rtr *router) handleMounts(w http.ResponseWriter, req *http.Request) {
// The error returned by Encode is not checked.
149 json.NewEncoder(w).Encode(rtr.keepstore.Mounts())
// handleIndex responds with the output of rtr.keepstore.Index,
// optionally restricted to the mount named by the "uuid" path variable
// and to a locator prefix.
//
// NOTE(review): the remaining indexOptions fields and some guard
// conditions are not visible in this listing.
152 func (rtr *router) handleIndex(w http.ResponseWriter, req *http.Request) {
// The prefix can come from the "prefix" form value or from the "prefix"
// path variable. NOTE(review): the condition choosing between the two
// is on a line not shown here.
153 prefix := req.FormValue("prefix")
155 prefix = mux.Vars(req)["prefix"]
// cw wraps w; its counter cw.n is consulted below to find out whether
// any output has been written yet.
157 cw := &countingWriter{writer: w}
158 err := rtr.keepstore.Index(req.Context(), indexOptions{
159 MountUUID: mux.Vars(req)["uuid"],
163 if err != nil && cw.n.Load() == 0 {
164 // Nothing was written, so it's not too late to report
165 // an error via http response header. (Otherwise, all
166 // we can do is omit the trailing newline below to
167 // indicate something went wrong.)
168 rtr.handleError(w, req, err)
172 // A trailing blank line signals to the caller that
173 // the response is complete.
174 w.Write([]byte("\n"))
// handlePullList decodes the request body as a JSON array of
// PullListItem and passes it to rtr.puller.SetPullList.
//
// NOTE(review): the condition guarding the first handleError call and
// the return statements are not visible in this listing.
178 func (rtr *router) handlePullList(w http.ResponseWriter, req *http.Request) {
179 var pl []PullListItem
180 err := json.NewDecoder(req.Body).Decode(&pl)
182 rtr.handleError(w, req, err)
// Only the first item is inspected. A Locator of exactly 32 characters
// is treated as lacking a size hint (see the error message) and the
// list is rejected with 400 Bad Request.
186 if len(pl) > 0 && len(pl[0].Locator) == 32 {
187 rtr.handleError(w, req, httpserver.ErrorWithStatus(errors.New("rejecting pull list containing a locator without a size hint -- this probably means keep-balance needs to be upgraded"), http.StatusBadRequest))
190 rtr.puller.SetPullList(pl)
// handleTrashList decodes the request body as a JSON array of
// TrashListItem and passes it to rtr.trasher.SetTrashList.
//
// NOTE(review): the condition guarding the handleError call and the
// return statement are not visible in this listing.
193 func (rtr *router) handleTrashList(w http.ResponseWriter, req *http.Request) {
194 var tl []TrashListItem
195 err := json.NewDecoder(req.Body).Decode(&tl)
197 rtr.handleError(w, req, err)
201 rtr.trasher.SetTrashList(tl)
// handleUntrash calls rtr.keepstore.BlockUntrash with the "locator"
// path variable and hands the result to handleError.
204 func (rtr *router) handleUntrash(w http.ResponseWriter, req *http.Request) {
205 err := rtr.keepstore.BlockUntrash(req.Context(), mux.Vars(req)["locator"])
// handleError is called unconditionally, i.e. also when err is nil.
206 rtr.handleError(w, req, err)
// handleBadRequest replies with status 400 and the text "Bad Request",
// regardless of the request.
209 func (rtr *router) handleBadRequest(w http.ResponseWriter, req *http.Request) {
210 http.Error(w, "Bad Request", http.StatusBadRequest)
// handleError writes an HTTP error response for err: a status code
// chosen by the checks below, then err.Error() and a newline as the
// body.
//
// NOTE(review): the lines between the first "if" and the "else if"
// chain are not visible in this listing, so the handling of a
// cancelled request context and of a nil err cannot be described here.
213 func (rtr *router) handleError(w http.ResponseWriter, req *http.Request, err error) {
214 if req.Context().Err() != nil {
220 } else if os.IsNotExist(err) {
// os.IsNotExist reported that err means "does not exist": respond 404.
221 w.WriteHeader(http.StatusNotFound)
222 } else if statusErr := interface{ HTTPStatus() int }(nil); errors.As(err, &statusErr) {
// errors.As found an error in err's chain that has an HTTPStatus() int
// method; use the status code it reports.
223 w.WriteHeader(statusErr.HTTPStatus())
// Fallback status when none of the checks above matched.
225 w.WriteHeader(http.StatusInternalServerError)
227 fmt.Fprintln(w, err.Error())
// countingWriter wraps a writer. Code elsewhere in this file sets its
// writer field, calls Write on it, and reads its counter with n.Load().
// NOTE(review): the field declarations are not visible in this listing.
230 type countingWriter struct {
// Write passes p to cw.writer.
// NOTE(review): the rest of the body is not visible in this listing;
// presumably the byte count n is added to cw.n before n and err are
// returned -- confirm in the full file.
235 func (cw *countingWriter) Write(p []byte) (int, error) {
236 n, err := cw.writer.Write(p)
241 // Split s by sep, trim whitespace from each part, and drop empty
243 func trimSplit(s, sep string) []string {
// Visit each sep-delimited piece of s.
245 for _, part := range strings.Split(s, sep) {
// Remove leading and trailing whitespace from the piece.
// NOTE(review): the lines that filter and collect the pieces are not
// visible in this listing.
246 part = strings.TrimSpace(part)
254 // setSizeOnWrite sets the Content-Length header to the given size on
256 type setSizeOnWrite struct {
// The Write method uses the members ResponseWriter and size.
// NOTE(review): the field declarations are not visible in this listing.
// Write passes p to ss.ResponseWriter.Write and returns its result. The
// body also sets the Content-Length response header to ss.size,
// formatted as a decimal number.
// NOTE(review): the condition guarding the header assignment is on
// lines not shown in this listing.
262 func (ss *setSizeOnWrite) Write(p []byte) (int, error) {
264 ss.Header().Set("Content-Length", fmt.Sprintf("%d", ss.size))
267 return ss.ResponseWriter.Write(p)
// discardWrite is the writer that handleBlockRead uses for HEAD
// requests; it is constructed there with a ResponseWriter member.
// NOTE(review): the field declarations are not visible in this listing.
270 type discardWrite struct {
// Write has the io.Writer method signature and uses a value receiver
// with no name, so the body cannot refer to the receiver's fields.
// NOTE(review): the body is not visible in this listing; presumably it
// reports p as written without forwarding it -- confirm.
274 func (discardWrite) Write(p []byte) (int, error) {