5562: Use static method. Fixes "TypeError: _socket_open() takes exactly 5 arguments...
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "syscall"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43
44         // For IndexHandler we support:
45         //   /index           - returns all locators
46         //   /index/{prefix}  - returns all locators that begin with {prefix}
47         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
48         //      If {prefix} is the empty string, return an index of all locators
49         //      (so /index and /index/ behave identically)
50         //      A client may supply a full 32-digit locator string, in which
51         //      case the server will return an index with either zero or one
52         //      entries. This usage allows a client to check whether a block is
53         //      present, and its size and upload time, without retrieving the
54         //      entire block.
55         //
56         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
57         rest.HandleFunc(
58                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
59         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
60
61         // The PullHandler and TrashHandler process "PUT /pull" and "PUT
62         // /trash" requests from Data Manager.  These requests instruct
63         // Keep to replicate or delete blocks; see
64         // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
65         // for more details.
66         //
67         // Each handler parses the JSON list of block management requests
68         // in the message body, and replaces any existing pull queue or
69         // trash queue with their contentes.
70         //
71         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
72         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
73
74         // Any request which does not match any of these routes gets
75         // 400 Bad Request.
76         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
77
78         return rest
79 }
80
81 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
82         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
83 }
84
85 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
86         hash := mux.Vars(req)["hash"]
87
88         hints := mux.Vars(req)["hints"]
89
90         // Parse the locator string and hints from the request.
91         // TODO(twp): implement a Locator type.
92         var signature, timestamp string
93         if hints != "" {
94                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
95                 for _, hint := range strings.Split(hints, "+") {
96                         if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
97                                 // Server ignores size hints
98                         } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
99                                 signature = m[1]
100                                 timestamp = m[2]
101                         } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
102                                 // Any unknown hint that starts with an uppercase letter is
103                                 // presumed to be valid and ignored, to permit forward compatibility.
104                         } else {
105                                 // Unknown format; not a valid locator.
106                                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
107                                 return
108                         }
109                 }
110         }
111
112         // If permission checking is in effect, verify this
113         // request's permission signature.
114         if enforce_permissions {
115                 if signature == "" || timestamp == "" {
116                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
117                         return
118                 } else if IsExpired(timestamp) {
119                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
120                         return
121                 } else {
122                         req_locator := req.URL.Path[1:] // strip leading slash
123                         if !VerifySignature(req_locator, GetApiToken(req)) {
124                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
125                                 return
126                         }
127                 }
128         }
129
130         block, err := GetBlock(hash, false)
131
132         // Garbage collect after each GET. Fixes #2865.
133         // TODO(twp): review Keep memory usage and see if there's
134         // a better way to do this than blindly garbage collecting
135         // after every block.
136         defer runtime.GC()
137
138         if err != nil {
139                 // This type assertion is safe because the only errors
140                 // GetBlock can return are DiskHashError or NotFoundError.
141                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
142                 return
143         }
144
145         resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(block)))
146
147         _, err = resp.Write(block)
148
149         return
150 }
151
152 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
153         // Garbage collect after each PUT. Fixes #2865.
154         // See also GetBlockHandler.
155         defer runtime.GC()
156
157         hash := mux.Vars(req)["hash"]
158
159         // Read the block data to be stored.
160         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
161         //
162         if req.ContentLength > BLOCKSIZE {
163                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
164                 return
165         }
166
167         buf := make([]byte, req.ContentLength)
168         nread, err := io.ReadFull(req.Body, buf)
169         if err != nil {
170                 http.Error(resp, err.Error(), 500)
171         } else if int64(nread) < req.ContentLength {
172                 http.Error(resp, "request truncated", 500)
173         } else {
174                 if err := PutBlock(buf, hash); err == nil {
175                         // Success; add a size hint, sign the locator if
176                         // possible, and return it to the client.
177                         return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
178                         api_token := GetApiToken(req)
179                         if PermissionSecret != nil && api_token != "" {
180                                 expiry := time.Now().Add(permission_ttl)
181                                 return_hash = SignLocator(return_hash, api_token, expiry)
182                         }
183                         resp.Write([]byte(return_hash + "\n"))
184                 } else {
185                         ke := err.(*KeepError)
186                         http.Error(resp, ke.Error(), ke.HTTPCode)
187                 }
188         }
189         return
190 }
191
192 // IndexHandler
193 //     A HandleFunc to address /index and /index/{prefix} requests.
194 //
195 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
196         // Reject unauthorized requests.
197         if !IsDataManagerToken(GetApiToken(req)) {
198                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
199                 return
200         }
201
202         prefix := mux.Vars(req)["prefix"]
203
204         var index string
205         for _, vol := range KeepVM.AllReadable() {
206                 index = index + vol.Index(prefix)
207         }
208         resp.Write([]byte(index))
209 }
210
211 // StatusHandler
212 //     Responds to /status.json requests with the current node status,
213 //     described in a JSON structure.
214 //
215 //     The data given in a status.json response includes:
216 //        volumes - a list of Keep volumes currently in use by this server
217 //          each volume is an object with the following fields:
218 //            * mount_point
219 //            * device_num (an integer identifying the underlying filesystem)
220 //            * bytes_free
221 //            * bytes_used
222 //
223 type VolumeStatus struct {
224         MountPoint string `json:"mount_point"`
225         DeviceNum  uint64 `json:"device_num"`
226         BytesFree  uint64 `json:"bytes_free"`
227         BytesUsed  uint64 `json:"bytes_used"`
228 }
229
230 type NodeStatus struct {
231         Volumes []*VolumeStatus `json:"volumes"`
232 }
233
234 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
235         st := GetNodeStatus()
236         if jstat, err := json.Marshal(st); err == nil {
237                 resp.Write(jstat)
238         } else {
239                 log.Printf("json.Marshal: %s\n", err)
240                 log.Printf("NodeStatus = %v\n", st)
241                 http.Error(resp, err.Error(), 500)
242         }
243 }
244
245 // GetNodeStatus
246 //     Returns a NodeStatus struct describing this Keep
247 //     node's current status.
248 //
249 func GetNodeStatus() *NodeStatus {
250         st := new(NodeStatus)
251
252         st.Volumes = make([]*VolumeStatus, len(KeepVM.AllReadable()))
253         for i, vol := range KeepVM.AllReadable() {
254                 st.Volumes[i] = vol.Status()
255         }
256         return st
257 }
258
259 // GetVolumeStatus
260 //     Returns a VolumeStatus describing the requested volume.
261 //
262 func GetVolumeStatus(volume string) *VolumeStatus {
263         var fs syscall.Statfs_t
264         var devnum uint64
265
266         if fi, err := os.Stat(volume); err == nil {
267                 devnum = fi.Sys().(*syscall.Stat_t).Dev
268         } else {
269                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
270                 return nil
271         }
272
273         err := syscall.Statfs(volume, &fs)
274         if err != nil {
275                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
276                 return nil
277         }
278         // These calculations match the way df calculates disk usage:
279         // "free" space is measured by fs.Bavail, but "used" space
280         // uses fs.Blocks - fs.Bfree.
281         free := fs.Bavail * uint64(fs.Bsize)
282         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
283         return &VolumeStatus{volume, devnum, free, used}
284 }
285
286 // DeleteHandler processes DELETE requests.
287 //
288 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
289 // from all connected volumes.
290 //
291 // Only the Data Manager, or an Arvados admin with scope "all", are
292 // allowed to issue DELETE requests.  If a DELETE request is not
293 // authenticated or is issued by a non-admin user, the server returns
294 // a PermissionError.
295 //
296 // Upon receiving a valid request from an authorized user,
297 // DeleteHandler deletes all copies of the specified block on local
298 // writable volumes.
299 //
300 // Response format:
301 //
302 // If the requested blocks was not found on any volume, the response
303 // code is HTTP 404 Not Found.
304 //
305 // Otherwise, the response code is 200 OK, with a response body
306 // consisting of the JSON message
307 //
308 //    {"copies_deleted":d,"copies_failed":f}
309 //
310 // where d and f are integers representing the number of blocks that
311 // were successfully and unsuccessfully deleted.
312 //
313 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
314         hash := mux.Vars(req)["hash"]
315
316         // Confirm that this user is an admin and has a token with unlimited scope.
317         var tok = GetApiToken(req)
318         if tok == "" || !CanDelete(tok) {
319                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
320                 return
321         }
322
323         if never_delete {
324                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
325                 return
326         }
327
328         // Delete copies of this block from all available volumes.
329         // Report how many blocks were successfully deleted, and how
330         // many were found on writable volumes but not deleted.
331         var result struct {
332                 Deleted int `json:"copies_deleted"`
333                 Failed  int `json:"copies_failed"`
334         }
335         for _, vol := range KeepVM.AllWritable() {
336                 if err := vol.Delete(hash); err == nil {
337                         result.Deleted++
338                 } else if os.IsNotExist(err) {
339                         continue
340                 } else {
341                         result.Failed++
342                         log.Println("DeleteHandler:", err)
343                 }
344         }
345
346         var st int
347
348         if result.Deleted == 0 && result.Failed == 0 {
349                 st = http.StatusNotFound
350         } else {
351                 st = http.StatusOK
352         }
353
354         resp.WriteHeader(st)
355
356         if st == http.StatusOK {
357                 if body, err := json.Marshal(result); err == nil {
358                         resp.Write(body)
359                 } else {
360                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
361                         http.Error(resp, err.Error(), 500)
362                 }
363         }
364 }
365
366 /* PullHandler processes "PUT /pull" requests for the data manager.
367    The request body is a JSON message containing a list of pull
368    requests in the following format:
369
370    [
371       {
372          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
373          "servers":[
374                         "keep0.qr1hi.arvadosapi.com:25107",
375                         "keep1.qr1hi.arvadosapi.com:25108"
376                  ]
377           },
378           {
379                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
380                  "servers":[
381                         "10.0.1.5:25107",
382                         "10.0.1.6:25107",
383                         "10.0.1.7:25108"
384                  ]
385           },
386           ...
387    ]
388
389    Each pull request in the list consists of a block locator string
390    and an ordered list of servers.  Keepstore should try to fetch the
391    block from each server in turn.
392
393    If the request has not been sent by the Data Manager, return 401
394    Unauthorized.
395
396    If the JSON unmarshalling fails, return 400 Bad Request.
397 */
398
399 type PullRequest struct {
400         Locator string   `json:"locator"`
401         Servers []string `json:"servers"`
402 }
403
404 func PullHandler(resp http.ResponseWriter, req *http.Request) {
405         // Reject unauthorized requests.
406         if !IsDataManagerToken(GetApiToken(req)) {
407                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
408                 return
409         }
410
411         // Parse the request body.
412         var pr []PullRequest
413         r := json.NewDecoder(req.Body)
414         if err := r.Decode(&pr); err != nil {
415                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
416                 return
417         }
418
419         // We have a properly formatted pull list sent from the data
420         // manager.  Report success and send the list to the pull list
421         // manager for further handling.
422         resp.WriteHeader(http.StatusOK)
423         resp.Write([]byte(
424                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
425
426         plist := list.New()
427         for _, p := range pr {
428                 plist.PushBack(p)
429         }
430         pullq.ReplaceQueue(plist)
431 }
432
433 type TrashRequest struct {
434         Locator    string `json:"locator"`
435         BlockMtime int64  `json:"block_mtime"`
436 }
437
438 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
439         // Reject unauthorized requests.
440         if !IsDataManagerToken(GetApiToken(req)) {
441                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
442                 return
443         }
444
445         // Parse the request body.
446         var trash []TrashRequest
447         r := json.NewDecoder(req.Body)
448         if err := r.Decode(&trash); err != nil {
449                 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
450                 return
451         }
452
453         // We have a properly formatted trash list sent from the data
454         // manager.  Report success and send the list to the trash work
455         // queue for further handling.
456         resp.WriteHeader(http.StatusOK)
457         resp.Write([]byte(
458                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
459
460         tlist := list.New()
461         for _, t := range trash {
462                 tlist.PushBack(t)
463         }
464         trashq.ReplaceQueue(tlist)
465 }
466
467 // ==============================
468 // GetBlock and PutBlock implement lower-level code for handling
469 // blocks by rooting through volumes connected to the local machine.
470 // Once the handler has determined that system policy permits the
471 // request, it calls these methods to perform the actual operation.
472 //
473 // TODO(twp): this code would probably be better located in the
474 // VolumeManager interface. As an abstraction, the VolumeManager
475 // should be the only part of the code that cares about which volume a
476 // block is stored on, so it should be responsible for figuring out
477 // which volume to check for fetching blocks, storing blocks, etc.
478
479 // ==============================
480 // GetBlock fetches and returns the block identified by "hash".  If
481 // the update_timestamp argument is true, GetBlock also updates the
482 // block's file modification time (for the sake of PutBlock, which
483 // must update the file's timestamp when the block already exists).
484 //
485 // On success, GetBlock returns a byte slice with the block data, and
486 // a nil error.
487 //
488 // If the block cannot be found on any volume, returns NotFoundError.
489 //
490 // If the block found does not have the correct MD5 hash, returns
491 // DiskHashError.
492 //
493
494 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
495         // Attempt to read the requested hash from a keep volume.
496         error_to_caller := NotFoundError
497
498         var vols []Volume
499         if update_timestamp {
500                 // Pointless to find the block on an unwritable volume
501                 // because Touch() will fail -- this is as good as
502                 // "not found" for purposes of callers who need to
503                 // update_timestamp.
504                 vols = KeepVM.AllWritable()
505         } else {
506                 vols = KeepVM.AllReadable()
507         }
508
509         for _, vol := range vols {
510                 buf, err := vol.Get(hash)
511                 if err != nil {
512                         // IsNotExist is an expected error and may be
513                         // ignored. All other errors are logged. In
514                         // any case we continue trying to read other
515                         // volumes. If all volumes report IsNotExist,
516                         // we return a NotFoundError.
517                         if !os.IsNotExist(err) {
518                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
519                         }
520                         continue
521                 }
522                 // Check the file checksum.
523                 //
524                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
525                 if filehash != hash {
526                         // TODO: Try harder to tell a sysadmin about
527                         // this.
528                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
529                                 vol, hash, filehash)
530                         error_to_caller = DiskHashError
531                         continue
532                 }
533                 if error_to_caller == DiskHashError {
534                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
535                                 vol, hash)
536                 }
537                 if update_timestamp {
538                         if err := vol.Touch(hash); err != nil {
539                                 error_to_caller = GenericError
540                                 log.Printf("%s: Touch %s failed: %s",
541                                         vol, hash, error_to_caller)
542                                 continue
543                         }
544                 }
545                 return buf, nil
546         }
547         return nil, error_to_caller
548 }
549
550 /* PutBlock(block, hash)
551    Stores the BLOCK (identified by the content id HASH) in Keep.
552
553    The MD5 checksum of the block must be identical to the content id HASH.
554    If not, an error is returned.
555
556    PutBlock stores the BLOCK on the first Keep volume with free space.
557    A failure code is returned to the user only if all volumes fail.
558
559    On success, PutBlock returns nil.
560    On failure, it returns a KeepError with one of the following codes:
561
562    500 Collision
563           A different block with the same hash already exists on this
564           Keep server.
565    422 MD5Fail
566           The MD5 hash of the BLOCK does not match the argument HASH.
567    503 Full
568           There was not enough space left in any Keep volume to store
569           the object.
570    500 Fail
571           The object could not be stored for some other reason (e.g.
572           all writes failed). The text of the error message should
573           provide as much detail as possible.
574 */
575
576 func PutBlock(block []byte, hash string) error {
577         // Check that BLOCK's checksum matches HASH.
578         blockhash := fmt.Sprintf("%x", md5.Sum(block))
579         if blockhash != hash {
580                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
581                 return RequestHashError
582         }
583
584         // If we already have a block on disk under this identifier, return
585         // success (but check for MD5 collisions).  While fetching the block,
586         // update its timestamp.
587         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
588         // In either case, we want to write our new (good) block to disk,
589         // so there is nothing special to do if err != nil.
590         //
591         if oldblock, err := GetBlock(hash, true); err == nil {
592                 if bytes.Compare(block, oldblock) == 0 {
593                         // The block already exists; return success.
594                         return nil
595                 } else {
596                         return CollisionError
597                 }
598         }
599
600         // Choose a Keep volume to write to.
601         // If this volume fails, try all of the volumes in order.
602         if vol := KeepVM.NextWritable(); vol != nil {
603                 if err := vol.Put(hash, block); err == nil {
604                         return nil // success!
605                 }
606         }
607
608         writables := KeepVM.AllWritable()
609         if len(writables) == 0 {
610                 log.Print("No writable volumes.")
611                 return FullError
612         }
613
614         allFull := true
615         for _, vol := range writables {
616                 err := vol.Put(hash, block)
617                 if err == nil {
618                         return nil // success!
619                 }
620                 if err != FullError {
621                         // The volume is not full but the
622                         // write did not succeed.  Report the
623                         // error and continue trying.
624                         allFull = false
625                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
626                 }
627         }
628
629         if allFull {
630                 log.Print("All volumes are full.")
631                 return FullError
632         } else {
633                 // Already logged the non-full errors.
634                 return GenericError
635         }
636 }
637
638 // IsValidLocator
639 //     Return true if the specified string is a valid Keep locator.
640 //     When Keep is extended to support hash types other than MD5,
641 //     this should be updated to cover those as well.
642 //
643 func IsValidLocator(loc string) bool {
644         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
645         if err == nil {
646                 return match
647         }
648         log.Printf("IsValidLocator: %s\n", err)
649         return false
650 }
651
652 // GetApiToken returns the OAuth2 token from the Authorization
653 // header of a HTTP request, or an empty string if no matching
654 // token is found.
655 func GetApiToken(req *http.Request) string {
656         if auth, ok := req.Header["Authorization"]; ok {
657                 if pat, err := regexp.Compile(`^OAuth2\s+(.*)`); err != nil {
658                         log.Println(err)
659                 } else if match := pat.FindStringSubmatch(auth[0]); match != nil {
660                         return match[1]
661                 }
662         }
663         return ""
664 }
665
666 // IsExpired returns true if the given Unix timestamp (expressed as a
667 // hexadecimal string) is in the past, or if timestamp_hex cannot be
668 // parsed as a hexadecimal string.
669 func IsExpired(timestamp_hex string) bool {
670         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
671         if err != nil {
672                 log.Printf("IsExpired: %s\n", err)
673                 return true
674         }
675         return time.Unix(ts, 0).Before(time.Now())
676 }
677
678 // CanDelete returns true if the user identified by api_token is
679 // allowed to delete blocks.
680 func CanDelete(api_token string) bool {
681         if api_token == "" {
682                 return false
683         }
684         // Blocks may be deleted only when Keep has been configured with a
685         // data manager.
686         if IsDataManagerToken(api_token) {
687                 return true
688         }
689         // TODO(twp): look up api_token with the API server
690         // return true if is_admin is true and if the token
691         // has unlimited scope
692         return false
693 }
694
695 // IsDataManagerToken returns true if api_token represents the data
696 // manager's token.
697 func IsDataManagerToken(api_token string) bool {
698         return data_manager_token != "" && api_token == data_manager_token
699 }