integration_performance tests: add comment describing how to run the tests; also...
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // List volumes: path, device number, bytes used/avail.
50         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
51
52         // Replace the current pull queue.
53         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
54
55         // Replace the current trash queue.
56         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
57
58         // Any request which does not match any of these routes gets
59         // 400 Bad Request.
60         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
61
62         return rest
63 }
64
65 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
66         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
67 }
68
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70         hash := mux.Vars(req)["hash"]
71
72         hints := mux.Vars(req)["hints"]
73
74         // Parse the locator string and hints from the request.
75         // TODO(twp): implement a Locator type.
76         var signature, timestamp string
77         if hints != "" {
78                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
79                 for _, hint := range strings.Split(hints, "+") {
80                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
81                                 // Server ignores size hints
82                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
83                                 signature = m[1]
84                                 timestamp = m[2]
85                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
86                                 // Any unknown hint that starts with an uppercase letter is
87                                 // presumed to be valid and ignored, to permit forward compatibility.
88                         } else {
89                                 // Unknown format; not a valid locator.
90                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
91                                 return
92                         }
93                 }
94         }
95
96         // If permission checking is in effect, verify this
97         // request's permission signature.
98         if enforce_permissions {
99                 if signature == "" || timestamp == "" {
100                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
101                         return
102                 } else if IsExpired(timestamp) {
103                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
104                         return
105                 } else {
106                         req_locator := req.URL.Path[1:] // strip leading slash
107                         if !VerifySignature(req_locator, GetApiToken(req)) {
108                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
109                                 return
110                         }
111                 }
112         }
113
114         block, err := GetBlock(hash, false)
115         if err != nil {
116                 // This type assertion is safe because the only errors
117                 // GetBlock can return are DiskHashError or NotFoundError.
118                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
119                 return
120         }
121         defer bufs.Put(block)
122
123         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
124         resp.Header().Set("Content-Type", "application/octet-stream")
125         resp.Write(block)
126 }
127
128 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
129         // Garbage collect after each PUT. Fixes #2865.
130         // See also GetBlockHandler.
131         defer runtime.GC()
132
133         hash := mux.Vars(req)["hash"]
134
135         // Detect as many error conditions as possible before reading
136         // the body: avoid transmitting data that will not end up
137         // being written anyway.
138
139         if req.ContentLength == -1 {
140                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
141                 return
142         }
143
144         if req.ContentLength > BLOCKSIZE {
145                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
146                 return
147         }
148
149         if len(KeepVM.AllWritable()) == 0 {
150                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
151                 return
152         }
153
154         buf := bufs.Get(int(req.ContentLength))
155         _, err := io.ReadFull(req.Body, buf)
156         if err != nil {
157                 http.Error(resp, err.Error(), 500)
158                 bufs.Put(buf)
159                 return
160         }
161
162         err = PutBlock(buf, hash)
163         bufs.Put(buf)
164
165         if err != nil {
166                 ke := err.(*KeepError)
167                 http.Error(resp, ke.Error(), ke.HTTPCode)
168                 return
169         }
170
171         // Success; add a size hint, sign the locator if possible, and
172         // return it to the client.
173         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
174         api_token := GetApiToken(req)
175         if PermissionSecret != nil && api_token != "" {
176                 expiry := time.Now().Add(blob_signature_ttl)
177                 return_hash = SignLocator(return_hash, api_token, expiry)
178         }
179         resp.Write([]byte(return_hash + "\n"))
180 }
181
182 // IndexHandler
183 //     A HandleFunc to address /index and /index/{prefix} requests.
184 //
185 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
186         // Reject unauthorized requests.
187         if !IsDataManagerToken(GetApiToken(req)) {
188                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
189                 return
190         }
191
192         prefix := mux.Vars(req)["prefix"]
193
194         for _, vol := range KeepVM.AllReadable() {
195                 if err := vol.IndexTo(prefix, resp); err != nil {
196                         // The only errors returned by IndexTo are
197                         // write errors returned by resp.Write(),
198                         // which probably means the client has
199                         // disconnected and this error will never be
200                         // reported to the client -- but it will
201                         // appear in our own error log.
202                         http.Error(resp, err.Error(), http.StatusInternalServerError)
203                         return
204                 }
205         }
206 }
207
208 // StatusHandler
209 //     Responds to /status.json requests with the current node status,
210 //     described in a JSON structure.
211 //
212 //     The data given in a status.json response includes:
213 //        volumes - a list of Keep volumes currently in use by this server
214 //          each volume is an object with the following fields:
215 //            * mount_point
216 //            * device_num (an integer identifying the underlying filesystem)
217 //            * bytes_free
218 //            * bytes_used
219 //
220 type VolumeStatus struct {
221         MountPoint string `json:"mount_point"`
222         DeviceNum  uint64 `json:"device_num"`
223         BytesFree  uint64 `json:"bytes_free"`
224         BytesUsed  uint64 `json:"bytes_used"`
225 }
226
227 type NodeStatus struct {
228         Volumes []*VolumeStatus `json:"volumes"`
229 }
230
231 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
232         st := GetNodeStatus()
233         if jstat, err := json.Marshal(st); err == nil {
234                 resp.Write(jstat)
235         } else {
236                 log.Printf("json.Marshal: %s\n", err)
237                 log.Printf("NodeStatus = %v\n", st)
238                 http.Error(resp, err.Error(), 500)
239         }
240 }
241
242 // GetNodeStatus
243 //     Returns a NodeStatus struct describing this Keep
244 //     node's current status.
245 //
246 func GetNodeStatus() *NodeStatus {
247         st := new(NodeStatus)
248
249         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
250         for i, vol := range KeepVM.AllReadable() {
251                 st.Volumes[i] = vol.Status()
252         }
253         return st
254 }
255
256 // GetVolumeStatus
257 //     Returns a VolumeStatus describing the requested volume.
258 //
259 func GetVolumeStatus(volume string) *VolumeStatus {
260         var fs syscall.Statfs_t
261         var devnum uint64
262
263         if fi, err := os.Stat(volume); err == nil {
264                 devnum = fi.Sys().(*syscall.Stat_t).Dev
265         } else {
266                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
267                 return nil
268         }
269
270         err := syscall.Statfs(volume, &fs)
271         if err != nil {
272                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
273                 return nil
274         }
275         // These calculations match the way df calculates disk usage:
276         // "free" space is measured by fs.Bavail, but "used" space
277         // uses fs.Blocks - fs.Bfree.
278         free := fs.Bavail * uint64(fs.Bsize)
279         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
280         return &VolumeStatus{volume, devnum, free, used}
281 }
282
283 // DeleteHandler processes DELETE requests.
284 //
285 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
286 // from all connected volumes.
287 //
288 // Only the Data Manager, or an Arvados admin with scope "all", are
289 // allowed to issue DELETE requests.  If a DELETE request is not
290 // authenticated or is issued by a non-admin user, the server returns
291 // a PermissionError.
292 //
293 // Upon receiving a valid request from an authorized user,
294 // DeleteHandler deletes all copies of the specified block on local
295 // writable volumes.
296 //
297 // Response format:
298 //
299 // If the requested blocks was not found on any volume, the response
300 // code is HTTP 404 Not Found.
301 //
302 // Otherwise, the response code is 200 OK, with a response body
303 // consisting of the JSON message
304 //
305 //    {"copies_deleted":d,"copies_failed":f}
306 //
307 // where d and f are integers representing the number of blocks that
308 // were successfully and unsuccessfully deleted.
309 //
310 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
311         hash := mux.Vars(req)["hash"]
312
313         // Confirm that this user is an admin and has a token with unlimited scope.
314         var tok = GetApiToken(req)
315         if tok == "" || !CanDelete(tok) {
316                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
317                 return
318         }
319
320         if never_delete {
321                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
322                 return
323         }
324
325         // Delete copies of this block from all available volumes.
326         // Report how many blocks were successfully deleted, and how
327         // many were found on writable volumes but not deleted.
328         var result struct {
329                 Deleted int `json:"copies_deleted"`
330                 Failed  int `json:"copies_failed"`
331         }
332         for _, vol := range KeepVM.AllWritable() {
333                 if err := vol.Delete(hash); err == nil {
334                         result.Deleted++
335                 } else if os.IsNotExist(err) {
336                         continue
337                 } else {
338                         result.Failed++
339                         log.Println("DeleteHandler:", err)
340                 }
341         }
342
343         var st int
344
345         if result.Deleted == 0 && result.Failed == 0 {
346                 st = http.StatusNotFound
347         } else {
348                 st = http.StatusOK
349         }
350
351         resp.WriteHeader(st)
352
353         if st == http.StatusOK {
354                 if body, err := json.Marshal(result); err == nil {
355                         resp.Write(body)
356                 } else {
357                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
358                         http.Error(resp, err.Error(), 500)
359                 }
360         }
361 }
362
363 /* PullHandler processes "PUT /pull" requests for the data manager.
364    The request body is a JSON message containing a list of pull
365    requests in the following format:
366
367    [
368       {
369          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
370          "servers":[
371                         "keep0.qr1hi.arvadosapi.com:25107",
372                         "keep1.qr1hi.arvadosapi.com:25108"
373                  ]
374           },
375           {
376                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
377                  "servers":[
378                         "10.0.1.5:25107",
379                         "10.0.1.6:25107",
380                         "10.0.1.7:25108"
381                  ]
382           },
383           ...
384    ]
385
386    Each pull request in the list consists of a block locator string
387    and an ordered list of servers.  Keepstore should try to fetch the
388    block from each server in turn.
389
390    If the request has not been sent by the Data Manager, return 401
391    Unauthorized.
392
393    If the JSON unmarshalling fails, return 400 Bad Request.
394 */
395
396 type PullRequest struct {
397         Locator string   `json:"locator"`
398         Servers []string `json:"servers"`
399 }
400
401 func PullHandler(resp http.ResponseWriter, req *http.Request) {
402         // Reject unauthorized requests.
403         if !IsDataManagerToken(GetApiToken(req)) {
404                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
405                 return
406         }
407
408         // Parse the request body.
409         var pr []PullRequest
410         r := json.NewDecoder(req.Body)
411         if err := r.Decode(&pr); err != nil {
412                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
413                 return
414         }
415
416         // We have a properly formatted pull list sent from the data
417         // manager.  Report success and send the list to the pull list
418         // manager for further handling.
419         resp.WriteHeader(http.StatusOK)
420         resp.Write([]byte(
421                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
422
423         plist := list.New()
424         for _, p := range pr {
425                 plist.PushBack(p)
426         }
427         pullq.ReplaceQueue(plist)
428 }
429
430 type TrashRequest struct {
431         Locator    string `json:"locator"`
432         BlockMtime int64  `json:"block_mtime"`
433 }
434
435 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
436         // Reject unauthorized requests.
437         if !IsDataManagerToken(GetApiToken(req)) {
438                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
439                 return
440         }
441
442         // Parse the request body.
443         var trash []TrashRequest
444         r := json.NewDecoder(req.Body)
445         if err := r.Decode(&trash); err != nil {
446                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
447                 return
448         }
449
450         // We have a properly formatted trash list sent from the data
451         // manager.  Report success and send the list to the trash work
452         // queue for further handling.
453         resp.WriteHeader(http.StatusOK)
454         resp.Write([]byte(
455                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
456
457         tlist := list.New()
458         for _, t := range trash {
459                 tlist.PushBack(t)
460         }
461         trashq.ReplaceQueue(tlist)
462 }
463
464 // ==============================
465 // GetBlock and PutBlock implement lower-level code for handling
466 // blocks by rooting through volumes connected to the local machine.
467 // Once the handler has determined that system policy permits the
468 // request, it calls these methods to perform the actual operation.
469 //
470 // TODO(twp): this code would probably be better located in the
471 // VolumeManager interface. As an abstraction, the VolumeManager
472 // should be the only part of the code that cares about which volume a
473 // block is stored on, so it should be responsible for figuring out
474 // which volume to check for fetching blocks, storing blocks, etc.
475
476 // ==============================
477 // GetBlock fetches and returns the block identified by "hash".  If
478 // the update_timestamp argument is true, GetBlock also updates the
479 // block's file modification time (for the sake of PutBlock, which
480 // must update the file's timestamp when the block already exists).
481 //
482 // On success, GetBlock returns a byte slice with the block data, and
483 // a nil error.
484 //
485 // If the block cannot be found on any volume, returns NotFoundError.
486 //
487 // If the block found does not have the correct MD5 hash, returns
488 // DiskHashError.
489 //
490
491 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
492         // Attempt to read the requested hash from a keep volume.
493         error_to_caller := NotFoundError
494
495         var vols []Volume
496         if update_timestamp {
497                 // Pointless to find the block on an unwritable volume
498                 // because Touch() will fail -- this is as good as
499                 // "not found" for purposes of callers who need to
500                 // update_timestamp.
501                 vols = KeepVM.AllWritable()
502         } else {
503                 vols = KeepVM.AllReadable()
504         }
505
506         for _, vol := range vols {
507                 buf, err := vol.Get(hash)
508                 if err != nil {
509                         // IsNotExist is an expected error and may be
510                         // ignored. All other errors are logged. In
511                         // any case we continue trying to read other
512                         // volumes. If all volumes report IsNotExist,
513                         // we return a NotFoundError.
514                         if !os.IsNotExist(err) {
515                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
516                         }
517                         continue
518                 }
519                 // Check the file checksum.
520                 //
521                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
522                 if filehash != hash {
523                         // TODO: Try harder to tell a sysadmin about
524                         // this.
525                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
526                                 vol, hash, filehash)
527                         error_to_caller = DiskHashError
528                         bufs.Put(buf)
529                         continue
530                 }
531                 if error_to_caller == DiskHashError {
532                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
533                                 vol, hash)
534                 }
535                 if update_timestamp {
536                         if err := vol.Touch(hash); err != nil {
537                                 error_to_caller = GenericError
538                                 log.Printf("%s: Touch %s failed: %s",
539                                         vol, hash, error_to_caller)
540                                 bufs.Put(buf)
541                                 continue
542                         }
543                 }
544                 return buf, nil
545         }
546         return nil, error_to_caller
547 }
548
549 /* PutBlock(block, hash)
550    Stores the BLOCK (identified by the content id HASH) in Keep.
551
552    The MD5 checksum of the block must be identical to the content id HASH.
553    If not, an error is returned.
554
555    PutBlock stores the BLOCK on the first Keep volume with free space.
556    A failure code is returned to the user only if all volumes fail.
557
558    On success, PutBlock returns nil.
559    On failure, it returns a KeepError with one of the following codes:
560
561    500 Collision
562           A different block with the same hash already exists on this
563           Keep server.
564    422 MD5Fail
565           The MD5 hash of the BLOCK does not match the argument HASH.
566    503 Full
567           There was not enough space left in any Keep volume to store
568           the object.
569    500 Fail
570           The object could not be stored for some other reason (e.g.
571           all writes failed). The text of the error message should
572           provide as much detail as possible.
573 */
574
575 func PutBlock(block []byte, hash string) error {
576         // Check that BLOCK's checksum matches HASH.
577         blockhash := fmt.Sprintf("%x", md5.Sum(block))
578         if blockhash != hash {
579                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
580                 return RequestHashError
581         }
582
583         // If we already have a block on disk under this identifier, return
584         // success (but check for MD5 collisions).  While fetching the block,
585         // update its timestamp.
586         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
587         // In either case, we want to write our new (good) block to disk,
588         // so there is nothing special to do if err != nil.
589         //
590         if oldblock, err := GetBlock(hash, true); err == nil {
591                 defer bufs.Put(oldblock)
592                 if bytes.Compare(block, oldblock) == 0 {
593                         // The block already exists; return success.
594                         return nil
595                 } else {
596                         return CollisionError
597                 }
598         }
599
600         // Choose a Keep volume to write to.
601         // If this volume fails, try all of the volumes in order.
602         if vol := KeepVM.NextWritable(); vol != nil {
603                 if err := vol.Put(hash, block); err == nil {
604                         return nil // success!
605                 }
606         }
607
608         writables := KeepVM.AllWritable()
609         if len(writables) == 0 {
610                 log.Print("No writable volumes.")
611                 return FullError
612         }
613
614         allFull := true
615         for _, vol := range writables {
616                 err := vol.Put(hash, block)
617                 if err == nil {
618                         return nil // success!
619                 }
620                 if err != FullError {
621                         // The volume is not full but the
622                         // write did not succeed.  Report the
623                         // error and continue trying.
624                         allFull = false
625                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
626                 }
627         }
628
629         if allFull {
630                 log.Print("All volumes are full.")
631                 return FullError
632         } else {
633                 // Already logged the non-full errors.
634                 return GenericError
635         }
636 }
637
638 // IsValidLocator
639 //     Return true if the specified string is a valid Keep locator.
640 //     When Keep is extended to support hash types other than MD5,
641 //     this should be updated to cover those as well.
642 //
643 func IsValidLocator(loc string) bool {
644         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
645         if err == nil {
646                 return match
647         }
648         log.Printf("IsValidLocator: %s\n", err)
649         return false
650 }
651
652 // GetApiToken returns the OAuth2 token from the Authorization
653 // header of a HTTP request, or an empty string if no matching
654 // token is found.
655 func GetApiToken(req *http.Request) string {
656         if auth, ok := req.Header["Authorization"]; ok {
657                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
658                         log.Println(err)
659                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
660                         return match[1]
661                 }
662         }
663         return ""
664 }
665
666 // IsExpired returns true if the given Unix timestamp (expressed as a
667 // hexadecimal string) is in the past, or if timestamp_hex cannot be
668 // parsed as a hexadecimal string.
669 func IsExpired(timestamp_hex string) bool {
670         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
671         if err != nil {
672                 log.Printf("IsExpired: %s\n", err)
673                 return true
674         }
675         return time.Unix(ts, 0).Before(time.Now())
676 }
677
678 // CanDelete returns true if the user identified by api_token is
679 // allowed to delete blocks.
680 func CanDelete(api_token string) bool {
681         if api_token == "" {
682                 return false
683         }
684         // Blocks may be deleted only when Keep has been configured with a
685         // data manager.
686         if IsDataManagerToken(api_token) {
687                 return true
688         }
689         // TODO(twp): look up api_token with the API server
690         // return true if is_admin is true and if the token
691         // has unlimited scope
692         return false
693 }
694
695 // IsDataManagerToken returns true if api_token represents the data
696 // manager's token.
697 func IsDataManagerToken(api_token string) bool {
698         return data_manager_token != "" && api_token == data_manager_token
699 }