Return error if Content-Length header is missing instead of panicing.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43
44         // For IndexHandler we support:
45         //   /index           - returns all locators
46         //   /index/{prefix}  - returns all locators that begin with {prefix}
47         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48         //      If {prefix} is the empty string, return an index of all locators
49         //      (so /index and /index/ behave identically)
50         //      A client may supply a full 32-digit locator string, in which
51         //      case the server will return an index with either zero or one
52         //      entries. This usage allows a client to check whether a block is
53         //      present, and its size and upload time, without retrieving the
54         //      entire block.
55         //
56         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
57         rest.HandleFunc(
58                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
60
61         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62         // /trash" requests from Data Manager.  These requests instruct
63         // Keep to replicate or delete blocks; see
64         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
65         // for more details.
66         //
67         // Each handler parses the JSON list of block management requests
68         // in the message body, and replaces any existing pull queue or
69         // trash queue with their contentes.
70         //
71         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
73
74         // Any request which does not match any of these routes gets
75         // 400 Bad Request.
76         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
77
78         return rest
79 }
80
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
83 }
84
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86         hash := mux.Vars(req)["hash"]
87
88         hints := mux.Vars(req)["hints"]
89
90         // Parse the locator string and hints from the request.
91         // TODO(twp): implement a Locator type.
92         var signature, timestamp string
93         if hints != "" {
94                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95                 for _, hint := range strings.Split(hints, "+") {
96                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97                                 // Server ignores size hints
98                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
99                                 signature = m[1]
100                                 timestamp = m[2]
101                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102                                 // Any unknown hint that starts with an uppercase letter is
103                                 // presumed to be valid and ignored, to permit forward compatibility.
104                         } else {
105                                 // Unknown format; not a valid locator.
106                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
107                                 return
108                         }
109                 }
110         }
111
112         // If permission checking is in effect, verify this
113         // request's permission signature.
114         if enforce_permissions {
115                 if signature == "" || timestamp == "" {
116                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
117                         return
118                 } else if IsExpired(timestamp) {
119                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
120                         return
121                 } else {
122                         req_locator := req.URL.Path[1:] // strip leading slash
123                         if !VerifySignature(req_locator, GetApiToken(req)) {
124                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
125                                 return
126                         }
127                 }
128         }
129
130         block, err := GetBlock(hash, false)
131
132         // Garbage collect after each GET. Fixes #2865.
133         // TODO(twp): review Keep memory usage and see if there's
134         // a better way to do this than blindly garbage collecting
135         // after every block.
136         defer runtime.GC()
137
138         if err != nil {
139                 // This type assertion is safe because the only errors
140                 // GetBlock can return are DiskHashError or NotFoundError.
141                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
142                 return
143         }
144
145         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
146
147         _, err = resp.Write(block)
148
149         return
150 }
151
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153         // Garbage collect after each PUT. Fixes #2865.
154         // See also GetBlockHandler.
155         defer runtime.GC()
156
157         hash := mux.Vars(req)["hash"]
158
159         // Detect as many error conditions as possible before reading
160         // the body: avoid transmitting data that will not end up
161         // being written anyway.
162
163         if req.ContentLength == -1 {
164                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
165                 return
166         }
167
168         if req.ContentLength > BLOCKSIZE {
169                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
170                 return
171         }
172
173         if len(KeepVM.AllWritable()) == 0 {
174                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
175                 return
176         }
177
178         buf := make([]byte, req.ContentLength)
179         nread, err := io.ReadFull(req.Body, buf)
180         if err != nil {
181                 http.Error(resp, err.Error(), 500)
182                 return
183         } else if int64(nread) < req.ContentLength {
184                 http.Error(resp, "request truncated", 500)
185                 return
186         }
187
188         err = PutBlock(buf, hash)
189         if err != nil {
190                 ke := err.(*KeepError)
191                 http.Error(resp, ke.Error(), ke.HTTPCode)
192                 return
193         }
194
195         // Success; add a size hint, sign the locator if possible, and
196         // return it to the client.
197         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
198         api_token := GetApiToken(req)
199         if PermissionSecret != nil && api_token != "" {
200                 expiry := time.Now().Add(permission_ttl)
201                 return_hash = SignLocator(return_hash, api_token, expiry)
202         }
203         resp.Write([]byte(return_hash + "\n"))
204 }
205
206 // IndexHandler
207 //     A HandleFunc to address /index and /index/{prefix} requests.
208 //
209 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
210         // Reject unauthorized requests.
211         if !IsDataManagerToken(GetApiToken(req)) {
212                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
213                 return
214         }
215
216         prefix := mux.Vars(req)["prefix"]
217
218         var index string
219         for _, vol := range KeepVM.AllReadable() {
220                 index = index + vol.Index(prefix)
221         }
222         resp.Write([]byte(index))
223 }
224
225 // StatusHandler
226 //     Responds to /status.json requests with the current node status,
227 //     described in a JSON structure.
228 //
229 //     The data given in a status.json response includes:
230 //        volumes - a list of Keep volumes currently in use by this server
231 //          each volume is an object with the following fields:
232 //            * mount_point
233 //            * device_num (an integer identifying the underlying filesystem)
234 //            * bytes_free
235 //            * bytes_used
236 //
237 type VolumeStatus struct {
238         MountPoint string `json:"mount_point"`
239         DeviceNum  uint64 `json:"device_num"`
240         BytesFree  uint64 `json:"bytes_free"`
241         BytesUsed  uint64 `json:"bytes_used"`
242 }
243
244 type NodeStatus struct {
245         Volumes []*VolumeStatus `json:"volumes"`
246 }
247
248 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
249         st := GetNodeStatus()
250         if jstat, err := json.Marshal(st); err == nil {
251                 resp.Write(jstat)
252         } else {
253                 log.Printf("json.Marshal: %s\n", err)
254                 log.Printf("NodeStatus = %v\n", st)
255                 http.Error(resp, err.Error(), 500)
256         }
257 }
258
259 // GetNodeStatus
260 //     Returns a NodeStatus struct describing this Keep
261 //     node's current status.
262 //
263 func GetNodeStatus() *NodeStatus {
264         st := new(NodeStatus)
265
266         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
267         for i, vol := range KeepVM.AllReadable() {
268                 st.Volumes[i] = vol.Status()
269         }
270         return st
271 }
272
273 // GetVolumeStatus
274 //     Returns a VolumeStatus describing the requested volume.
275 //
276 func GetVolumeStatus(volume string) *VolumeStatus {
277         var fs syscall.Statfs_t
278         var devnum uint64
279
280         if fi, err := os.Stat(volume); err == nil {
281                 devnum = fi.Sys().(*syscall.Stat_t).Dev
282         } else {
283                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
284                 return nil
285         }
286
287         err := syscall.Statfs(volume, &fs)
288         if err != nil {
289                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
290                 return nil
291         }
292         // These calculations match the way df calculates disk usage:
293         // "free" space is measured by fs.Bavail, but "used" space
294         // uses fs.Blocks - fs.Bfree.
295         free := fs.Bavail * uint64(fs.Bsize)
296         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
297         return &VolumeStatus{volume, devnum, free, used}
298 }
299
300 // DeleteHandler processes DELETE requests.
301 //
302 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
303 // from all connected volumes.
304 //
305 // Only the Data Manager, or an Arvados admin with scope "all", are
306 // allowed to issue DELETE requests.  If a DELETE request is not
307 // authenticated or is issued by a non-admin user, the server returns
308 // a PermissionError.
309 //
310 // Upon receiving a valid request from an authorized user,
311 // DeleteHandler deletes all copies of the specified block on local
312 // writable volumes.
313 //
314 // Response format:
315 //
316 // If the requested blocks was not found on any volume, the response
317 // code is HTTP 404 Not Found.
318 //
319 // Otherwise, the response code is 200 OK, with a response body
320 // consisting of the JSON message
321 //
322 //    {"copies_deleted":d,"copies_failed":f}
323 //
324 // where d and f are integers representing the number of blocks that
325 // were successfully and unsuccessfully deleted.
326 //
327 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
328         hash := mux.Vars(req)["hash"]
329
330         // Confirm that this user is an admin and has a token with unlimited scope.
331         var tok = GetApiToken(req)
332         if tok == "" || !CanDelete(tok) {
333                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
334                 return
335         }
336
337         if never_delete {
338                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
339                 return
340         }
341
342         // Delete copies of this block from all available volumes.
343         // Report how many blocks were successfully deleted, and how
344         // many were found on writable volumes but not deleted.
345         var result struct {
346                 Deleted int `json:"copies_deleted"`
347                 Failed  int `json:"copies_failed"`
348         }
349         for _, vol := range KeepVM.AllWritable() {
350                 if err := vol.Delete(hash); err == nil {
351                         result.Deleted++
352                 } else if os.IsNotExist(err) {
353                         continue
354                 } else {
355                         result.Failed++
356                         log.Println("DeleteHandler:", err)
357                 }
358         }
359
360         var st int
361
362         if result.Deleted == 0 && result.Failed == 0 {
363                 st = http.StatusNotFound
364         } else {
365                 st = http.StatusOK
366         }
367
368         resp.WriteHeader(st)
369
370         if st == http.StatusOK {
371                 if body, err := json.Marshal(result); err == nil {
372                         resp.Write(body)
373                 } else {
374                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
375                         http.Error(resp, err.Error(), 500)
376                 }
377         }
378 }
379
380 /* PullHandler processes "PUT /pull" requests for the data manager.
381    The request body is a JSON message containing a list of pull
382    requests in the following format:
383
384    [
385       {
386          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
387          "servers":[
388                         "keep0.qr1hi.arvadosapi.com:25107",
389                         "keep1.qr1hi.arvadosapi.com:25108"
390                  ]
391           },
392           {
393                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
394                  "servers":[
395                         "10.0.1.5:25107",
396                         "10.0.1.6:25107",
397                         "10.0.1.7:25108"
398                  ]
399           },
400           ...
401    ]
402
403    Each pull request in the list consists of a block locator string
404    and an ordered list of servers.  Keepstore should try to fetch the
405    block from each server in turn.
406
407    If the request has not been sent by the Data Manager, return 401
408    Unauthorized.
409
410    If the JSON unmarshalling fails, return 400 Bad Request.
411 */
412
413 type PullRequest struct {
414         Locator string   `json:"locator"`
415         Servers []string `json:"servers"`
416 }
417
418 func PullHandler(resp http.ResponseWriter, req *http.Request) {
419         // Reject unauthorized requests.
420         if !IsDataManagerToken(GetApiToken(req)) {
421                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
422                 return
423         }
424
425         // Parse the request body.
426         var pr []PullRequest
427         r := json.NewDecoder(req.Body)
428         if err := r.Decode(&pr); err != nil {
429                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
430                 return
431         }
432
433         // We have a properly formatted pull list sent from the data
434         // manager.  Report success and send the list to the pull list
435         // manager for further handling.
436         resp.WriteHeader(http.StatusOK)
437         resp.Write([]byte(
438                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
439
440         plist := list.New()
441         for _, p := range pr {
442                 plist.PushBack(p)
443         }
444         pullq.ReplaceQueue(plist)
445 }
446
447 type TrashRequest struct {
448         Locator    string `json:"locator"`
449         BlockMtime int64  `json:"block_mtime"`
450 }
451
452 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
453         // Reject unauthorized requests.
454         if !IsDataManagerToken(GetApiToken(req)) {
455                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
456                 return
457         }
458
459         // Parse the request body.
460         var trash []TrashRequest
461         r := json.NewDecoder(req.Body)
462         if err := r.Decode(&trash); err != nil {
463                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
464                 return
465         }
466
467         // We have a properly formatted trash list sent from the data
468         // manager.  Report success and send the list to the trash work
469         // queue for further handling.
470         resp.WriteHeader(http.StatusOK)
471         resp.Write([]byte(
472                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
473
474         tlist := list.New()
475         for _, t := range trash {
476                 tlist.PushBack(t)
477         }
478         trashq.ReplaceQueue(tlist)
479 }
480
481 // ==============================
482 // GetBlock and PutBlock implement lower-level code for handling
483 // blocks by rooting through volumes connected to the local machine.
484 // Once the handler has determined that system policy permits the
485 // request, it calls these methods to perform the actual operation.
486 //
487 // TODO(twp): this code would probably be better located in the
488 // VolumeManager interface. As an abstraction, the VolumeManager
489 // should be the only part of the code that cares about which volume a
490 // block is stored on, so it should be responsible for figuring out
491 // which volume to check for fetching blocks, storing blocks, etc.
492
493 // ==============================
494 // GetBlock fetches and returns the block identified by "hash".  If
495 // the update_timestamp argument is true, GetBlock also updates the
496 // block's file modification time (for the sake of PutBlock, which
497 // must update the file's timestamp when the block already exists).
498 //
499 // On success, GetBlock returns a byte slice with the block data, and
500 // a nil error.
501 //
502 // If the block cannot be found on any volume, returns NotFoundError.
503 //
504 // If the block found does not have the correct MD5 hash, returns
505 // DiskHashError.
506 //
507
508 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
509         // Attempt to read the requested hash from a keep volume.
510         error_to_caller := NotFoundError
511
512         var vols []Volume
513         if update_timestamp {
514                 // Pointless to find the block on an unwritable volume
515                 // because Touch() will fail -- this is as good as
516                 // "not found" for purposes of callers who need to
517                 // update_timestamp.
518                 vols = KeepVM.AllWritable()
519         } else {
520                 vols = KeepVM.AllReadable()
521         }
522
523         for _, vol := range vols {
524                 buf, err := vol.Get(hash)
525                 if err != nil {
526                         // IsNotExist is an expected error and may be
527                         // ignored. All other errors are logged. In
528                         // any case we continue trying to read other
529                         // volumes. If all volumes report IsNotExist,
530                         // we return a NotFoundError.
531                         if !os.IsNotExist(err) {
532                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
533                         }
534                         continue
535                 }
536                 // Check the file checksum.
537                 //
538                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
539                 if filehash != hash {
540                         // TODO: Try harder to tell a sysadmin about
541                         // this.
542                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
543                                 vol, hash, filehash)
544                         error_to_caller = DiskHashError
545                         continue
546                 }
547                 if error_to_caller == DiskHashError {
548                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
549                                 vol, hash)
550                 }
551                 if update_timestamp {
552                         if err := vol.Touch(hash); err != nil {
553                                 error_to_caller = GenericError
554                                 log.Printf("%s: Touch %s failed: %s",
555                                         vol, hash, error_to_caller)
556                                 continue
557                         }
558                 }
559                 return buf, nil
560         }
561         return nil, error_to_caller
562 }
563
564 /* PutBlock(block, hash)
565    Stores the BLOCK (identified by the content id HASH) in Keep.
566
567    The MD5 checksum of the block must be identical to the content id HASH.
568    If not, an error is returned.
569
570    PutBlock stores the BLOCK on the first Keep volume with free space.
571    A failure code is returned to the user only if all volumes fail.
572
573    On success, PutBlock returns nil.
574    On failure, it returns a KeepError with one of the following codes:
575
576    500 Collision
577           A different block with the same hash already exists on this
578           Keep server.
579    422 MD5Fail
580           The MD5 hash of the BLOCK does not match the argument HASH.
581    503 Full
582           There was not enough space left in any Keep volume to store
583           the object.
584    500 Fail
585           The object could not be stored for some other reason (e.g.
586           all writes failed). The text of the error message should
587           provide as much detail as possible.
588 */
589
590 func PutBlock(block []byte, hash string) error {
591         // Check that BLOCK's checksum matches HASH.
592         blockhash := fmt.Sprintf("%x", md5.Sum(block))
593         if blockhash != hash {
594                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
595                 return RequestHashError
596         }
597
598         // If we already have a block on disk under this identifier, return
599         // success (but check for MD5 collisions).  While fetching the block,
600         // update its timestamp.
601         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
602         // In either case, we want to write our new (good) block to disk,
603         // so there is nothing special to do if err != nil.
604         //
605         if oldblock, err := GetBlock(hash, true); err == nil {
606                 if bytes.Compare(block, oldblock) == 0 {
607                         // The block already exists; return success.
608                         return nil
609                 } else {
610                         return CollisionError
611                 }
612         }
613
614         // Choose a Keep volume to write to.
615         // If this volume fails, try all of the volumes in order.
616         if vol := KeepVM.NextWritable(); vol != nil {
617                 if err := vol.Put(hash, block); err == nil {
618                         return nil // success!
619                 }
620         }
621
622         writables := KeepVM.AllWritable()
623         if len(writables) == 0 {
624                 log.Print("No writable volumes.")
625                 return FullError
626         }
627
628         allFull := true
629         for _, vol := range writables {
630                 err := vol.Put(hash, block)
631                 if err == nil {
632                         return nil // success!
633                 }
634                 if err != FullError {
635                         // The volume is not full but the
636                         // write did not succeed.  Report the
637                         // error and continue trying.
638                         allFull = false
639                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
640                 }
641         }
642
643         if allFull {
644                 log.Print("All volumes are full.")
645                 return FullError
646         } else {
647                 // Already logged the non-full errors.
648                 return GenericError
649         }
650 }
651
652 // IsValidLocator
653 //     Return true if the specified string is a valid Keep locator.
654 //     When Keep is extended to support hash types other than MD5,
655 //     this should be updated to cover those as well.
656 //
657 func IsValidLocator(loc string) bool {
658         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
659         if err == nil {
660                 return match
661         }
662         log.Printf("IsValidLocator: %s\n", err)
663         return false
664 }
665
666 // GetApiToken returns the OAuth2 token from the Authorization
667 // header of a HTTP request, or an empty string if no matching
668 // token is found.
669 func GetApiToken(req *http.Request) string {
670         if auth, ok := req.Header["Authorization"]; ok {
671                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
672                         log.Println(err)
673                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
674                         return match[1]
675                 }
676         }
677         return ""
678 }
679
680 // IsExpired returns true if the given Unix timestamp (expressed as a
681 // hexadecimal string) is in the past, or if timestamp_hex cannot be
682 // parsed as a hexadecimal string.
683 func IsExpired(timestamp_hex string) bool {
684         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
685         if err != nil {
686                 log.Printf("IsExpired: %s\n", err)
687                 return true
688         }
689         return time.Unix(ts, 0).Before(time.Now())
690 }
691
692 // CanDelete returns true if the user identified by api_token is
693 // allowed to delete blocks.
694 func CanDelete(api_token string) bool {
695         if api_token == "" {
696                 return false
697         }
698         // Blocks may be deleted only when Keep has been configured with a
699         // data manager.
700         if IsDataManagerToken(api_token) {
701                 return true
702         }
703         // TODO(twp): look up api_token with the API server
704         // return true if is_admin is true and if the token
705         // has unlimited scope
706         return false
707 }
708
709 // IsDataManagerToken returns true if api_token represents the data
710 // manager's token.
711 func IsDataManagerToken(api_token string) bool {
712         return data_manager_token != "" && api_token == data_manager_token
713 }