Merge branch '3414-keep-pull-handler'
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bufio"
12         "bytes"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "git.curoverse.com/arvados.git/services/keepstore/pull_list"
17         "github.com/gorilla/mux"
18         "io"
19         "log"
20         "net/http"
21         "os"
22         "regexp"
23         "runtime"
24         "strconv"
25         "strings"
26         "syscall"
27         "time"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44
45         // For IndexHandler we support:
46         //   /index           - returns all locators
47         //   /index/{prefix}  - returns all locators that begin with {prefix}
48         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
49         //      If {prefix} is the empty string, return an index of all locators
50         //      (so /index and /index/ behave identically)
51         //      A client may supply a full 32-digit locator string, in which
52         //      case the server will return an index with either zero or one
53         //      entries. This usage allows a client to check whether a block is
54         //      present, and its size and upload time, without retrieving the
55         //      entire block.
56         //
57         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
58         rest.HandleFunc(
59                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
60         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
61
62         // The PullHandler processes "PUT /pull" commands from Data Manager.
63         // It parses the JSON list of pull requests in the request body, and
64         // delivers them to the pull list manager for replication.
65         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
66
67         // Any request which does not match any of these routes gets
68         // 400 Bad Request.
69         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
70
71         return rest
72 }
73
74 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
75         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
76 }
77
78 // FindKeepVolumes scans all mounted volumes on the system for Keep
79 // volumes, and returns a list of matching paths.
80 //
81 // A device is assumed to be a Keep volume if it is a normal or tmpfs
82 // volume and has a "/keep" directory directly underneath the mount
83 // point.
84 //
85 func FindKeepVolumes() []string {
86         vols := make([]string, 0)
87
88         if f, err := os.Open(PROC_MOUNTS); err != nil {
89                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
90         } else {
91                 scanner := bufio.NewScanner(f)
92                 for scanner.Scan() {
93                         args := strings.Fields(scanner.Text())
94                         dev, mount := args[0], args[1]
95                         if mount != "/" &&
96                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
97                                 keep := mount + "/keep"
98                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99                                         vols = append(vols, keep)
100                                 }
101                         }
102                 }
103                 if err := scanner.Err(); err != nil {
104                         log.Fatal(err)
105                 }
106         }
107         return vols
108 }
109
110 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
111         hash := mux.Vars(req)["hash"]
112
113         log.Printf("%s %s", req.Method, hash)
114
115         hints := mux.Vars(req)["hints"]
116
117         // Parse the locator string and hints from the request.
118         // TODO(twp): implement a Locator type.
119         var signature, timestamp string
120         if hints != "" {
121                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
122                 for _, hint := range strings.Split(hints, "+") {
123                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
124                                 // Server ignores size hints
125                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
126                                 signature = m[1]
127                                 timestamp = m[2]
128                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
129                                 // Any unknown hint that starts with an uppercase letter is
130                                 // presumed to be valid and ignored, to permit forward compatibility.
131                         } else {
132                                 // Unknown format; not a valid locator.
133                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
134                                 return
135                         }
136                 }
137         }
138
139         // If permission checking is in effect, verify this
140         // request's permission signature.
141         if enforce_permissions {
142                 if signature == "" || timestamp == "" {
143                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
144                         return
145                 } else if IsExpired(timestamp) {
146                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
147                         return
148                 } else {
149                         req_locator := req.URL.Path[1:] // strip leading slash
150                         if !VerifySignature(req_locator, GetApiToken(req)) {
151                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
152                                 return
153                         }
154                 }
155         }
156
157         block, err := GetBlock(hash, false)
158
159         // Garbage collect after each GET. Fixes #2865.
160         // TODO(twp): review Keep memory usage and see if there's
161         // a better way to do this than blindly garbage collecting
162         // after every block.
163         defer runtime.GC()
164
165         if err != nil {
166                 // This type assertion is safe because the only errors
167                 // GetBlock can return are DiskHashError or NotFoundError.
168                 if err == NotFoundError {
169                         log.Printf("%s: not found, giving up\n", hash)
170                 }
171                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
172                 return
173         }
174
175         resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
176
177         _, err = resp.Write(block)
178         if err != nil {
179                 log.Printf("GetBlockHandler: writing response: %s", err)
180         }
181
182         return
183 }
184
185 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
186         // Garbage collect after each PUT. Fixes #2865.
187         // See also GetBlockHandler.
188         defer runtime.GC()
189
190         hash := mux.Vars(req)["hash"]
191
192         log.Printf("%s %s", req.Method, hash)
193
194         // Read the block data to be stored.
195         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
196         //
197         if req.ContentLength > BLOCKSIZE {
198                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
199                 return
200         }
201
202         buf := make([]byte, req.ContentLength)
203         nread, err := io.ReadFull(req.Body, buf)
204         if err != nil {
205                 http.Error(resp, err.Error(), 500)
206         } else if int64(nread) < req.ContentLength {
207                 http.Error(resp, "request truncated", 500)
208         } else {
209                 if err := PutBlock(buf, hash); err == nil {
210                         // Success; add a size hint, sign the locator if
211                         // possible, and return it to the client.
212                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
213                         api_token := GetApiToken(req)
214                         if PermissionSecret != nil && api_token != "" {
215                                 expiry := time.Now().Add(permission_ttl)
216                                 return_hash = SignLocator(return_hash, api_token, expiry)
217                         }
218                         resp.Write([]byte(return_hash + "\n"))
219                 } else {
220                         ke := err.(*KeepError)
221                         http.Error(resp, ke.Error(), ke.HTTPCode)
222                 }
223         }
224         return
225 }
226
227 // IndexHandler
228 //     A HandleFunc to address /index and /index/{prefix} requests.
229 //
230 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
231         prefix := mux.Vars(req)["prefix"]
232
233         // Only the data manager may issue /index requests,
234         // and only if enforce_permissions is enabled.
235         // All other requests return 403 Forbidden.
236         api_token := GetApiToken(req)
237         if !enforce_permissions ||
238                 api_token == "" ||
239                 data_manager_token != api_token {
240                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
241                 return
242         }
243         var index string
244         for _, vol := range KeepVM.Volumes() {
245                 index = index + vol.Index(prefix)
246         }
247         resp.Write([]byte(index))
248 }
249
250 // StatusHandler
251 //     Responds to /status.json requests with the current node status,
252 //     described in a JSON structure.
253 //
254 //     The data given in a status.json response includes:
255 //        volumes - a list of Keep volumes currently in use by this server
256 //          each volume is an object with the following fields:
257 //            * mount_point
258 //            * device_num (an integer identifying the underlying filesystem)
259 //            * bytes_free
260 //            * bytes_used
261 //
262 type VolumeStatus struct {
263         MountPoint string `json:"mount_point"`
264         DeviceNum  uint64 `json:"device_num"`
265         BytesFree  uint64 `json:"bytes_free"`
266         BytesUsed  uint64 `json:"bytes_used"`
267 }
268
269 type NodeStatus struct {
270         Volumes []*VolumeStatus `json:"volumes"`
271 }
272
273 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
274         st := GetNodeStatus()
275         if jstat, err := json.Marshal(st); err == nil {
276                 resp.Write(jstat)
277         } else {
278                 log.Printf("json.Marshal: %s\n", err)
279                 log.Printf("NodeStatus = %v\n", st)
280                 http.Error(resp, err.Error(), 500)
281         }
282 }
283
284 // GetNodeStatus
285 //     Returns a NodeStatus struct describing this Keep
286 //     node's current status.
287 //
288 func GetNodeStatus() *NodeStatus {
289         st := new(NodeStatus)
290
291         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
292         for i, vol := range KeepVM.Volumes() {
293                 st.Volumes[i] = vol.Status()
294         }
295         return st
296 }
297
298 // GetVolumeStatus
299 //     Returns a VolumeStatus describing the requested volume.
300 //
301 func GetVolumeStatus(volume string) *VolumeStatus {
302         var fs syscall.Statfs_t
303         var devnum uint64
304
305         if fi, err := os.Stat(volume); err == nil {
306                 devnum = fi.Sys().(*syscall.Stat_t).Dev
307         } else {
308                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
309                 return nil
310         }
311
312         err := syscall.Statfs(volume, &fs)
313         if err != nil {
314                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
315                 return nil
316         }
317         // These calculations match the way df calculates disk usage:
318         // "free" space is measured by fs.Bavail, but "used" space
319         // uses fs.Blocks - fs.Bfree.
320         free := fs.Bavail * uint64(fs.Bsize)
321         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
322         return &VolumeStatus{volume, devnum, free, used}
323 }
324
325 // DeleteHandler processes DELETE requests.
326 //
327 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
328 // from all connected volumes.
329 //
330 // Only the Data Manager, or an Arvados admin with scope "all", are
331 // allowed to issue DELETE requests.  If a DELETE request is not
332 // authenticated or is issued by a non-admin user, the server returns
333 // a PermissionError.
334 //
335 // Upon receiving a valid request from an authorized user,
336 // DeleteHandler deletes all copies of the specified block on local
337 // writable volumes.
338 //
339 // Response format:
340 //
341 // If the requested blocks was not found on any volume, the response
342 // code is HTTP 404 Not Found.
343 //
344 // Otherwise, the response code is 200 OK, with a response body
345 // consisting of the JSON message
346 //
347 //    {"copies_deleted":d,"copies_failed":f}
348 //
349 // where d and f are integers representing the number of blocks that
350 // were successfully and unsuccessfully deleted.
351 //
352 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
353         hash := mux.Vars(req)["hash"]
354         log.Printf("%s %s", req.Method, hash)
355
356         // Confirm that this user is an admin and has a token with unlimited scope.
357         var tok = GetApiToken(req)
358         if tok == "" || !CanDelete(tok) {
359                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
360                 return
361         }
362
363         if never_delete {
364                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
365                 return
366         }
367
368         // Delete copies of this block from all available volumes.  Report
369         // how many blocks were successfully and unsuccessfully
370         // deleted.
371         var result struct {
372                 Deleted int `json:"copies_deleted"`
373                 Failed  int `json:"copies_failed"`
374         }
375         for _, vol := range KeepVM.Volumes() {
376                 if err := vol.Delete(hash); err == nil {
377                         result.Deleted++
378                 } else if os.IsNotExist(err) {
379                         continue
380                 } else {
381                         result.Failed++
382                         log.Println("DeleteHandler:", err)
383                 }
384         }
385
386         var st int
387
388         if result.Deleted == 0 && result.Failed == 0 {
389                 st = http.StatusNotFound
390         } else {
391                 st = http.StatusOK
392         }
393
394         resp.WriteHeader(st)
395
396         if st == http.StatusOK {
397                 if body, err := json.Marshal(result); err == nil {
398                         resp.Write(body)
399                 } else {
400                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
401                         http.Error(resp, err.Error(), 500)
402                 }
403         }
404 }
405
406 /* PullHandler processes "PUT /pull" requests for the data manager.
407    The request body is a JSON message containing a list of pull
408    requests in the following format:
409
410    [
411       {
412          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
413          "servers":[
414                         "keep0.qr1hi.arvadosapi.com:25107",
415                         "keep1.qr1hi.arvadosapi.com:25108"
416                  ]
417           },
418           {
419                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
420                  "servers":[
421                         "10.0.1.5:25107",
422                         "10.0.1.6:25107",
423                         "10.0.1.7:25108"
424                  ]
425           },
426           ...
427    ]
428
429    Each pull request in the list consists of a block locator string
430    and an ordered list of servers.  Keepstore should try to fetch the
431    block from each server in turn.
432
433    If the request has not been sent by the Data Manager, return 401
434    Unauthorized.
435
436    If the JSON unmarshalling fails, return 400 Bad Request.
437 */
438
439 func PullHandler(resp http.ResponseWriter, req *http.Request) {
440         // Reject unauthorized requests.
441         api_token := GetApiToken(req)
442         if !IsDataManagerToken(api_token) {
443                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
444                 log.Printf("%s %s: %s\n", req.Method, req.URL, UnauthorizedError.Error())
445                 return
446         }
447
448         // Parse the request body.
449         var plist []pull_list.PullRequest
450         r := json.NewDecoder(req.Body)
451         if err := r.Decode(&plist); err != nil {
452                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
453                 log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
454                 return
455         }
456
457         // We have a properly formatted pull list sent from the data
458         // manager.  Report success and send the list to the pull list
459         // manager for further handling.
460         log.Printf("%s %s: received %v\n", req.Method, req.URL, plist)
461         resp.WriteHeader(http.StatusOK)
462         resp.Write([]byte(
463                 fmt.Sprintf("Received %d pull requests\n", len(plist))))
464
465         if pullmgr == nil {
466                 pullmgr = pull_list.NewManager()
467         }
468         pullmgr.SetList(plist)
469 }
470
471 // ==============================
472 // GetBlock and PutBlock implement lower-level code for handling
473 // blocks by rooting through volumes connected to the local machine.
474 // Once the handler has determined that system policy permits the
475 // request, it calls these methods to perform the actual operation.
476 //
477 // TODO(twp): this code would probably be better located in the
478 // VolumeManager interface. As an abstraction, the VolumeManager
479 // should be the only part of the code that cares about which volume a
480 // block is stored on, so it should be responsible for figuring out
481 // which volume to check for fetching blocks, storing blocks, etc.
482
483 // ==============================
484 // GetBlock fetches and returns the block identified by "hash".  If
485 // the update_timestamp argument is true, GetBlock also updates the
486 // block's file modification time (for the sake of PutBlock, which
487 // must update the file's timestamp when the block already exists).
488 //
489 // On success, GetBlock returns a byte slice with the block data, and
490 // a nil error.
491 //
492 // If the block cannot be found on any volume, returns NotFoundError.
493 //
494 // If the block found does not have the correct MD5 hash, returns
495 // DiskHashError.
496 //
497
498 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
499         // Attempt to read the requested hash from a keep volume.
500         error_to_caller := NotFoundError
501
502         for _, vol := range KeepVM.Volumes() {
503                 if buf, err := vol.Get(hash); err != nil {
504                         // IsNotExist is an expected error and may be ignored.
505                         // (If all volumes report IsNotExist, we return a NotFoundError)
506                         // All other errors should be logged but we continue trying to
507                         // read.
508                         switch {
509                         case os.IsNotExist(err):
510                                 continue
511                         default:
512                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
513                         }
514                 } else {
515                         // Double check the file checksum.
516                         //
517                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
518                         if filehash != hash {
519                                 // TODO(twp): this condition probably represents a bad disk and
520                                 // should raise major alarm bells for an administrator: e.g.
521                                 // they should be sent directly to an event manager at high
522                                 // priority or logged as urgent problems.
523                                 //
524                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
525                                         vol, hash, filehash)
526                                 error_to_caller = DiskHashError
527                         } else {
528                                 // Success!
529                                 if error_to_caller != NotFoundError {
530                                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
531                                                 vol, hash)
532                                 }
533                                 // Update the timestamp if the caller requested.
534                                 // If we could not update the timestamp, continue looking on
535                                 // other volumes.
536                                 if update_timestamp {
537                                         if vol.Touch(hash) != nil {
538                                                 continue
539                                         }
540                                 }
541                                 return buf, nil
542                         }
543                 }
544         }
545
546         if error_to_caller != NotFoundError {
547                 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
548         }
549         return nil, error_to_caller
550 }
551
552 /* PutBlock(block, hash)
553    Stores the BLOCK (identified by the content id HASH) in Keep.
554
555    The MD5 checksum of the block must be identical to the content id HASH.
556    If not, an error is returned.
557
558    PutBlock stores the BLOCK on the first Keep volume with free space.
559    A failure code is returned to the user only if all volumes fail.
560
561    On success, PutBlock returns nil.
562    On failure, it returns a KeepError with one of the following codes:
563
564    500 Collision
565           A different block with the same hash already exists on this
566           Keep server.
567    422 MD5Fail
568           The MD5 hash of the BLOCK does not match the argument HASH.
569    503 Full
570           There was not enough space left in any Keep volume to store
571           the object.
572    500 Fail
573           The object could not be stored for some other reason (e.g.
574           all writes failed). The text of the error message should
575           provide as much detail as possible.
576 */
577
578 func PutBlock(block []byte, hash string) error {
579         // Check that BLOCK's checksum matches HASH.
580         blockhash := fmt.Sprintf("%x", md5.Sum(block))
581         if blockhash != hash {
582                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
583                 return RequestHashError
584         }
585
586         // If we already have a block on disk under this identifier, return
587         // success (but check for MD5 collisions).  While fetching the block,
588         // update its timestamp.
589         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
590         // In either case, we want to write our new (good) block to disk,
591         // so there is nothing special to do if err != nil.
592         //
593         if oldblock, err := GetBlock(hash, true); err == nil {
594                 if bytes.Compare(block, oldblock) == 0 {
595                         // The block already exists; return success.
596                         return nil
597                 } else {
598                         return CollisionError
599                 }
600         }
601
602         // Choose a Keep volume to write to.
603         // If this volume fails, try all of the volumes in order.
604         vol := KeepVM.Choose()
605         if err := vol.Put(hash, block); err == nil {
606                 return nil // success!
607         } else {
608                 allFull := true
609                 for _, vol := range KeepVM.Volumes() {
610                         err := vol.Put(hash, block)
611                         if err == nil {
612                                 return nil // success!
613                         }
614                         if err != FullError {
615                                 // The volume is not full but the write did not succeed.
616                                 // Report the error and continue trying.
617                                 allFull = false
618                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
619                         }
620                 }
621
622                 if allFull {
623                         log.Printf("all Keep volumes full")
624                         return FullError
625                 } else {
626                         log.Printf("all Keep volumes failed")
627                         return GenericError
628                 }
629         }
630 }
631
632 // IsValidLocator
633 //     Return true if the specified string is a valid Keep locator.
634 //     When Keep is extended to support hash types other than MD5,
635 //     this should be updated to cover those as well.
636 //
637 func IsValidLocator(loc string) bool {
638         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
639         if err == nil {
640                 return match
641         }
642         log.Printf("IsValidLocator: %s\n", err)
643         return false
644 }
645
646 // GetApiToken returns the OAuth2 token from the Authorization
647 // header of a HTTP request, or an empty string if no matching
648 // token is found.
649 func GetApiToken(req *http.Request) string {
650         if auth, ok := req.Header["Authorization"]; ok {
651                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
652                         log.Println(err)
653                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
654                         return match[1]
655                 }
656         }
657         return ""
658 }
659
660 // IsExpired returns true if the given Unix timestamp (expressed as a
661 // hexadecimal string) is in the past, or if timestamp_hex cannot be
662 // parsed as a hexadecimal string.
663 func IsExpired(timestamp_hex string) bool {
664         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
665         if err != nil {
666                 log.Printf("IsExpired: %s\n", err)
667                 return true
668         }
669         return time.Unix(ts, 0).Before(time.Now())
670 }
671
672 // CanDelete returns true if the user identified by api_token is
673 // allowed to delete blocks.
674 func CanDelete(api_token string) bool {
675         if api_token == "" {
676                 return false
677         }
678         // Blocks may be deleted only when Keep has been configured with a
679         // data manager.
680         if IsDataManagerToken(api_token) {
681                 return true
682         }
683         // TODO(twp): look up api_token with the API server
684         // return true if is_admin is true and if the token
685         // has unlimited scope
686         return false
687 }
688
689 // IsDataManagerToken returns true if api_token represents the data
690 // manager's token.
691 func IsDataManagerToken(api_token string) bool {
692         return data_manager_token != "" && api_token == data_manager_token
693 }