5745: Serialize writes and data reads, but allow concurrent requests
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Any request which does not match any of these routes gets
59         // 400 Bad Request.
60         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
61
62         return rest
63 }
64
65 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
66         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
67 }
68
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70         hash := mux.Vars(req)["hash"]
71
72         hints := mux.Vars(req)["hints"]
73
74         // Parse the locator string and hints from the request.
75         // TODO(twp): implement a Locator type.
76         var signature, timestamp string
77         if hints != "" {
78                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
79                 for _, hint := range strings.Split(hints, "+") {
80                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
81                                 // Server ignores size hints
82                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
83                                 signature = m[1]
84                                 timestamp = m[2]
85                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
86                                 // Any unknown hint that starts with an uppercase letter is
87                                 // presumed to be valid and ignored, to permit forward compatibility.
88                         } else {
89                                 // Unknown format; not a valid locator.
90                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
91                                 return
92                         }
93                 }
94         }
95
96         // If permission checking is in effect, verify this
97         // request's permission signature.
98         if enforce_permissions {
99                 if signature == "" || timestamp == "" {
100                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
101                         return
102                 } else if IsExpired(timestamp) {
103                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
104                         return
105                 } else {
106                         req_locator := req.URL.Path[1:] // strip leading slash
107                         if !VerifySignature(req_locator, GetApiToken(req)) {
108                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
109                                 return
110                         }
111                 }
112         }
113
114         block, err := GetBlock(hash, false)
115
116         // Garbage collect after each GET. Fixes #2865.
117         // TODO(twp): review Keep memory usage and see if there's
118         // a better way to do this than blindly garbage collecting
119         // after every block.
120         defer runtime.GC()
121
122         if err != nil {
123                 // This type assertion is safe because the only errors
124                 // GetBlock can return are DiskHashError or NotFoundError.
125                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
126                 return
127         }
128
129         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
130
131         _, err = resp.Write(block)
132
133         return
134 }
135
136 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
137         // Garbage collect after each PUT. Fixes #2865.
138         // See also GetBlockHandler.
139         defer runtime.GC()
140
141         hash := mux.Vars(req)["hash"]
142
143         // Detect as many error conditions as possible before reading
144         // the body: avoid transmitting data that will not end up
145         // being written anyway.
146
147         if req.ContentLength == -1 {
148                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
149                 return
150         }
151
152         if req.ContentLength > BLOCKSIZE {
153                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
154                 return
155         }
156
157         if len(KeepVM.AllWritable()) == 0 {
158                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
159                 return
160         }
161
162         buf := make([]byte, req.ContentLength)
163         nread, err := io.ReadFull(req.Body, buf)
164         if err != nil {
165                 http.Error(resp, err.Error(), 500)
166                 return
167         } else if int64(nread) < req.ContentLength {
168                 http.Error(resp, "request truncated", 500)
169                 return
170         }
171
172         err = PutBlock(buf, hash)
173         if err != nil {
174                 ke := err.(*KeepError)
175                 http.Error(resp, ke.Error(), ke.HTTPCode)
176                 return
177         }
178
179         // Success; add a size hint, sign the locator if possible, and
180         // return it to the client.
181         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
182         api_token := GetApiToken(req)
183         if PermissionSecret != nil && api_token != "" {
184                 expiry := time.Now().Add(blob_signature_ttl)
185                 return_hash = SignLocator(return_hash, api_token, expiry)
186         }
187         resp.Write([]byte(return_hash + "\n"))
188 }
189
190 // IndexHandler
191 //     A HandleFunc to address /index and /index/{prefix} requests.
192 //
193 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
194         // Reject unauthorized requests.
195         if !IsDataManagerToken(GetApiToken(req)) {
196                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
197                 return
198         }
199
200         prefix := mux.Vars(req)["prefix"]
201
202         for _, vol := range KeepVM.AllReadable() {
203                 if err := vol.IndexTo(prefix, resp); err != nil {
204                         // The only errors returned by IndexTo are
205                         // write errors returned by resp.Write(),
206                         // which probably means the client has
207                         // disconnected and this error will never be
208                         // reported to the client -- but it will
209                         // appear in our own error log.
210                         http.Error(resp, err.Error(), http.StatusInternalServerError)
211                         return
212                 }
213         }
214 }
215
216 // StatusHandler
217 //     Responds to /status.json requests with the current node status,
218 //     described in a JSON structure.
219 //
220 //     The data given in a status.json response includes:
221 //        volumes - a list of Keep volumes currently in use by this server
222 //          each volume is an object with the following fields:
223 //            * mount_point
224 //            * device_num (an integer identifying the underlying filesystem)
225 //            * bytes_free
226 //            * bytes_used
227 //
228 type VolumeStatus struct {
229         MountPoint string `json:"mount_point"`
230         DeviceNum  uint64 `json:"device_num"`
231         BytesFree  uint64 `json:"bytes_free"`
232         BytesUsed  uint64 `json:"bytes_used"`
233 }
234
235 type NodeStatus struct {
236         Volumes []*VolumeStatus `json:"volumes"`
237 }
238
239 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
240         st := GetNodeStatus()
241         if jstat, err := json.Marshal(st); err == nil {
242                 resp.Write(jstat)
243         } else {
244                 log.Printf("json.Marshal: %s\n", err)
245                 log.Printf("NodeStatus = %v\n", st)
246                 http.Error(resp, err.Error(), 500)
247         }
248 }
249
250 // GetNodeStatus
251 //     Returns a NodeStatus struct describing this Keep
252 //     node's current status.
253 //
254 func GetNodeStatus() *NodeStatus {
255         st := new(NodeStatus)
256
257         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
258         for i, vol := range KeepVM.AllReadable() {
259                 st.Volumes[i] = vol.Status()
260         }
261         return st
262 }
263
264 // GetVolumeStatus
265 //     Returns a VolumeStatus describing the requested volume.
266 //
267 func GetVolumeStatus(volume string) *VolumeStatus {
268         var fs syscall.Statfs_t
269         var devnum uint64
270
271         if fi, err := os.Stat(volume); err == nil {
272                 devnum = fi.Sys().(*syscall.Stat_t).Dev
273         } else {
274                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
275                 return nil
276         }
277
278         err := syscall.Statfs(volume, &fs)
279         if err != nil {
280                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
281                 return nil
282         }
283         // These calculations match the way df calculates disk usage:
284         // "free" space is measured by fs.Bavail, but "used" space
285         // uses fs.Blocks - fs.Bfree.
286         free := fs.Bavail * uint64(fs.Bsize)
287         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
288         return &VolumeStatus{volume, devnum, free, used}
289 }
290
291 // DeleteHandler processes DELETE requests.
292 //
293 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
294 // from all connected volumes.
295 //
296 // Only the Data Manager, or an Arvados admin with scope "all", are
297 // allowed to issue DELETE requests.  If a DELETE request is not
298 // authenticated or is issued by a non-admin user, the server returns
299 // a PermissionError.
300 //
301 // Upon receiving a valid request from an authorized user,
302 // DeleteHandler deletes all copies of the specified block on local
303 // writable volumes.
304 //
305 // Response format:
306 //
307 // If the requested blocks was not found on any volume, the response
308 // code is HTTP 404 Not Found.
309 //
310 // Otherwise, the response code is 200 OK, with a response body
311 // consisting of the JSON message
312 //
313 //    {"copies_deleted":d,"copies_failed":f}
314 //
315 // where d and f are integers representing the number of blocks that
316 // were successfully and unsuccessfully deleted.
317 //
318 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
319         hash := mux.Vars(req)["hash"]
320
321         // Confirm that this user is an admin and has a token with unlimited scope.
322         var tok = GetApiToken(req)
323         if tok == "" || !CanDelete(tok) {
324                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
325                 return
326         }
327
328         if never_delete {
329                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
330                 return
331         }
332
333         // Delete copies of this block from all available volumes.
334         // Report how many blocks were successfully deleted, and how
335         // many were found on writable volumes but not deleted.
336         var result struct {
337                 Deleted int `json:"copies_deleted"`
338                 Failed  int `json:"copies_failed"`
339         }
340         for _, vol := range KeepVM.AllWritable() {
341                 if err := vol.Delete(hash); err == nil {
342                         result.Deleted++
343                 } else if os.IsNotExist(err) {
344                         continue
345                 } else {
346                         result.Failed++
347                         log.Println("DeleteHandler:", err)
348                 }
349         }
350
351         var st int
352
353         if result.Deleted == 0 && result.Failed == 0 {
354                 st = http.StatusNotFound
355         } else {
356                 st = http.StatusOK
357         }
358
359         resp.WriteHeader(st)
360
361         if st == http.StatusOK {
362                 if body, err := json.Marshal(result); err == nil {
363                         resp.Write(body)
364                 } else {
365                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
366                         http.Error(resp, err.Error(), 500)
367                 }
368         }
369 }
370
371 /* PullHandler processes "PUT /pull" requests for the data manager.
372    The request body is a JSON message containing a list of pull
373    requests in the following format:
374
375    [
376       {
377          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
378          "servers":[
379                         "keep0.qr1hi.arvadosapi.com:25107",
380                         "keep1.qr1hi.arvadosapi.com:25108"
381                  ]
382           },
383           {
384                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
385                  "servers":[
386                         "10.0.1.5:25107",
387                         "10.0.1.6:25107",
388                         "10.0.1.7:25108"
389                  ]
390           },
391           ...
392    ]
393
394    Each pull request in the list consists of a block locator string
395    and an ordered list of servers.  Keepstore should try to fetch the
396    block from each server in turn.
397
398    If the request has not been sent by the Data Manager, return 401
399    Unauthorized.
400
401    If the JSON unmarshalling fails, return 400 Bad Request.
402 */
403
404 type PullRequest struct {
405         Locator string   `json:"locator"`
406         Servers []string `json:"servers"`
407 }
408
409 func PullHandler(resp http.ResponseWriter, req *http.Request) {
410         // Reject unauthorized requests.
411         if !IsDataManagerToken(GetApiToken(req)) {
412                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
413                 return
414         }
415
416         // Parse the request body.
417         var pr []PullRequest
418         r := json.NewDecoder(req.Body)
419         if err := r.Decode(&pr); err != nil {
420                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
421                 return
422         }
423
424         // We have a properly formatted pull list sent from the data
425         // manager.  Report success and send the list to the pull list
426         // manager for further handling.
427         resp.WriteHeader(http.StatusOK)
428         resp.Write([]byte(
429                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
430
431         plist := list.New()
432         for _, p := range pr {
433                 plist.PushBack(p)
434         }
435         pullq.ReplaceQueue(plist)
436 }
437
438 type TrashRequest struct {
439         Locator    string `json:"locator"`
440         BlockMtime int64  `json:"block_mtime"`
441 }
442
443 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
444         // Reject unauthorized requests.
445         if !IsDataManagerToken(GetApiToken(req)) {
446                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
447                 return
448         }
449
450         // Parse the request body.
451         var trash []TrashRequest
452         r := json.NewDecoder(req.Body)
453         if err := r.Decode(&trash); err != nil {
454                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
455                 return
456         }
457
458         // We have a properly formatted trash list sent from the data
459         // manager.  Report success and send the list to the trash work
460         // queue for further handling.
461         resp.WriteHeader(http.StatusOK)
462         resp.Write([]byte(
463                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
464
465         tlist := list.New()
466         for _, t := range trash {
467                 tlist.PushBack(t)
468         }
469         trashq.ReplaceQueue(tlist)
470 }
471
472 // ==============================
473 // GetBlock and PutBlock implement lower-level code for handling
474 // blocks by rooting through volumes connected to the local machine.
475 // Once the handler has determined that system policy permits the
476 // request, it calls these methods to perform the actual operation.
477 //
478 // TODO(twp): this code would probably be better located in the
479 // VolumeManager interface. As an abstraction, the VolumeManager
480 // should be the only part of the code that cares about which volume a
481 // block is stored on, so it should be responsible for figuring out
482 // which volume to check for fetching blocks, storing blocks, etc.
483
484 // ==============================
485 // GetBlock fetches and returns the block identified by "hash".  If
486 // the update_timestamp argument is true, GetBlock also updates the
487 // block's file modification time (for the sake of PutBlock, which
488 // must update the file's timestamp when the block already exists).
489 //
490 // On success, GetBlock returns a byte slice with the block data, and
491 // a nil error.
492 //
493 // If the block cannot be found on any volume, returns NotFoundError.
494 //
495 // If the block found does not have the correct MD5 hash, returns
496 // DiskHashError.
497 //
498
499 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
500         // Attempt to read the requested hash from a keep volume.
501         error_to_caller := NotFoundError
502
503         var vols []Volume
504         if update_timestamp {
505                 // Pointless to find the block on an unwritable volume
506                 // because Touch() will fail -- this is as good as
507                 // "not found" for purposes of callers who need to
508                 // update_timestamp.
509                 vols = KeepVM.AllWritable()
510         } else {
511                 vols = KeepVM.AllReadable()
512         }
513
514         for _, vol := range vols {
515                 buf, err := vol.Get(hash)
516                 if err != nil {
517                         // IsNotExist is an expected error and may be
518                         // ignored. All other errors are logged. In
519                         // any case we continue trying to read other
520                         // volumes. If all volumes report IsNotExist,
521                         // we return a NotFoundError.
522                         if !os.IsNotExist(err) {
523                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
524                         }
525                         continue
526                 }
527                 // Check the file checksum.
528                 //
529                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
530                 if filehash != hash {
531                         // TODO: Try harder to tell a sysadmin about
532                         // this.
533                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
534                                 vol, hash, filehash)
535                         error_to_caller = DiskHashError
536                         continue
537                 }
538                 if error_to_caller == DiskHashError {
539                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
540                                 vol, hash)
541                 }
542                 if update_timestamp {
543                         if err := vol.Touch(hash); err != nil {
544                                 error_to_caller = GenericError
545                                 log.Printf("%s: Touch %s failed: %s",
546                                         vol, hash, error_to_caller)
547                                 continue
548                         }
549                 }
550                 return buf, nil
551         }
552         return nil, error_to_caller
553 }
554
555 /* PutBlock(block, hash)
556    Stores the BLOCK (identified by the content id HASH) in Keep.
557
558    The MD5 checksum of the block must be identical to the content id HASH.
559    If not, an error is returned.
560
561    PutBlock stores the BLOCK on the first Keep volume with free space.
562    A failure code is returned to the user only if all volumes fail.
563
564    On success, PutBlock returns nil.
565    On failure, it returns a KeepError with one of the following codes:
566
567    500 Collision
568           A different block with the same hash already exists on this
569           Keep server.
570    422 MD5Fail
571           The MD5 hash of the BLOCK does not match the argument HASH.
572    503 Full
573           There was not enough space left in any Keep volume to store
574           the object.
575    500 Fail
576           The object could not be stored for some other reason (e.g.
577           all writes failed). The text of the error message should
578           provide as much detail as possible.
579 */
580
581 func PutBlock(block []byte, hash string) error {
582         // Check that BLOCK's checksum matches HASH.
583         blockhash := fmt.Sprintf("%x", md5.Sum(block))
584         if blockhash != hash {
585                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
586                 return RequestHashError
587         }
588
589         // If we already have a block on disk under this identifier, return
590         // success (but check for MD5 collisions).  While fetching the block,
591         // update its timestamp.
592         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
593         // In either case, we want to write our new (good) block to disk,
594         // so there is nothing special to do if err != nil.
595         //
596         if oldblock, err := GetBlock(hash, true); err == nil {
597                 if bytes.Compare(block, oldblock) == 0 {
598                         // The block already exists; return success.
599                         return nil
600                 } else {
601                         return CollisionError
602                 }
603         }
604
605         // Choose a Keep volume to write to.
606         // If this volume fails, try all of the volumes in order.
607         if vol := KeepVM.NextWritable(); vol != nil {
608                 if err := vol.Put(hash, block); err == nil {
609                         return nil // success!
610                 }
611         }
612
613         writables := KeepVM.AllWritable()
614         if len(writables) == 0 {
615                 log.Print("No writable volumes.")
616                 return FullError
617         }
618
619         allFull := true
620         for _, vol := range writables {
621                 err := vol.Put(hash, block)
622                 if err == nil {
623                         return nil // success!
624                 }
625                 if err != FullError {
626                         // The volume is not full but the
627                         // write did not succeed.  Report the
628                         // error and continue trying.
629                         allFull = false
630                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
631                 }
632         }
633
634         if allFull {
635                 log.Print("All volumes are full.")
636                 return FullError
637         } else {
638                 // Already logged the non-full errors.
639                 return GenericError
640         }
641 }
642
643 // IsValidLocator
644 //     Return true if the specified string is a valid Keep locator.
645 //     When Keep is extended to support hash types other than MD5,
646 //     this should be updated to cover those as well.
647 //
648 func IsValidLocator(loc string) bool {
649         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
650         if err == nil {
651                 return match
652         }
653         log.Printf("IsValidLocator: %s\n", err)
654         return false
655 }
656
657 // GetApiToken returns the OAuth2 token from the Authorization
658 // header of a HTTP request, or an empty string if no matching
659 // token is found.
660 func GetApiToken(req *http.Request) string {
661         if auth, ok := req.Header["Authorization"]; ok {
662                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
663                         log.Println(err)
664                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
665                         return match[1]
666                 }
667         }
668         return ""
669 }
670
671 // IsExpired returns true if the given Unix timestamp (expressed as a
672 // hexadecimal string) is in the past, or if timestamp_hex cannot be
673 // parsed as a hexadecimal string.
674 func IsExpired(timestamp_hex string) bool {
675         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
676         if err != nil {
677                 log.Printf("IsExpired: %s\n", err)
678                 return true
679         }
680         return time.Unix(ts, 0).Before(time.Now())
681 }
682
683 // CanDelete returns true if the user identified by api_token is
684 // allowed to delete blocks.
685 func CanDelete(api_token string) bool {
686         if api_token == "" {
687                 return false
688         }
689         // Blocks may be deleted only when Keep has been configured with a
690         // data manager.
691         if IsDataManagerToken(api_token) {
692                 return true
693         }
694         // TODO(twp): look up api_token with the API server
695         // return true if is_admin is true and if the token
696         // has unlimited scope
697         return false
698 }
699
700 // IsDataManagerToken returns true if api_token represents the data
701 // manager's token.
702 func IsDataManagerToken(api_token string) bool {
703         return data_manager_token != "" && api_token == data_manager_token
704 }