2769: reorganize REST handlers
[arvados.git] / services / keep / src / keep / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42
43         // For IndexHandler we support:
44         //   /index           - returns all locators
45         //   /index/{prefix}  - returns all locators that begin with {prefix}
46         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
47         //      If {prefix} is the empty string, return an index of all locators
48         //      (so /index and /index/ behave identically)
49         //      A client may supply a full 32-digit locator string, in which
50         //      case the server will return an index with either zero or one
51         //      entries. This usage allows a client to check whether a block is
52         //      present, and its size and upload time, without retrieving the
53         //      entire block.
54         //
55         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
56         rest.HandleFunc(
57                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
59
60         // Any request which does not match any of these routes gets
61         // 400 Bad Request.
62         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
63
64         return rest
65 }
66
67 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
68         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
69 }
70
71 // FindKeepVolumes scans all mounted volumes on the system for Keep
72 // volumes, and returns a list of matching paths.
73 //
74 // A device is assumed to be a Keep volume if it is a normal or tmpfs
75 // volume and has a "/keep" directory directly underneath the mount
76 // point.
77 //
78 func FindKeepVolumes() []string {
79         vols := make([]string, 0)
80
81         if f, err := os.Open(PROC_MOUNTS); err != nil {
82                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
83         } else {
84                 scanner := bufio.NewScanner(f)
85                 for scanner.Scan() {
86                         args := strings.Fields(scanner.Text())
87                         dev, mount := args[0], args[1]
88                         if mount != "/" &&
89                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
90                                 keep := mount + "/keep"
91                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
92                                         vols = append(vols, keep)
93                                 }
94                         }
95                 }
96                 if err := scanner.Err(); err != nil {
97                         log.Fatal(err)
98                 }
99         }
100         return vols
101 }
102
103 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
104         hash := mux.Vars(req)["hash"]
105
106         log.Printf("%s %s", req.Method, hash)
107
108         hints := mux.Vars(req)["hints"]
109
110         // Parse the locator string and hints from the request.
111         // TODO(twp): implement a Locator type.
112         var signature, timestamp string
113         if hints != "" {
114                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
115                 for _, hint := range strings.Split(hints, "+") {
116                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
117                                 // Server ignores size hints
118                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
119                                 signature = m[1]
120                                 timestamp = m[2]
121                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
122                                 // Any unknown hint that starts with an uppercase letter is
123                                 // presumed to be valid and ignored, to permit forward compatibility.
124                         } else {
125                                 // Unknown format; not a valid locator.
126                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
127                                 return
128                         }
129                 }
130         }
131
132         // If permission checking is in effect, verify this
133         // request's permission signature.
134         if enforce_permissions {
135                 if signature == "" || timestamp == "" {
136                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
137                         return
138                 } else if IsExpired(timestamp) {
139                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
140                         return
141                 } else {
142                         req_locator := req.URL.Path[1:] // strip leading slash
143                         if !VerifySignature(req_locator, GetApiToken(req)) {
144                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
145                                 return
146                         }
147                 }
148         }
149
150         block, err := GetBlock(hash)
151
152         // Garbage collect after each GET. Fixes #2865.
153         // TODO(twp): review Keep memory usage and see if there's
154         // a better way to do this than blindly garbage collecting
155         // after every block.
156         defer runtime.GC()
157
158         if err != nil {
159                 // This type assertion is safe because the only errors
160                 // GetBlock can return are DiskHashError or NotFoundError.
161                 if err == NotFoundError {
162                         log.Printf("%s: not found, giving up\n", hash)
163                 }
164                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
165                 return
166         }
167
168         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
169
170         _, err = resp.Write(block)
171         if err != nil {
172                 log.Printf("GetBlockHandler: writing response: %s", err)
173         }
174
175         return
176 }
177
178 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
179         // Garbage collect after each PUT. Fixes #2865.
180         // See also GetBlockHandler.
181         defer runtime.GC()
182
183         hash := mux.Vars(req)["hash"]
184
185         log.Printf("%s %s", req.Method, hash)
186
187         // Read the block data to be stored.
188         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
189         //
190         if req.ContentLength > BLOCKSIZE {
191                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
192                 return
193         }
194
195         buf := make([]byte, req.ContentLength)
196         nread, err := io.ReadFull(req.Body, buf)
197         if err != nil {
198                 http.Error(resp, err.Error(), 500)
199         } else if int64(nread) < req.ContentLength {
200                 http.Error(resp, "request truncated", 500)
201         } else {
202                 if err := PutBlock(buf, hash); err == nil {
203                         // Success; add a size hint, sign the locator if
204                         // possible, and return it to the client.
205                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
206                         api_token := GetApiToken(req)
207                         if PermissionSecret != nil && api_token != "" {
208                                 expiry := time.Now().Add(permission_ttl)
209                                 return_hash = SignLocator(return_hash, api_token, expiry)
210                         }
211                         resp.Write([]byte(return_hash + "\n"))
212                 } else {
213                         ke := err.(*KeepError)
214                         http.Error(resp, ke.Error(), ke.HTTPCode)
215                 }
216         }
217         return
218 }
219
220 // IndexHandler
221 //     A HandleFunc to address /index and /index/{prefix} requests.
222 //
223 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
224         prefix := mux.Vars(req)["prefix"]
225
226         // Only the data manager may issue /index requests,
227         // and only if enforce_permissions is enabled.
228         // All other requests return 403 Forbidden.
229         api_token := GetApiToken(req)
230         if !enforce_permissions ||
231                 api_token == "" ||
232                 data_manager_token != api_token {
233                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
234                 return
235         }
236         var index string
237         for _, vol := range KeepVM.Volumes() {
238                 index = index + vol.Index(prefix)
239         }
240         resp.Write([]byte(index))
241 }
242
243 // StatusHandler
244 //     Responds to /status.json requests with the current node status,
245 //     described in a JSON structure.
246 //
247 //     The data given in a status.json response includes:
248 //        volumes - a list of Keep volumes currently in use by this server
249 //          each volume is an object with the following fields:
250 //            * mount_point
251 //            * device_num (an integer identifying the underlying filesystem)
252 //            * bytes_free
253 //            * bytes_used
254 //
255 type VolumeStatus struct {
256         MountPoint string `json:"mount_point"`
257         DeviceNum  uint64 `json:"device_num"`
258         BytesFree  uint64 `json:"bytes_free"`
259         BytesUsed  uint64 `json:"bytes_used"`
260 }
261
262 type NodeStatus struct {
263         Volumes []*VolumeStatus `json:"volumes"`
264 }
265
266 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
267         st := GetNodeStatus()
268         if jstat, err := json.Marshal(st); err == nil {
269                 resp.Write(jstat)
270         } else {
271                 log.Printf("json.Marshal: %s\n", err)
272                 log.Printf("NodeStatus = %v\n", st)
273                 http.Error(resp, err.Error(), 500)
274         }
275 }
276
277 // GetNodeStatus
278 //     Returns a NodeStatus struct describing this Keep
279 //     node's current status.
280 //
281 func GetNodeStatus() *NodeStatus {
282         st := new(NodeStatus)
283
284         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
285         for i, vol := range KeepVM.Volumes() {
286                 st.Volumes[i] = vol.Status()
287         }
288         return st
289 }
290
291 // GetVolumeStatus
292 //     Returns a VolumeStatus describing the requested volume.
293 //
294 func GetVolumeStatus(volume string) *VolumeStatus {
295         var fs syscall.Statfs_t
296         var devnum uint64
297
298         if fi, err := os.Stat(volume); err == nil {
299                 devnum = fi.Sys().(*syscall.Stat_t).Dev
300         } else {
301                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
302                 return nil
303         }
304
305         err := syscall.Statfs(volume, &fs)
306         if err != nil {
307                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
308                 return nil
309         }
310         // These calculations match the way df calculates disk usage:
311         // "free" space is measured by fs.Bavail, but "used" space
312         // uses fs.Blocks - fs.Bfree.
313         free := fs.Bavail * uint64(fs.Bsize)
314         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
315         return &VolumeStatus{volume, devnum, free, used}
316 }
317
318 func GetBlock(hash string) ([]byte, error) {
319         // Attempt to read the requested hash from a keep volume.
320         error_to_caller := NotFoundError
321
322         for _, vol := range KeepVM.Volumes() {
323                 if buf, err := vol.Get(hash); err != nil {
324                         // IsNotExist is an expected error and may be ignored.
325                         // (If all volumes report IsNotExist, we return a NotFoundError)
326                         // All other errors should be logged but we continue trying to
327                         // read.
328                         switch {
329                         case os.IsNotExist(err):
330                                 continue
331                         default:
332                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
333                         }
334                 } else {
335                         // Double check the file checksum.
336                         //
337                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
338                         if filehash != hash {
339                                 // TODO(twp): this condition probably represents a bad disk and
340                                 // should raise major alarm bells for an administrator: e.g.
341                                 // they should be sent directly to an event manager at high
342                                 // priority or logged as urgent problems.
343                                 //
344                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
345                                         vol, hash, filehash)
346                                 error_to_caller = DiskHashError
347                         } else {
348                                 // Success!
349                                 if error_to_caller != NotFoundError {
350                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
351                                                 vol, hash)
352                                 }
353                                 return buf, nil
354                         }
355                 }
356         }
357
358         if error_to_caller != NotFoundError {
359                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
360         }
361         return nil, error_to_caller
362 }
363
364 /* PutBlock(block, hash)
365    Stores the BLOCK (identified by the content id HASH) in Keep.
366
367    The MD5 checksum of the block must be identical to the content id HASH.
368    If not, an error is returned.
369
370    PutBlock stores the BLOCK on the first Keep volume with free space.
371    A failure code is returned to the user only if all volumes fail.
372
373    On success, PutBlock returns nil.
374    On failure, it returns a KeepError with one of the following codes:
375
376    500 Collision
377           A different block with the same hash already exists on this
378           Keep server.
379    422 MD5Fail
380           The MD5 hash of the BLOCK does not match the argument HASH.
381    503 Full
382           There was not enough space left in any Keep volume to store
383           the object.
384    500 Fail
385           The object could not be stored for some other reason (e.g.
386           all writes failed). The text of the error message should
387           provide as much detail as possible.
388 */
389
390 func PutBlock(block []byte, hash string) error {
391         // Check that BLOCK's checksum matches HASH.
392         blockhash := fmt.Sprintf("%x", md5.Sum(block))
393         if blockhash != hash {
394                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
395                 return RequestHashError
396         }
397
398         // If we already have a block on disk under this identifier, return
399         // success (but check for MD5 collisions).
400         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
401         // In either case, we want to write our new (good) block to disk,
402         // so there is nothing special to do if err != nil.
403         if oldblock, err := GetBlock(hash); err == nil {
404                 if bytes.Compare(block, oldblock) == 0 {
405                         return nil
406                 } else {
407                         return CollisionError
408                 }
409         }
410
411         // Choose a Keep volume to write to.
412         // If this volume fails, try all of the volumes in order.
413         vol := KeepVM.Choose()
414         if err := vol.Put(hash, block); err == nil {
415                 return nil // success!
416         } else {
417                 allFull := true
418                 for _, vol := range KeepVM.Volumes() {
419                         err := vol.Put(hash, block)
420                         if err == nil {
421                                 return nil // success!
422                         }
423                         if err != FullError {
424                                 // The volume is not full but the write did not succeed.
425                                 // Report the error and continue trying.
426                                 allFull = false
427                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
428                         }
429                 }
430
431                 if allFull {
432                         log.Printf("all Keep volumes full")
433                         return FullError
434                 } else {
435                         log.Printf("all Keep volumes failed")
436                         return GenericError
437                 }
438         }
439 }
440
441 // IsValidLocator
442 //     Return true if the specified string is a valid Keep locator.
443 //     When Keep is extended to support hash types other than MD5,
444 //     this should be updated to cover those as well.
445 //
446 func IsValidLocator(loc string) bool {
447         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
448         if err == nil {
449                 return match
450         }
451         log.Printf("IsValidLocator: %s\n", err)
452         return false
453 }
454
455 // GetApiToken returns the OAuth2 token from the Authorization
456 // header of a HTTP request, or an empty string if no matching
457 // token is found.
458 func GetApiToken(req *http.Request) string {
459         if auth, ok := req.Header["Authorization"]; ok {
460                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
461                         log.Println(err)
462                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
463                         return match[1]
464                 }
465         }
466         return ""
467 }
468
469 // IsExpired returns true if the given Unix timestamp (expressed as a
470 // hexadecimal string) is in the past, or if timestamp_hex cannot be
471 // parsed as a hexadecimal string.
472 func IsExpired(timestamp_hex string) bool {
473         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
474         if err != nil {
475                 log.Printf("IsExpired: %s\n", err)
476                 return true
477         }
478         return time.Unix(ts, 0).Before(time.Now())
479 }