Merge branch '3145-close-early' closes #3145
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43
44         // For IndexHandler we support:
45         //   /index           - returns all locators
46         //   /index/{prefix}  - returns all locators that begin with {prefix}
47         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48         //      If {prefix} is the empty string, return an index of all locators
49         //      (so /index and /index/ behave identically)
50         //      A client may supply a full 32-digit locator string, in which
51         //      case the server will return an index with either zero or one
52         //      entries. This usage allows a client to check whether a block is
53         //      present, and its size and upload time, without retrieving the
54         //      entire block.
55         //
56         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
57         rest.HandleFunc(
58                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
60
61         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62         // /trash" requests from Data Manager.  These requests instruct
63         // Keep to replicate or delete blocks; see
64         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
65         // for more details.
66         //
67         // Each handler parses the JSON list of block management requests
68         // in the message body, and replaces any existing pull queue or
69         // trash queue with their contentes.
70         //
71         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
73
74         // Any request which does not match any of these routes gets
75         // 400 Bad Request.
76         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
77
78         return rest
79 }
80
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
83 }
84
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86         hash := mux.Vars(req)["hash"]
87
88         hints := mux.Vars(req)["hints"]
89
90         // Parse the locator string and hints from the request.
91         // TODO(twp): implement a Locator type.
92         var signature, timestamp string
93         if hints != "" {
94                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95                 for _, hint := range strings.Split(hints, "+") {
96                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97                                 // Server ignores size hints
98                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
99                                 signature = m[1]
100                                 timestamp = m[2]
101                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102                                 // Any unknown hint that starts with an uppercase letter is
103                                 // presumed to be valid and ignored, to permit forward compatibility.
104                         } else {
105                                 // Unknown format; not a valid locator.
106                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
107                                 return
108                         }
109                 }
110         }
111
112         // If permission checking is in effect, verify this
113         // request's permission signature.
114         if enforce_permissions {
115                 if signature == "" || timestamp == "" {
116                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
117                         return
118                 } else if IsExpired(timestamp) {
119                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
120                         return
121                 } else {
122                         req_locator := req.URL.Path[1:] // strip leading slash
123                         if !VerifySignature(req_locator, GetApiToken(req)) {
124                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
125                                 return
126                         }
127                 }
128         }
129
130         block, err := GetBlock(hash, false)
131
132         // Garbage collect after each GET. Fixes #2865.
133         // TODO(twp): review Keep memory usage and see if there's
134         // a better way to do this than blindly garbage collecting
135         // after every block.
136         defer runtime.GC()
137
138         if err != nil {
139                 // This type assertion is safe because the only errors
140                 // GetBlock can return are DiskHashError or NotFoundError.
141                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
142                 return
143         }
144
145         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
146
147         _, err = resp.Write(block)
148
149         return
150 }
151
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153         // Garbage collect after each PUT. Fixes #2865.
154         // See also GetBlockHandler.
155         defer runtime.GC()
156
157         hash := mux.Vars(req)["hash"]
158
159         // Detect as many error conditions as possible before reading
160         // the body: avoid transmitting data that will not end up
161         // being written anyway.
162
163         if req.ContentLength > BLOCKSIZE {
164                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
165                 return
166         }
167
168         if len(KeepVM.AllWritable()) == 0 {
169                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
170                 return
171         }
172
173         buf := make([]byte, req.ContentLength)
174         nread, err := io.ReadFull(req.Body, buf)
175         if err != nil {
176                 http.Error(resp, err.Error(), 500)
177                 return
178         } else if int64(nread) < req.ContentLength {
179                 http.Error(resp, "request truncated", 500)
180                 return
181         }
182
183         err = PutBlock(buf, hash)
184         if err != nil {
185                 ke := err.(*KeepError)
186                 http.Error(resp, ke.Error(), ke.HTTPCode)
187                 return
188         }
189
190         // Success; add a size hint, sign the locator if possible, and
191         // return it to the client.
192         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
193         api_token := GetApiToken(req)
194         if PermissionSecret != nil && api_token != "" {
195                 expiry := time.Now().Add(permission_ttl)
196                 return_hash = SignLocator(return_hash, api_token, expiry)
197         }
198         resp.Write([]byte(return_hash + "\n"))
199 }
200
201 // IndexHandler
202 //     A HandleFunc to address /index and /index/{prefix} requests.
203 //
204 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
205         // Reject unauthorized requests.
206         if !IsDataManagerToken(GetApiToken(req)) {
207                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
208                 return
209         }
210
211         prefix := mux.Vars(req)["prefix"]
212
213         var index string
214         for _, vol := range KeepVM.AllReadable() {
215                 index = index + vol.Index(prefix)
216         }
217         resp.Write([]byte(index))
218 }
219
220 // StatusHandler
221 //     Responds to /status.json requests with the current node status,
222 //     described in a JSON structure.
223 //
224 //     The data given in a status.json response includes:
225 //        volumes - a list of Keep volumes currently in use by this server
226 //          each volume is an object with the following fields:
227 //            * mount_point
228 //            * device_num (an integer identifying the underlying filesystem)
229 //            * bytes_free
230 //            * bytes_used
231 //
232 type VolumeStatus struct {
233         MountPoint string `json:"mount_point"`
234         DeviceNum  uint64 `json:"device_num"`
235         BytesFree  uint64 `json:"bytes_free"`
236         BytesUsed  uint64 `json:"bytes_used"`
237 }
238
239 type NodeStatus struct {
240         Volumes []*VolumeStatus `json:"volumes"`
241 }
242
243 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
244         st := GetNodeStatus()
245         if jstat, err := json.Marshal(st); err == nil {
246                 resp.Write(jstat)
247         } else {
248                 log.Printf("json.Marshal: %s\n", err)
249                 log.Printf("NodeStatus = %v\n", st)
250                 http.Error(resp, err.Error(), 500)
251         }
252 }
253
254 // GetNodeStatus
255 //     Returns a NodeStatus struct describing this Keep
256 //     node's current status.
257 //
258 func GetNodeStatus() *NodeStatus {
259         st := new(NodeStatus)
260
261         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
262         for i, vol := range KeepVM.AllReadable() {
263                 st.Volumes[i] = vol.Status()
264         }
265         return st
266 }
267
268 // GetVolumeStatus
269 //     Returns a VolumeStatus describing the requested volume.
270 //
271 func GetVolumeStatus(volume string) *VolumeStatus {
272         var fs syscall.Statfs_t
273         var devnum uint64
274
275         if fi, err := os.Stat(volume); err == nil {
276                 devnum = fi.Sys().(*syscall.Stat_t).Dev
277         } else {
278                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
279                 return nil
280         }
281
282         err := syscall.Statfs(volume, &fs)
283         if err != nil {
284                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
285                 return nil
286         }
287         // These calculations match the way df calculates disk usage:
288         // "free" space is measured by fs.Bavail, but "used" space
289         // uses fs.Blocks - fs.Bfree.
290         free := fs.Bavail * uint64(fs.Bsize)
291         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
292         return &VolumeStatus{volume, devnum, free, used}
293 }
294
295 // DeleteHandler processes DELETE requests.
296 //
297 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
298 // from all connected volumes.
299 //
300 // Only the Data Manager, or an Arvados admin with scope "all", are
301 // allowed to issue DELETE requests.  If a DELETE request is not
302 // authenticated or is issued by a non-admin user, the server returns
303 // a PermissionError.
304 //
305 // Upon receiving a valid request from an authorized user,
306 // DeleteHandler deletes all copies of the specified block on local
307 // writable volumes.
308 //
309 // Response format:
310 //
311 // If the requested blocks was not found on any volume, the response
312 // code is HTTP 404 Not Found.
313 //
314 // Otherwise, the response code is 200 OK, with a response body
315 // consisting of the JSON message
316 //
317 //    {"copies_deleted":d,"copies_failed":f}
318 //
319 // where d and f are integers representing the number of blocks that
320 // were successfully and unsuccessfully deleted.
321 //
322 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
323         hash := mux.Vars(req)["hash"]
324
325         // Confirm that this user is an admin and has a token with unlimited scope.
326         var tok = GetApiToken(req)
327         if tok == "" || !CanDelete(tok) {
328                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
329                 return
330         }
331
332         if never_delete {
333                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
334                 return
335         }
336
337         // Delete copies of this block from all available volumes.
338         // Report how many blocks were successfully deleted, and how
339         // many were found on writable volumes but not deleted.
340         var result struct {
341                 Deleted int `json:"copies_deleted"`
342                 Failed  int `json:"copies_failed"`
343         }
344         for _, vol := range KeepVM.AllWritable() {
345                 if err := vol.Delete(hash); err == nil {
346                         result.Deleted++
347                 } else if os.IsNotExist(err) {
348                         continue
349                 } else {
350                         result.Failed++
351                         log.Println("DeleteHandler:", err)
352                 }
353         }
354
355         var st int
356
357         if result.Deleted == 0 && result.Failed == 0 {
358                 st = http.StatusNotFound
359         } else {
360                 st = http.StatusOK
361         }
362
363         resp.WriteHeader(st)
364
365         if st == http.StatusOK {
366                 if body, err := json.Marshal(result); err == nil {
367                         resp.Write(body)
368                 } else {
369                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
370                         http.Error(resp, err.Error(), 500)
371                 }
372         }
373 }
374
375 /* PullHandler processes "PUT /pull" requests for the data manager.
376    The request body is a JSON message containing a list of pull
377    requests in the following format:
378
379    [
380       {
381          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
382          "servers":[
383                         "keep0.qr1hi.arvadosapi.com:25107",
384                         "keep1.qr1hi.arvadosapi.com:25108"
385                  ]
386           },
387           {
388                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
389                  "servers":[
390                         "10.0.1.5:25107",
391                         "10.0.1.6:25107",
392                         "10.0.1.7:25108"
393                  ]
394           },
395           ...
396    ]
397
398    Each pull request in the list consists of a block locator string
399    and an ordered list of servers.  Keepstore should try to fetch the
400    block from each server in turn.
401
402    If the request has not been sent by the Data Manager, return 401
403    Unauthorized.
404
405    If the JSON unmarshalling fails, return 400 Bad Request.
406 */
407
408 type PullRequest struct {
409         Locator string   `json:"locator"`
410         Servers []string `json:"servers"`
411 }
412
413 func PullHandler(resp http.ResponseWriter, req *http.Request) {
414         // Reject unauthorized requests.
415         if !IsDataManagerToken(GetApiToken(req)) {
416                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
417                 return
418         }
419
420         // Parse the request body.
421         var pr []PullRequest
422         r := json.NewDecoder(req.Body)
423         if err := r.Decode(&pr); err != nil {
424                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
425                 return
426         }
427
428         // We have a properly formatted pull list sent from the data
429         // manager.  Report success and send the list to the pull list
430         // manager for further handling.
431         resp.WriteHeader(http.StatusOK)
432         resp.Write([]byte(
433                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
434
435         plist := list.New()
436         for _, p := range pr {
437                 plist.PushBack(p)
438         }
439         pullq.ReplaceQueue(plist)
440 }
441
442 type TrashRequest struct {
443         Locator    string `json:"locator"`
444         BlockMtime int64  `json:"block_mtime"`
445 }
446
447 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
448         // Reject unauthorized requests.
449         if !IsDataManagerToken(GetApiToken(req)) {
450                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
451                 return
452         }
453
454         // Parse the request body.
455         var trash []TrashRequest
456         r := json.NewDecoder(req.Body)
457         if err := r.Decode(&trash); err != nil {
458                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
459                 return
460         }
461
462         // We have a properly formatted trash list sent from the data
463         // manager.  Report success and send the list to the trash work
464         // queue for further handling.
465         resp.WriteHeader(http.StatusOK)
466         resp.Write([]byte(
467                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
468
469         tlist := list.New()
470         for _, t := range trash {
471                 tlist.PushBack(t)
472         }
473         trashq.ReplaceQueue(tlist)
474 }
475
476 // ==============================
477 // GetBlock and PutBlock implement lower-level code for handling
478 // blocks by rooting through volumes connected to the local machine.
479 // Once the handler has determined that system policy permits the
480 // request, it calls these methods to perform the actual operation.
481 //
482 // TODO(twp): this code would probably be better located in the
483 // VolumeManager interface. As an abstraction, the VolumeManager
484 // should be the only part of the code that cares about which volume a
485 // block is stored on, so it should be responsible for figuring out
486 // which volume to check for fetching blocks, storing blocks, etc.
487
488 // ==============================
489 // GetBlock fetches and returns the block identified by "hash".  If
490 // the update_timestamp argument is true, GetBlock also updates the
491 // block's file modification time (for the sake of PutBlock, which
492 // must update the file's timestamp when the block already exists).
493 //
494 // On success, GetBlock returns a byte slice with the block data, and
495 // a nil error.
496 //
497 // If the block cannot be found on any volume, returns NotFoundError.
498 //
499 // If the block found does not have the correct MD5 hash, returns
500 // DiskHashError.
501 //
502
503 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
504         // Attempt to read the requested hash from a keep volume.
505         error_to_caller := NotFoundError
506
507         var vols []Volume
508         if update_timestamp {
509                 // Pointless to find the block on an unwritable volume
510                 // because Touch() will fail -- this is as good as
511                 // "not found" for purposes of callers who need to
512                 // update_timestamp.
513                 vols = KeepVM.AllWritable()
514         } else {
515                 vols = KeepVM.AllReadable()
516         }
517
518         for _, vol := range vols {
519                 buf, err := vol.Get(hash)
520                 if err != nil {
521                         // IsNotExist is an expected error and may be
522                         // ignored. All other errors are logged. In
523                         // any case we continue trying to read other
524                         // volumes. If all volumes report IsNotExist,
525                         // we return a NotFoundError.
526                         if !os.IsNotExist(err) {
527                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
528                         }
529                         continue
530                 }
531                 // Check the file checksum.
532                 //
533                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
534                 if filehash != hash {
535                         // TODO: Try harder to tell a sysadmin about
536                         // this.
537                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
538                                 vol, hash, filehash)
539                         error_to_caller = DiskHashError
540                         continue
541                 }
542                 if error_to_caller == DiskHashError {
543                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
544                                 vol, hash)
545                 }
546                 if update_timestamp {
547                         if err := vol.Touch(hash); err != nil {
548                                 error_to_caller = GenericError
549                                 log.Printf("%s: Touch %s failed: %s",
550                                         vol, hash, error_to_caller)
551                                 continue
552                         }
553                 }
554                 return buf, nil
555         }
556         return nil, error_to_caller
557 }
558
559 /* PutBlock(block, hash)
560    Stores the BLOCK (identified by the content id HASH) in Keep.
561
562    The MD5 checksum of the block must be identical to the content id HASH.
563    If not, an error is returned.
564
565    PutBlock stores the BLOCK on the first Keep volume with free space.
566    A failure code is returned to the user only if all volumes fail.
567
568    On success, PutBlock returns nil.
569    On failure, it returns a KeepError with one of the following codes:
570
571    500 Collision
572           A different block with the same hash already exists on this
573           Keep server.
574    422 MD5Fail
575           The MD5 hash of the BLOCK does not match the argument HASH.
576    503 Full
577           There was not enough space left in any Keep volume to store
578           the object.
579    500 Fail
580           The object could not be stored for some other reason (e.g.
581           all writes failed). The text of the error message should
582           provide as much detail as possible.
583 */
584
585 func PutBlock(block []byte, hash string) error {
586         // Check that BLOCK's checksum matches HASH.
587         blockhash := fmt.Sprintf("%x", md5.Sum(block))
588         if blockhash != hash {
589                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
590                 return RequestHashError
591         }
592
593         // If we already have a block on disk under this identifier, return
594         // success (but check for MD5 collisions).  While fetching the block,
595         // update its timestamp.
596         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
597         // In either case, we want to write our new (good) block to disk,
598         // so there is nothing special to do if err != nil.
599         //
600         if oldblock, err := GetBlock(hash, true); err == nil {
601                 if bytes.Compare(block, oldblock) == 0 {
602                         // The block already exists; return success.
603                         return nil
604                 } else {
605                         return CollisionError
606                 }
607         }
608
609         // Choose a Keep volume to write to.
610         // If this volume fails, try all of the volumes in order.
611         if vol := KeepVM.NextWritable(); vol != nil {
612                 if err := vol.Put(hash, block); err == nil {
613                         return nil // success!
614                 }
615         }
616
617         writables := KeepVM.AllWritable()
618         if len(writables) == 0 {
619                 log.Print("No writable volumes.")
620                 return FullError
621         }
622
623         allFull := true
624         for _, vol := range writables {
625                 err := vol.Put(hash, block)
626                 if err == nil {
627                         return nil // success!
628                 }
629                 if err != FullError {
630                         // The volume is not full but the
631                         // write did not succeed.  Report the
632                         // error and continue trying.
633                         allFull = false
634                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
635                 }
636         }
637
638         if allFull {
639                 log.Print("All volumes are full.")
640                 return FullError
641         } else {
642                 // Already logged the non-full errors.
643                 return GenericError
644         }
645 }
646
647 // IsValidLocator
648 //     Return true if the specified string is a valid Keep locator.
649 //     When Keep is extended to support hash types other than MD5,
650 //     this should be updated to cover those as well.
651 //
652 func IsValidLocator(loc string) bool {
653         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
654         if err == nil {
655                 return match
656         }
657         log.Printf("IsValidLocator: %s\n", err)
658         return false
659 }
660
661 // GetApiToken returns the OAuth2 token from the Authorization
662 // header of a HTTP request, or an empty string if no matching
663 // token is found.
664 func GetApiToken(req *http.Request) string {
665         if auth, ok := req.Header["Authorization"]; ok {
666                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
667                         log.Println(err)
668                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
669                         return match[1]
670                 }
671         }
672         return ""
673 }
674
675 // IsExpired returns true if the given Unix timestamp (expressed as a
676 // hexadecimal string) is in the past, or if timestamp_hex cannot be
677 // parsed as a hexadecimal string.
678 func IsExpired(timestamp_hex string) bool {
679         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
680         if err != nil {
681                 log.Printf("IsExpired: %s\n", err)
682                 return true
683         }
684         return time.Unix(ts, 0).Before(time.Now())
685 }
686
687 // CanDelete returns true if the user identified by api_token is
688 // allowed to delete blocks.
689 func CanDelete(api_token string) bool {
690         if api_token == "" {
691                 return false
692         }
693         // Blocks may be deleted only when Keep has been configured with a
694         // data manager.
695         if IsDataManagerToken(api_token) {
696                 return true
697         }
698         // TODO(twp): look up api_token with the API server
699         // return true if is_admin is true and if the token
700         // has unlimited scope
701         return false
702 }
703
704 // IsDataManagerToken returns true if api_token represents the data
705 // manager's token.
706 func IsDataManagerToken(api_token string) bool {
707         return data_manager_token != "" && api_token == data_manager_token
708 }