Merge branch '21639-keep-cache-dict' refs #21639
[arvados.git] / services / keepstore / router.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "net/http"
13         "os"
14         "strconv"
15         "strings"
16         "sync/atomic"
17
18         "git.arvados.org/arvados.git/lib/service"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/auth"
21         "git.arvados.org/arvados.git/sdk/go/httpserver"
22         "github.com/gorilla/mux"
23 )
24
25 type router struct {
26         http.Handler
27         keepstore *keepstore
28         puller    *puller
29         trasher   *trasher
30 }
31
32 func newRouter(keepstore *keepstore, puller *puller, trasher *trasher) service.Handler {
33         rtr := &router{
34                 keepstore: keepstore,
35                 puller:    puller,
36                 trasher:   trasher,
37         }
38         adminonly := func(h http.HandlerFunc) http.HandlerFunc {
39                 return auth.RequireLiteralToken(keepstore.cluster.SystemRootToken, h).ServeHTTP
40         }
41
42         r := mux.NewRouter()
43         locatorPath := `/{locator:[0-9a-f]{32}.*}`
44         get := r.Methods(http.MethodGet, http.MethodHead).Subrouter()
45         get.HandleFunc(locatorPath, rtr.handleBlockRead)
46         get.HandleFunc(`/index`, adminonly(rtr.handleIndex))
47         get.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
48         get.HandleFunc(`/mounts`, adminonly(rtr.handleMounts))
49         get.HandleFunc(`/mounts/{uuid}/blocks`, adminonly(rtr.handleIndex))
50         get.HandleFunc(`/mounts/{uuid}/blocks/{prefix:[0-9a-f]{0,32}}`, adminonly(rtr.handleIndex))
51         put := r.Methods(http.MethodPut).Subrouter()
52         put.HandleFunc(locatorPath, rtr.handleBlockWrite)
53         put.HandleFunc(`/pull`, adminonly(rtr.handlePullList))
54         put.HandleFunc(`/trash`, adminonly(rtr.handleTrashList))
55         put.HandleFunc(`/untrash`+locatorPath, adminonly(rtr.handleUntrash))
56         touch := r.Methods("TOUCH").Subrouter()
57         touch.HandleFunc(locatorPath, adminonly(rtr.handleBlockTouch))
58         delete := r.Methods(http.MethodDelete).Subrouter()
59         delete.HandleFunc(locatorPath, adminonly(rtr.handleBlockTrash))
60         r.NotFoundHandler = http.HandlerFunc(rtr.handleBadRequest)
61         r.MethodNotAllowedHandler = http.HandlerFunc(rtr.handleBadRequest)
62         rtr.Handler = auth.LoadToken(r)
63         return rtr
64 }
65
66 func (rtr *router) CheckHealth() error {
67         return nil
68 }
69
70 func (rtr *router) Done() <-chan struct{} {
71         return nil
72 }
73
74 func (rtr *router) handleBlockRead(w http.ResponseWriter, req *http.Request) {
75         // Intervening proxies must not return a cached GET response
76         // to a prior request if a X-Keep-Signature request header has
77         // been added or changed.
78         w.Header().Add("Vary", "X-Keep-Signature")
79         var localLocator func(string)
80         if strings.SplitN(req.Header.Get("X-Keep-Signature"), ",", 2)[0] == "local" {
81                 localLocator = func(locator string) {
82                         w.Header().Set("X-Keep-Locator", locator)
83                 }
84         }
85         out := w
86         if req.Method == http.MethodHead {
87                 out = discardWrite{ResponseWriter: w}
88         } else if li, err := getLocatorInfo(mux.Vars(req)["locator"]); err != nil {
89                 rtr.handleError(w, req, err)
90                 return
91         } else if li.size == 0 && li.hash != "d41d8cd98f00b204e9800998ecf8427e" {
92                 // GET {hash} (with no size hint) is not allowed
93                 // because we can't report md5 mismatches.
94                 rtr.handleError(w, req, errMethodNotAllowed)
95                 return
96         }
97         n, err := rtr.keepstore.BlockRead(req.Context(), arvados.BlockReadOptions{
98                 Locator:      mux.Vars(req)["locator"],
99                 WriteTo:      out,
100                 LocalLocator: localLocator,
101         })
102         if err != nil && (n == 0 || req.Method == http.MethodHead) {
103                 rtr.handleError(w, req, err)
104                 return
105         }
106 }
107
108 func (rtr *router) handleBlockWrite(w http.ResponseWriter, req *http.Request) {
109         dataSize, _ := strconv.Atoi(req.Header.Get("Content-Length"))
110         replicas, _ := strconv.Atoi(req.Header.Get("X-Arvados-Replicas-Desired"))
111         resp, err := rtr.keepstore.BlockWrite(req.Context(), arvados.BlockWriteOptions{
112                 Hash:           mux.Vars(req)["locator"],
113                 Reader:         req.Body,
114                 DataSize:       dataSize,
115                 RequestID:      req.Header.Get("X-Request-Id"),
116                 StorageClasses: trimSplit(req.Header.Get("X-Keep-Storage-Classes"), ","),
117                 Replicas:       replicas,
118         })
119         if err != nil {
120                 rtr.handleError(w, req, err)
121                 return
122         }
123         w.Header().Set("X-Keep-Replicas-Stored", fmt.Sprintf("%d", resp.Replicas))
124         scc := ""
125         for k, n := range resp.StorageClasses {
126                 if n > 0 {
127                         if scc != "" {
128                                 scc += "; "
129                         }
130                         scc += fmt.Sprintf("%s=%d", k, n)
131                 }
132         }
133         w.Header().Set("X-Keep-Storage-Classes-Confirmed", scc)
134         w.WriteHeader(http.StatusOK)
135         fmt.Fprintln(w, resp.Locator)
136 }
137
138 func (rtr *router) handleBlockTouch(w http.ResponseWriter, req *http.Request) {
139         err := rtr.keepstore.BlockTouch(req.Context(), mux.Vars(req)["locator"])
140         rtr.handleError(w, req, err)
141 }
142
143 func (rtr *router) handleBlockTrash(w http.ResponseWriter, req *http.Request) {
144         err := rtr.keepstore.BlockTrash(req.Context(), mux.Vars(req)["locator"])
145         rtr.handleError(w, req, err)
146 }
147
148 func (rtr *router) handleMounts(w http.ResponseWriter, req *http.Request) {
149         json.NewEncoder(w).Encode(rtr.keepstore.Mounts())
150 }
151
152 func (rtr *router) handleIndex(w http.ResponseWriter, req *http.Request) {
153         prefix := req.FormValue("prefix")
154         if prefix == "" {
155                 prefix = mux.Vars(req)["prefix"]
156         }
157         cw := &countingWriter{writer: w}
158         err := rtr.keepstore.Index(req.Context(), indexOptions{
159                 MountUUID: mux.Vars(req)["uuid"],
160                 Prefix:    prefix,
161                 WriteTo:   cw,
162         })
163         if err != nil && cw.n.Load() == 0 {
164                 // Nothing was written, so it's not too late to report
165                 // an error via http response header. (Otherwise, all
166                 // we can do is omit the trailing newline below to
167                 // indicate something went wrong.)
168                 rtr.handleError(w, req, err)
169                 return
170         }
171         if err == nil {
172                 // A trailing blank line signals to the caller that
173                 // the response is complete.
174                 w.Write([]byte("\n"))
175         }
176 }
177
178 func (rtr *router) handlePullList(w http.ResponseWriter, req *http.Request) {
179         var pl []PullListItem
180         err := json.NewDecoder(req.Body).Decode(&pl)
181         if err != nil {
182                 rtr.handleError(w, req, err)
183                 return
184         }
185         req.Body.Close()
186         if len(pl) > 0 && len(pl[0].Locator) == 32 {
187                 rtr.handleError(w, req, httpserver.ErrorWithStatus(errors.New("rejecting pull list containing a locator without a size hint -- this probably means keep-balance needs to be upgraded"), http.StatusBadRequest))
188                 return
189         }
190         rtr.puller.SetPullList(pl)
191 }
192
193 func (rtr *router) handleTrashList(w http.ResponseWriter, req *http.Request) {
194         var tl []TrashListItem
195         err := json.NewDecoder(req.Body).Decode(&tl)
196         if err != nil {
197                 rtr.handleError(w, req, err)
198                 return
199         }
200         req.Body.Close()
201         rtr.trasher.SetTrashList(tl)
202 }
203
204 func (rtr *router) handleUntrash(w http.ResponseWriter, req *http.Request) {
205         err := rtr.keepstore.BlockUntrash(req.Context(), mux.Vars(req)["locator"])
206         rtr.handleError(w, req, err)
207 }
208
209 func (rtr *router) handleBadRequest(w http.ResponseWriter, req *http.Request) {
210         http.Error(w, "Bad Request", http.StatusBadRequest)
211 }
212
213 func (rtr *router) handleError(w http.ResponseWriter, req *http.Request, err error) {
214         if req.Context().Err() != nil {
215                 w.WriteHeader(499)
216                 return
217         }
218         if err == nil {
219                 return
220         } else if os.IsNotExist(err) {
221                 w.WriteHeader(http.StatusNotFound)
222         } else if statusErr := interface{ HTTPStatus() int }(nil); errors.As(err, &statusErr) {
223                 w.WriteHeader(statusErr.HTTPStatus())
224         } else {
225                 w.WriteHeader(http.StatusInternalServerError)
226         }
227         fmt.Fprintln(w, err.Error())
228 }
229
// countingWriter passes writes through to an underlying writer while
// keeping a running total of the bytes that writer accepted.
type countingWriter struct {
	// writer receives every Write call unchanged.
	writer io.Writer
	// n is the total number of bytes writer has reported written.
	n atomic.Int64
}

