e0e3c6174ada43a2e60c09a01b319831d6ff0d70
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Any request which does not match any of these routes gets
59         // 400 Bad Request.
60         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
61
62         return rest
63 }
64
65 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
66         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
67 }
68
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70         hash := mux.Vars(req)["hash"]
71
72         hints := mux.Vars(req)["hints"]
73
74         // Parse the locator string and hints from the request.
75         // TODO(twp): implement a Locator type.
76         var signature, timestamp string
77         if hints != "" {
78                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
79                 for _, hint := range strings.Split(hints, "+") {
80                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
81                                 // Server ignores size hints
82                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
83                                 signature = m[1]
84                                 timestamp = m[2]
85                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
86                                 // Any unknown hint that starts with an uppercase letter is
87                                 // presumed to be valid and ignored, to permit forward compatibility.
88                         } else {
89                                 // Unknown format; not a valid locator.
90                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
91                                 return
92                         }
93                 }
94         }
95
96         // If permission checking is in effect, verify this
97         // request's permission signature.
98         if enforce_permissions {
99                 if signature == "" || timestamp == "" {
100                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
101                         return
102                 } else if IsExpired(timestamp) {
103                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
104                         return
105                 } else {
106                         req_locator := req.URL.Path[1:] // strip leading slash
107                         if !VerifySignature(req_locator, GetApiToken(req)) {
108                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
109                                 return
110                         }
111                 }
112         }
113
114         block, err := GetBlock(hash, false)
115         if err != nil {
116                 // This type assertion is safe because the only errors
117                 // GetBlock can return are DiskHashError or NotFoundError.
118                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
119                 return
120         }
121         defer bufs.Put(block)
122
123         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
124         resp.Header().Set("Content-Type", "application/octet-stream")
125         resp.Write(block)
126 }
127
128 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
129         // Garbage collect after each PUT. Fixes #2865.
130         // See also GetBlockHandler.
131         defer runtime.GC()
132
133         hash := mux.Vars(req)["hash"]
134
135         // Detect as many error conditions as possible before reading
136         // the body: avoid transmitting data that will not end up
137         // being written anyway.
138
139         if req.ContentLength == -1 {
140                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
141                 return
142         }
143
144         if req.ContentLength > BLOCKSIZE {
145                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
146                 return
147         }
148
149         if len(KeepVM.AllWritable()) == 0 {
150                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
151                 return
152         }
153
154         buf := bufs.Get(int(req.ContentLength))
155         _, err := io.ReadFull(req.Body, buf)
156         if err != nil {
157                 http.Error(resp, err.Error(), 500)
158                 bufs.Put(buf)
159                 return
160         }
161
162         err = PutBlock(buf, hash)
163         bufs.Put(buf)
164
165         if err != nil {
166                 ke := err.(*KeepError)
167                 http.Error(resp, ke.Error(), ke.HTTPCode)
168                 return
169         }
170
171         // Success; add a size hint, sign the locator if possible, and
172         // return it to the client.
173         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
174         api_token := GetApiToken(req)
175         if PermissionSecret != nil && api_token != "" {
176                 expiry := time.Now().Add(blob_signature_ttl)
177                 return_hash = SignLocator(return_hash, api_token, expiry)
178         }
179         resp.Write([]byte(return_hash + "\n"))
180 }
181
182 // IndexHandler
183 //     A HandleFunc to address /index and /index/{prefix} requests.
184 //
185 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
186         // Reject unauthorized requests.
187         if !IsDataManagerToken(GetApiToken(req)) {
188                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
189                 return
190         }
191
192         prefix := mux.Vars(req)["prefix"]
193
194         for _, vol := range KeepVM.AllReadable() {
195                 if err := vol.IndexTo(prefix, resp); err != nil {
196                         // The only errors returned by IndexTo are
197                         // write errors returned by resp.Write(),
198                         // which probably means the client has
199                         // disconnected and this error will never be
200                         // reported to the client -- but it will
201                         // appear in our own error log.
202                         http.Error(resp, err.Error(), http.StatusInternalServerError)
203                         return
204                 }
205         }
206 }
207
208 // StatusHandler
209 //     Responds to /status.json requests with the current node status,
210 //     described in a JSON structure.
211 //
212 //     The data given in a status.json response includes:
213 //        volumes - a list of Keep volumes currently in use by this server
214 //          each volume is an object with the following fields:
215 //            * mount_point
216 //            * device_num (an integer identifying the underlying filesystem)
217 //            * bytes_free
218 //            * bytes_used
219 //
220 type VolumeStatus struct {
221         MountPoint string `json:"mount_point"`
222         DeviceNum  uint64 `json:"device_num"`
223         BytesFree  uint64 `json:"bytes_free"`
224         BytesUsed  uint64 `json:"bytes_used"`
225 }
226
227 type NodeStatus struct {
228         Volumes []*VolumeStatus `json:"volumes"`
229 }
230
231 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
232         st := GetNodeStatus()
233         if jstat, err := json.Marshal(st); err == nil {
234                 resp.Write(jstat)
235         } else {
236                 log.Printf("json.Marshal: %s\n", err)
237                 log.Printf("NodeStatus = %v\n", st)
238                 http.Error(resp, err.Error(), 500)
239         }
240 }
241
242 // GetNodeStatus
243 //     Returns a NodeStatus struct describing this Keep
244 //     node's current status.
245 //
246 func GetNodeStatus() *NodeStatus {
247         st := new(NodeStatus)
248
249         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
250         for i, vol := range KeepVM.AllReadable() {
251                 st.Volumes[i] = vol.Status()
252         }
253         return st
254 }
255
256 // GetVolumeStatus
257 //     Returns a VolumeStatus describing the requested volume.
258 //
259 func GetVolumeStatus(volume string) *VolumeStatus {
260         var fs syscall.Statfs_t
261         var devnum uint64
262
263         if fi, err := os.Stat(volume); err == nil {
264                 devnum = fi.Sys().(*syscall.Stat_t).Dev
265         } else {
266                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
267                 return nil
268         }
269
270         err := syscall.Statfs(volume, &fs)
271         if err != nil {
272                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
273                 return nil
274         }
275         // These calculations match the way df calculates disk usage:
276         // "free" space is measured by fs.Bavail, but "used" space
277         // uses fs.Blocks - fs.Bfree.
278         free := fs.Bavail * uint64(fs.Bsize)
279         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
280         return &VolumeStatus{volume, devnum, free, used}
281 }
282
283 // DeleteHandler processes DELETE requests.
284 //
285 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
286 // from all connected volumes.
287 //
288 // Only the Data Manager, or an Arvados admin with scope "all", are
289 // allowed to issue DELETE requests.  If a DELETE request is not
290 // authenticated or is issued by a non-admin user, the server returns
291 // a PermissionError.
292 //
293 // Upon receiving a valid request from an authorized user,
294 // DeleteHandler deletes all copies of the specified block on local
295 // writable volumes.
296 //
297 // Response format:
298 //
299 // If the requested blocks was not found on any volume, the response
300 // code is HTTP 404 Not Found.
301 //
302 // Otherwise, the response code is 200 OK, with a response body
303 // consisting of the JSON message
304 //
305 //    {"copies_deleted":d,"copies_failed":f}
306 //
307 // where d and f are integers representing the number of blocks that
308 // were successfully and unsuccessfully deleted.
309 //
310 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
311         hash := mux.Vars(req)["hash"]
312
313         // Confirm that this user is an admin and has a token with unlimited scope.
314         var tok = GetApiToken(req)
315         if tok == "" || !CanDelete(tok) {
316                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
317                 return
318         }
319
320         if never_delete {
321                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
322                 return
323         }
324
325         // Delete copies of this block from all available volumes.
326         // Report how many blocks were successfully deleted, and how
327         // many were found on writable volumes but not deleted.
328         var result struct {
329                 Deleted int `json:"copies_deleted"`
330                 Failed  int `json:"copies_failed"`
331         }
332         for _, vol := range KeepVM.AllWritable() {
333                 if err := vol.Delete(hash); err == nil {
334                         result.Deleted++
335                 } else if os.IsNotExist(err) {
336                         continue
337                 } else {
338                         result.Failed++
339                         log.Println("DeleteHandler:", err)
340                 }
341         }
342
343         var st int
344
345         if result.Deleted == 0 && result.Failed == 0 {
346                 st = http.StatusNotFound
347         } else {
348                 st = http.StatusOK
349         }
350
351         resp.WriteHeader(st)
352
353         if st == http.StatusOK {
354                 if body, err := json.Marshal(result); err == nil {
355                         resp.Write(body)
356                 } else {
357                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
358                         http.Error(resp, err.Error(), 500)
359                 }
360         }
361 }
362
363 /* PullHandler processes "PUT /pull" requests for the data manager.
364    The request body is a JSON message containing a list of pull
365    requests in the following format:
366
367    [
368       {
369          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
370          "servers":[
371                         "keep0.qr1hi.arvadosapi.com:25107",
372                         "keep1.qr1hi.arvadosapi.com:25108"
373                  ]
374           },
375           {
376                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
377                  "servers":[
378                         "10.0.1.5:25107",
379                         "10.0.1.6:25107",
380                         "10.0.1.7:25108"
381                  ]
382           },
383           ...
384    ]
385
386    Each pull request in the list consists of a block locator string
387    and an ordered list of servers.  Keepstore should try to fetch the
388    block from each server in turn.
389
390    If the request has not been sent by the Data Manager, return 401
391    Unauthorized.
392
393    If the JSON unmarshalling fails, return 400 Bad Request.
394 */
395
396 type PullRequest struct {
397         Locator string   `json:"locator"`
398         Servers []string `json:"servers"`
399 }
400
401 func PullHandler(resp http.ResponseWriter, req *http.Request) {
402         // Reject unauthorized requests.
403         if !IsDataManagerToken(GetApiToken(req)) {
404                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
405                 return
406         }
407
408         // Parse the request body.
409         var pr []PullRequest
410         r := json.NewDecoder(req.Body)
411         if err := r.Decode(&pr); err != nil {
412                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
413                 return
414         }
415
416         // We have a properly formatted pull list sent from the data
417         // manager.  Report success and send the list to the pull list
418         // manager for further handling.
419         resp.WriteHeader(http.StatusOK)
420         resp.Write([]byte(
421                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
422
423         plist := list.New()
424         for _, p := range pr {
425                 plist.PushBack(p)
426         }
427         pullq.ReplaceQueue(plist)
428 }
429
430 type TrashRequest struct {
431         Locator    string `json:"locator"`
432         BlockMtime int64  `json:"block_mtime"`
433 }
434
435 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
436         // Reject unauthorized requests.
437         if !IsDataManagerToken(GetApiToken(req)) {
438                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
439                 return
440         }
441
442         // Parse the request body.
443         var trash []TrashRequest
444         r := json.NewDecoder(req.Body)
445         if err := r.Decode(&trash); err != nil {
446                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
447                 return
448         }
449
450         // We have a properly formatted trash list sent from the data
451         // manager.  Report success and send the list to the trash work
452         // queue for further handling.
453         resp.WriteHeader(http.StatusOK)
454         resp.Write([]byte(
455                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
456
457         tlist := list.New()
458         for _, t := range trash {
459                 tlist.PushBack(t)
460         }
461         trashq.ReplaceQueue(tlist)
462 }
463
464 // ==============================
465 // GetBlock and PutBlock implement lower-level code for handling
466 // blocks by rooting through volumes connected to the local machine.
467 // Once the handler has determined that system policy permits the
468 // request, it calls these methods to perform the actual operation.
469 //
470 // TODO(twp): this code would probably be better located in the
471 // VolumeManager interface. As an abstraction, the VolumeManager
472 // should be the only part of the code that cares about which volume a
473 // block is stored on, so it should be responsible for figuring out
474 // which volume to check for fetching blocks, storing blocks, etc.
475
476 // ==============================
477 // GetBlock fetches and returns the block identified by "hash".  If
478 // the update_timestamp argument is true, GetBlock also updates the
479 // block's file modification time (for the sake of PutBlock, which
480 // must update the file's timestamp when the block already exists).
481 //
482 // On success, GetBlock returns a byte slice with the block data, and
483 // a nil error.
484 //
485 // If the block cannot be found on any volume, returns NotFoundError.
486 //
487 // If the block found does not have the correct MD5 hash, returns
488 // DiskHashError.
489 //
490
491 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
492         // Attempt to read the requested hash from a keep volume.
493         error_to_caller := NotFoundError
494
495         var vols []Volume
496         if update_timestamp {
497                 // Pointless to find the block on an unwritable volume
498                 // because Touch() will fail -- this is as good as
499                 // "not found" for purposes of callers who need to
500                 // update_timestamp.
501                 vols = KeepVM.AllWritable()
502         } else {
503                 vols = KeepVM.AllReadable()
504         }
505
506         for _, vol := range vols {
507                 buf, err := vol.Get(hash)
508                 if err != nil {
509                         // IsNotExist is an expected error and may be
510                         // ignored. All other errors are logged. In
511                         // any case we continue trying to read other
512                         // volumes. If all volumes report IsNotExist,
513                         // we return a NotFoundError.
514                         if !os.IsNotExist(err) {
515                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
516                         }
517                         continue
518                 }
519                 // Check the file checksum.
520                 //
521                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
522                 if filehash != hash {
523                         // TODO: Try harder to tell a sysadmin about
524                         // this.
525                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
526                                 vol, hash, filehash)
527                         error_to_caller = DiskHashError
528                         continue
529                 }
530                 if error_to_caller == DiskHashError {
531                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
532                                 vol, hash)
533                 }
534                 if update_timestamp {
535                         if err := vol.Touch(hash); err != nil {
536                                 error_to_caller = GenericError
537                                 log.Printf("%s: Touch %s failed: %s",
538                                         vol, hash, error_to_caller)
539                                 continue
540                         }
541                 }
542                 return buf, nil
543         }
544         return nil, error_to_caller
545 }
546
547 /* PutBlock(block, hash)
548    Stores the BLOCK (identified by the content id HASH) in Keep.
549
550    The MD5 checksum of the block must be identical to the content id HASH.
551    If not, an error is returned.
552
553    PutBlock stores the BLOCK on the first Keep volume with free space.
554    A failure code is returned to the user only if all volumes fail.
555
556    On success, PutBlock returns nil.
557    On failure, it returns a KeepError with one of the following codes:
558
559    500 Collision
560           A different block with the same hash already exists on this
561           Keep server.
562    422 MD5Fail
563           The MD5 hash of the BLOCK does not match the argument HASH.
564    503 Full
565           There was not enough space left in any Keep volume to store
566           the object.
567    500 Fail
568           The object could not be stored for some other reason (e.g.
569           all writes failed). The text of the error message should
570           provide as much detail as possible.
571 */
572
573 func PutBlock(block []byte, hash string) error {
574         // Check that BLOCK's checksum matches HASH.
575         blockhash := fmt.Sprintf("%x", md5.Sum(block))
576         if blockhash != hash {
577                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
578                 return RequestHashError
579         }
580
581         // If we already have a block on disk under this identifier, return
582         // success (but check for MD5 collisions).  While fetching the block,
583         // update its timestamp.
584         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
585         // In either case, we want to write our new (good) block to disk,
586         // so there is nothing special to do if err != nil.
587         //
588         if oldblock, err := GetBlock(hash, true); err == nil {
589                 if bytes.Compare(block, oldblock) == 0 {
590                         // The block already exists; return success.
591                         return nil
592                 } else {
593                         return CollisionError
594                 }
595         }
596
597         // Choose a Keep volume to write to.
598         // If this volume fails, try all of the volumes in order.
599         if vol := KeepVM.NextWritable(); vol != nil {
600                 if err := vol.Put(hash, block); err == nil {
601                         return nil // success!
602                 }
603         }
604
605         writables := KeepVM.AllWritable()
606         if len(writables) == 0 {
607                 log.Print("No writable volumes.")
608                 return FullError
609         }
610
611         allFull := true
612         for _, vol := range writables {
613                 err := vol.Put(hash, block)
614                 if err == nil {
615                         return nil // success!
616                 }
617                 if err != FullError {
618                         // The volume is not full but the
619                         // write did not succeed.  Report the
620                         // error and continue trying.
621                         allFull = false
622                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
623                 }
624         }
625
626         if allFull {
627                 log.Print("All volumes are full.")
628                 return FullError
629         } else {
630                 // Already logged the non-full errors.
631                 return GenericError
632         }
633 }
634
635 // IsValidLocator
636 //     Return true if the specified string is a valid Keep locator.
637 //     When Keep is extended to support hash types other than MD5,
638 //     this should be updated to cover those as well.
639 //
640 func IsValidLocator(loc string) bool {
641         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
642         if err == nil {
643                 return match
644         }
645         log.Printf("IsValidLocator: %s\n", err)
646         return false
647 }
648
649 // GetApiToken returns the OAuth2 token from the Authorization
650 // header of a HTTP request, or an empty string if no matching
651 // token is found.
652 func GetApiToken(req *http.Request) string {
653         if auth, ok := req.Header["Authorization"]; ok {
654                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
655                         log.Println(err)
656                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
657                         return match[1]
658                 }
659         }
660         return ""
661 }
662
663 // IsExpired returns true if the given Unix timestamp (expressed as a
664 // hexadecimal string) is in the past, or if timestamp_hex cannot be
665 // parsed as a hexadecimal string.
666 func IsExpired(timestamp_hex string) bool {
667         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
668         if err != nil {
669                 log.Printf("IsExpired: %s\n", err)
670                 return true
671         }
672         return time.Unix(ts, 0).Before(time.Now())
673 }
674
675 // CanDelete returns true if the user identified by api_token is
676 // allowed to delete blocks.
677 func CanDelete(api_token string) bool {
678         if api_token == "" {
679                 return false
680         }
681         // Blocks may be deleted only when Keep has been configured with a
682         // data manager.
683         if IsDataManagerToken(api_token) {
684                 return true
685         }
686         // TODO(twp): look up api_token with the API server
687         // return true if is_admin is true and if the token
688         // has unlimited scope
689         return false
690 }
691
692 // IsDataManagerToken returns true if api_token represents the data
693 // manager's token.
694 func IsDataManagerToken(api_token string) bool {
695         return data_manager_token != "" && api_token == data_manager_token
696 }