Merge branch '2328-keep-permission-flags' (closes #2328)
[arvados.git] / services / keep / src / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "encoding/json"
8         "errors"
9         "flag"
10         "fmt"
11         "github.com/gorilla/mux"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // ======================
25 // Configuration settings
26 //
27 // TODO(twp): make all of these configurable via command line flags
28 // and/or configuration file settings.
29
30 // Default TCP address on which to listen for requests.
31 // Initialized by the --listen flag.
32 const DEFAULT_ADDR = ":25107"
33
34 // A Keep "block" is 64MB.
35 const BLOCKSIZE = 64 * 1024 * 1024
36
37 // A Keep volume must have at least MIN_FREE_KILOBYTES available
38 // in order to permit writes.
39 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
40
41 var PROC_MOUNTS = "/proc/mounts"
42
43 // The Keep VolumeManager maintains a list of available volumes.
44 // Initialized by the --volumes flag (or by FindKeepVolumes).
45 var KeepVM VolumeManager
46
47 // enforce_permissions controls whether permission signatures
48 // should be enforced (affecting GET and DELETE requests).
49 // Initialized by the --enforce-permissions flag.
50 var enforce_permissions bool
51
52 // permission_ttl is the time duration for which new permission
53 // signatures (returned by PUT requests) will be valid.
54 // Initialized by the --permission-ttl flag.
55 var permission_ttl time.Duration
56
57 // data_manager_token represents the API token used by the
58 // Data Manager, and is required on certain privileged operations.
59 // Initialized by the --data-manager-token-file flag.
60 var data_manager_token string
61
62 // ==========
63 // Error types.
64 //
65 type KeepError struct {
66         HTTPCode int
67         ErrMsg   string
68 }
69
70 var (
71         CollisionError  = &KeepError{400, "Collision"}
72         MD5Error        = &KeepError{401, "MD5 Failure"}
73         PermissionError = &KeepError{401, "Permission denied"}
74         CorruptError    = &KeepError{402, "Corruption"}
75         ExpiredError    = &KeepError{403, "Expired permission signature"}
76         NotFoundError   = &KeepError{404, "Not Found"}
77         GenericError    = &KeepError{500, "Fail"}
78         FullError       = &KeepError{503, "Full"}
79         TooLongError    = &KeepError{504, "Too Long"}
80 )
81
82 func (e *KeepError) Error() string {
83         return e.ErrMsg
84 }
85
86 // This error is returned by ReadAtMost if the available
87 // data exceeds BLOCKSIZE bytes.
88 var ReadErrorTooLong = errors.New("Too long")
89
90 // TODO(twp): continue moving as much code as possible out of main
91 // so it can be effectively tested. Esp. handling and postprocessing
92 // of command line flags (identifying Keep volumes and initializing
93 // permission arguments).
94
95 func main() {
96         // Parse command-line flags:
97         //
98         // -listen=ipaddr:port
99         //    Interface on which to listen for requests. Use :port without
100         //    an ipaddr to listen on all network interfaces.
101         //    Examples:
102         //      -listen=127.0.0.1:4949
103         //      -listen=10.0.1.24:8000
104         //      -listen=:25107 (to listen to port 25107 on all interfaces)
105         //
106         // -volumes
107         //    A comma-separated list of directories to use as Keep volumes.
108         //    Example:
109         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
110         //
111         //    If -volumes is empty or is not present, Keep will select volumes
112         //    by looking at currently mounted filesystems for /keep top-level
113         //    directories.
114
115         var (
116                 data_manager_token_file string
117                 listen                  string
118                 permission_key_file     string
119                 permission_ttl_sec      int
120                 serialize_io            bool
121                 volumearg               string
122         )
123         flag.StringVar(
124                 &data_manager_token_file,
125                 "data-manager-token-file",
126                 "",
127                 "File with the API token used by the Data Manager. All DELETE "+
128                         "requests or GET /index requests must carry this token.")
129         flag.BoolVar(
130                 &enforce_permissions,
131                 "enforce-permissions",
132                 false,
133                 "Enforce permission signatures on requests.")
134         flag.StringVar(
135                 &listen,
136                 "listen",
137                 DEFAULT_ADDR,
138                 "Interface on which to listen for requests, in the format "+
139                         "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
140                         "to listen on all network interfaces.")
141         flag.StringVar(
142                 &permission_key_file,
143                 "permission-key-file",
144                 "",
145                 "File containing the secret key for generating and verifying "+
146                         "permission signatures.")
147         flag.IntVar(
148                 &permission_ttl_sec,
149                 "permission-ttl",
150                 300,
151                 "Expiration time (in seconds) for newly generated permission "+
152                         "signatures.")
153         flag.BoolVar(
154                 &serialize_io,
155                 "serialize",
156                 false,
157                 "If set, all read and write operations on local Keep volumes will "+
158                         "be serialized.")
159         flag.StringVar(
160                 &volumearg,
161                 "volumes",
162                 "",
163                 "Comma-separated list of directories to use for Keep volumes, "+
164                         "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
165                         "supplied, Keep will scan mounted filesystems for volumes "+
166                         "with a /keep top-level directory.")
167         flag.Parse()
168
169         // Look for local keep volumes.
170         var keepvols []string
171         if volumearg == "" {
172                 // TODO(twp): decide whether this is desirable default behavior.
173                 // In production we may want to require the admin to specify
174                 // Keep volumes explicitly.
175                 keepvols = FindKeepVolumes()
176         } else {
177                 keepvols = strings.Split(volumearg, ",")
178         }
179
180         // Check that the specified volumes actually exist.
181         var goodvols []Volume = nil
182         for _, v := range keepvols {
183                 if _, err := os.Stat(v); err == nil {
184                         log.Println("adding Keep volume:", v)
185                         newvol := MakeUnixVolume(v, serialize_io)
186                         goodvols = append(goodvols, &newvol)
187                 } else {
188                         log.Printf("bad Keep volume: %s\n", err)
189                 }
190         }
191
192         if len(goodvols) == 0 {
193                 log.Fatal("could not find any keep volumes")
194         }
195
196         // Initialize data manager token and permission key.
197         // If these tokens are specified but cannot be read,
198         // raise a fatal error.
199         if data_manager_token_file != "" {
200                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
201                         data_manager_token = strings.TrimSpace(string(buf))
202                 } else {
203                         log.Fatalf("reading data manager token: %s\n", err)
204                 }
205         }
206         if permission_key_file != "" {
207                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
208                         PermissionSecret = bytes.TrimSpace(buf)
209                 } else {
210                         log.Fatalf("reading permission key: %s\n", err)
211                 }
212         }
213
214         // Initialize permission TTL
215         permission_ttl = time.Duration(permission_ttl_sec) * time.Second
216
217         // If --enforce-permissions is true, we must have a permission key
218         // to continue.
219         if PermissionSecret == nil {
220                 if enforce_permissions {
221                         log.Fatal("--enforce-permissions requires a permission key")
222                 } else {
223                         log.Warning("Running without a PermissionSecret. Block locators " +
224                                 "returned by this server will not be signed, and will be rejected " +
225                                 "by a server that enforces permissions.")
226                         log.Warning("To fix this, run Keep with --permission-key-file=<path> " +
227                                 "to define the location of a file containing the permission key.")
228                 }
229         }
230
231         // Start a round-robin VolumeManager with the volumes we have found.
232         KeepVM = MakeRRVolumeManager(goodvols)
233
234         // Tell the built-in HTTP server to direct all requests to the REST
235         // router.
236         http.Handle("/", MakeRESTRouter())
237
238         // Start listening for requests.
239         http.ListenAndServe(listen, nil)
240 }
241
242 // MakeRESTRouter
243 //     Returns a mux.Router that passes GET and PUT requests to the
244 //     appropriate handlers.
245 //
246 func MakeRESTRouter() *mux.Router {
247         rest := mux.NewRouter()
248         rest.HandleFunc(
249                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
250         rest.HandleFunc(
251                 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
252                 GetBlockHandler).Methods("GET", "HEAD")
253         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
254
255         // For IndexHandler we support:
256         //   /index           - returns all locators
257         //   /index/{prefix}  - returns all locators that begin with {prefix}
258         //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
259         //      If {prefix} is the empty string, return an index of all locators
260         //      (so /index and /index/ behave identically)
261         //      A client may supply a full 32-digit locator string, in which
262         //      case the server will return an index with either zero or one
263         //      entries. This usage allows a client to check whether a block is
264         //      present, and its size and upload time, without retrieving the
265         //      entire block.
266         //
267         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
268         rest.HandleFunc(
269                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
270         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
271         return rest
272 }
273
274 // FindKeepVolumes
275 //     Returns a list of Keep volumes mounted on this system.
276 //
277 //     A Keep volume is a normal or tmpfs volume with a /keep
278 //     directory at the top level of the mount point.
279 //
280 func FindKeepVolumes() []string {
281         vols := make([]string, 0)
282
283         if f, err := os.Open(PROC_MOUNTS); err != nil {
284                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
285         } else {
286                 scanner := bufio.NewScanner(f)
287                 for scanner.Scan() {
288                         args := strings.Fields(scanner.Text())
289                         dev, mount := args[0], args[1]
290                         if mount != "/" &&
291                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
292                                 keep := mount + "/keep"
293                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
294                                         vols = append(vols, keep)
295                                 }
296                         }
297                 }
298                 if err := scanner.Err(); err != nil {
299                         log.Fatal(err)
300                 }
301         }
302         return vols
303 }
304
305 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
306         hash := mux.Vars(req)["hash"]
307         signature := mux.Vars(req)["signature"]
308         timestamp := mux.Vars(req)["timestamp"]
309
310         // If permission checking is in effect, verify this
311         // request's permission signature.
312         if enforce_permissions {
313                 if signature == "" || timestamp == "" {
314                         http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
315                         return
316                 } else if IsExpired(timestamp) {
317                         http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
318                         return
319                 } else {
320                         validsig := MakePermSignature(hash, GetApiToken(req), timestamp)
321                         if signature != validsig {
322                                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
323                                 return
324                         }
325                 }
326         }
327
328         block, err := GetBlock(hash)
329         if err != nil {
330                 // This type assertion is safe because the only errors
331                 // GetBlock can return are CorruptError or NotFoundError.
332                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
333                 return
334         }
335
336         _, err = resp.Write(block)
337         if err != nil {
338                 log.Printf("GetBlockHandler: writing response: %s", err)
339         }
340
341         return
342 }
343
344 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
345         hash := mux.Vars(req)["hash"]
346
347         // Read the block data to be stored.
348         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
349         //
350         // Note: because req.Body is a buffered Reader, each Read() call will
351         // collect only the data in the network buffer (typically 16384 bytes),
352         // even if it is passed a much larger slice.
353         //
354         // Instead, call ReadAtMost to read data from the socket
355         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
356         //
357         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
358                 if err := PutBlock(buf, hash); err == nil {
359                         // Success; sign the locator and return it to the client.
360                         api_token := GetApiToken(req)
361                         expiry := time.Now().Add(permission_ttl)
362                         signed_loc := SignLocator(hash, api_token, expiry)
363                         resp.Write([]byte(signed_loc))
364                 } else {
365                         ke := err.(*KeepError)
366                         http.Error(resp, ke.Error(), ke.HTTPCode)
367                 }
368         } else {
369                 log.Println("error reading request: ", err)
370                 errmsg := err.Error()
371                 if err == ReadErrorTooLong {
372                         // Use a more descriptive error message that includes
373                         // the maximum request size.
374                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
375                 }
376                 http.Error(resp, errmsg, 500)
377         }
378 }
379
380 // IndexHandler
381 //     A HandleFunc to address /index and /index/{prefix} requests.
382 //
383 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
384         prefix := mux.Vars(req)["prefix"]
385
386         // Only the data manager may issue /index requests,
387         // and only if enforce_permissions is enabled.
388         // All other requests return 403 Permission denied.
389         api_token := GetApiToken(req)
390         if !enforce_permissions ||
391                 api_token == "" ||
392                 data_manager_token != api_token {
393                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
394                 return
395         }
396         var index string
397         for _, vol := range KeepVM.Volumes() {
398                 index = index + vol.Index(prefix)
399         }
400         resp.Write([]byte(index))
401 }
402
403 // StatusHandler
404 //     Responds to /status.json requests with the current node status,
405 //     described in a JSON structure.
406 //
407 //     The data given in a status.json response includes:
408 //        volumes - a list of Keep volumes currently in use by this server
409 //          each volume is an object with the following fields:
410 //            * mount_point
411 //            * device_num (an integer identifying the underlying filesystem)
412 //            * bytes_free
413 //            * bytes_used
414 //
415 type VolumeStatus struct {
416         MountPoint string `json:"mount_point"`
417         DeviceNum  uint64 `json:"device_num"`
418         BytesFree  uint64 `json:"bytes_free"`
419         BytesUsed  uint64 `json:"bytes_used"`
420 }
421
422 type NodeStatus struct {
423         Volumes []*VolumeStatus `json:"volumes"`
424 }
425
426 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
427         st := GetNodeStatus()
428         if jstat, err := json.Marshal(st); err == nil {
429                 resp.Write(jstat)
430         } else {
431                 log.Printf("json.Marshal: %s\n", err)
432                 log.Printf("NodeStatus = %v\n", st)
433                 http.Error(resp, err.Error(), 500)
434         }
435 }
436
437 // GetNodeStatus
438 //     Returns a NodeStatus struct describing this Keep
439 //     node's current status.
440 //
441 func GetNodeStatus() *NodeStatus {
442         st := new(NodeStatus)
443
444         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
445         for i, vol := range KeepVM.Volumes() {
446                 st.Volumes[i] = vol.Status()
447         }
448         return st
449 }
450
451 // GetVolumeStatus
452 //     Returns a VolumeStatus describing the requested volume.
453 //
454 func GetVolumeStatus(volume string) *VolumeStatus {
455         var fs syscall.Statfs_t
456         var devnum uint64
457
458         if fi, err := os.Stat(volume); err == nil {
459                 devnum = fi.Sys().(*syscall.Stat_t).Dev
460         } else {
461                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
462                 return nil
463         }
464
465         err := syscall.Statfs(volume, &fs)
466         if err != nil {
467                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
468                 return nil
469         }
470         // These calculations match the way df calculates disk usage:
471         // "free" space is measured by fs.Bavail, but "used" space
472         // uses fs.Blocks - fs.Bfree.
473         free := fs.Bavail * uint64(fs.Bsize)
474         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
475         return &VolumeStatus{volume, devnum, free, used}
476 }
477
478 func GetBlock(hash string) ([]byte, error) {
479         // Attempt to read the requested hash from a keep volume.
480         for _, vol := range KeepVM.Volumes() {
481                 if buf, err := vol.Get(hash); err != nil {
482                         // IsNotExist is an expected error and may be ignored.
483                         // (If all volumes report IsNotExist, we return a NotFoundError)
484                         // A CorruptError should be returned immediately.
485                         // Any other errors should be logged but we continue trying to
486                         // read.
487                         switch {
488                         case os.IsNotExist(err):
489                                 continue
490                         default:
491                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
492                         }
493                 } else {
494                         // Double check the file checksum.
495                         //
496                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
497                         if filehash != hash {
498                                 // TODO(twp): this condition probably represents a bad disk and
499                                 // should raise major alarm bells for an administrator: e.g.
500                                 // they should be sent directly to an event manager at high
501                                 // priority or logged as urgent problems.
502                                 //
503                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
504                                         vol, hash, filehash)
505                                 return buf, CorruptError
506                         }
507                         // Success!
508                         return buf, nil
509                 }
510         }
511
512         log.Printf("%s: not found on any volumes, giving up\n", hash)
513         return nil, NotFoundError
514 }
515
516 /* PutBlock(block, hash)
517    Stores the BLOCK (identified by the content id HASH) in Keep.
518
519    The MD5 checksum of the block must be identical to the content id HASH.
520    If not, an error is returned.
521
522    PutBlock stores the BLOCK on the first Keep volume with free space.
523    A failure code is returned to the user only if all volumes fail.
524
525    On success, PutBlock returns nil.
526    On failure, it returns a KeepError with one of the following codes:
527
528    400 Collision
529           A different block with the same hash already exists on this
530           Keep server.
531    401 MD5Fail
532           The MD5 hash of the BLOCK does not match the argument HASH.
533    503 Full
534           There was not enough space left in any Keep volume to store
535           the object.
536    500 Fail
537           The object could not be stored for some other reason (e.g.
538           all writes failed). The text of the error message should
539           provide as much detail as possible.
540 */
541
542 func PutBlock(block []byte, hash string) error {
543         // Check that BLOCK's checksum matches HASH.
544         blockhash := fmt.Sprintf("%x", md5.Sum(block))
545         if blockhash != hash {
546                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
547                 return MD5Error
548         }
549
550         // If we already have a block on disk under this identifier, return
551         // success (but check for MD5 collisions).
552         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
553         // In either case, we want to write our new (good) block to disk,
554         // so there is nothing special to do if err != nil.
555         if oldblock, err := GetBlock(hash); err == nil {
556                 if bytes.Compare(block, oldblock) == 0 {
557                         return nil
558                 } else {
559                         return CollisionError
560                 }
561         }
562
563         // Choose a Keep volume to write to.
564         // If this volume fails, try all of the volumes in order.
565         vol := KeepVM.Choose()
566         if err := vol.Put(hash, block); err == nil {
567                 return nil // success!
568         } else {
569                 allFull := true
570                 for _, vol := range KeepVM.Volumes() {
571                         err := vol.Put(hash, block)
572                         if err == nil {
573                                 return nil // success!
574                         }
575                         if err != FullError {
576                                 // The volume is not full but the write did not succeed.
577                                 // Report the error and continue trying.
578                                 allFull = false
579                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
580                         }
581                 }
582
583                 if allFull {
584                         log.Printf("all Keep volumes full")
585                         return FullError
586                 } else {
587                         log.Printf("all Keep volumes failed")
588                         return GenericError
589                 }
590         }
591 }
592
593 // ReadAtMost
594 //     Reads bytes repeatedly from an io.Reader until either
595 //     encountering EOF, or the maxbytes byte limit has been reached.
596 //     Returns a byte slice of the bytes that were read.
597 //
598 //     If the reader contains more than maxbytes, returns a nil slice
599 //     and an error.
600 //
601 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
602         // Attempt to read one more byte than maxbytes.
603         lr := io.LimitReader(r, int64(maxbytes+1))
604         buf, err := ioutil.ReadAll(lr)
605         if len(buf) > maxbytes {
606                 return nil, ReadErrorTooLong
607         }
608         return buf, err
609 }
610
611 // IsValidLocator
612 //     Return true if the specified string is a valid Keep locator.
613 //     When Keep is extended to support hash types other than MD5,
614 //     this should be updated to cover those as well.
615 //
616 func IsValidLocator(loc string) bool {
617         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
618         if err == nil {
619                 return match
620         }
621         log.Printf("IsValidLocator: %s\n", err)
622         return false
623 }
624
625 // GetApiToken returns the OAuth token from the Authorization
626 // header of a HTTP request, or an empty string if no matching
627 // token is found.
628 func GetApiToken(req *http.Request) string {
629         if auth, ok := req.Header["Authorization"]; ok {
630                 if strings.HasPrefix(auth[0], "OAuth ") {
631                         return auth[0][6:]
632                 }
633         }
634         return ""
635 }
636
637 // IsExpired returns true if the given Unix timestamp (expressed as a
638 // hexadecimal string) is in the past, or if timestamp_hex cannot be
639 // parsed as a hexadecimal string.
640 func IsExpired(timestamp_hex string) bool {
641         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
642         if err != nil {
643                 log.Printf("IsExpired: %s\n", err)
644                 return true
645         }
646         return time.Unix(ts, 0).Before(time.Now())
647 }