// Write forwards p to the underlying writer, adds the number of bytes
// it reports written to the running total (even when it also reports
// an error), and returns its result unchanged.
func (cw *countingWriter) Write(p []byte) (written int, err error) {
	written, err = cw.writer.Write(p)
	cw.n.Add(int64(written))
	return written, err
}
240
// trimSplit splits s on sep, strips leading and trailing whitespace
// from every piece, and returns the pieces that are still non-empty,
// in their original order. It returns nil when no piece survives.
func trimSplit(s, sep string) []string {
	pieces := strings.Split(s, sep)
	var kept []string
	for i := range pieces {
		trimmed := strings.TrimSpace(pieces[i])
		if trimmed == "" {
			continue
		}
		kept = append(kept, trimmed)
	}
	return kept
}
253
// setSizeOnWrite wraps an http.ResponseWriter and sets the
// Content-Length response header to size the first time Write is
// called.
type setSizeOnWrite struct {
	http.ResponseWriter
	// size is the value to send as Content-Length.
	size int
	// wrote records that the header has already been set, so later
	// writes leave it alone.
	wrote bool
}

// Write sets the Content-Length header on the first call only, then
// passes p through to the wrapped ResponseWriter.
func (ss *setSizeOnWrite) Write(p []byte) (int, error) {
	if ss.wrote {
		return ss.ResponseWriter.Write(p)
	}
	ss.wrote = true
	ss.Header().Set("Content-Length", fmt.Sprintf("%d", ss.size))
	return ss.ResponseWriter.Write(p)
}
269
// discardWrite wraps an http.ResponseWriter but throws away body
// data: everything except Write is inherited from the embedded
// ResponseWriter. handleBlockRead uses it for HEAD requests.
type discardWrite struct {
	http.ResponseWriter
}

// Write reports all of p as written without sending it anywhere.
func (discardWrite) Write(p []byte) (n int, err error) {
	n = len(p)
	return n, nil
}