2328: do not expose keys to /bin/ps
[arvados.git] / services / keep / src / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "encoding/json"
8         "errors"
9         "flag"
10         "fmt"
11         "github.com/gorilla/mux"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // ======================
25 // Configuration settings
26 //
27 // TODO(twp): make all of these configurable via command line flags
28 // and/or configuration file settings.
29
30 // Default TCP address on which to listen for requests.
31 const DEFAULT_ADDR = ":25107"
32
33 // A Keep "block" is 64MB.
34 const BLOCKSIZE = 64 * 1024 * 1024
35
36 // A Keep volume must have at least MIN_FREE_KILOBYTES available
37 // in order to permit writes.
38 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
39
40 var PROC_MOUNTS = "/proc/mounts"
41
42 // The Keep VolumeManager maintains a list of available volumes.
43 var KeepVM VolumeManager
44
45 // enforce_permissions controls whether permission signatures
46 // should be enforced (affecting GET and DELETE requests)
47 var enforce_permissions bool
48
49 // permission_ttl is the time duration (in seconds) for which
50 // new permission signatures (returned by PUT requests) will be
51 // valid.
52 var permission_ttl int
53
54 // data_manager_token represents the API token used by the
55 // Data Manager, and is required on certain privileged operations.
56 var data_manager_token string
57
58 // ==========
59 // Error types.
60 //
61 type KeepError struct {
62         HTTPCode int
63         ErrMsg   string
64 }
65
66 var (
67         CollisionError  = &KeepError{400, "Collision"}
68         MD5Error        = &KeepError{401, "MD5 Failure"}
69         PermissionError = &KeepError{401, "Permission denied"}
70         CorruptError    = &KeepError{402, "Corruption"}
71         ExpiredError    = &KeepError{403, "Expired permission signature"}
72         NotFoundError   = &KeepError{404, "Not Found"}
73         GenericError    = &KeepError{500, "Fail"}
74         FullError       = &KeepError{503, "Full"}
75         TooLongError    = &KeepError{504, "Too Long"}
76 )
77
78 func (e *KeepError) Error() string {
79         return e.ErrMsg
80 }
81
82 // This error is returned by ReadAtMost if the available
83 // data exceeds BLOCKSIZE bytes.
84 var ReadErrorTooLong = errors.New("Too long")
85
86 func main() {
87         // Parse command-line flags:
88         //
89         // -listen=ipaddr:port
90         //    Interface on which to listen for requests. Use :port without
91         //    an ipaddr to listen on all network interfaces.
92         //    Examples:
93         //      -listen=127.0.0.1:4949
94         //      -listen=10.0.1.24:8000
95         //      -listen=:25107 (to listen to port 25107 on all interfaces)
96         //
97         // -volumes
98         //    A comma-separated list of directories to use as Keep volumes.
99         //    Example:
100         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
101         //
102         //    If -volumes is empty or is not present, Keep will select volumes
103         //    by looking at currently mounted filesystems for /keep top-level
104         //    directories.
105
106         var data_manager_token_file, listen, permission_key_file, volumearg string
107         var serialize_io bool
108         flag.StringVar(
109                 &data_manager_token_file,
110                 "data-manager-token-file",
111                 "",
112                 "File with the API token used by the Data Manager. All DELETE requests or unqualified GET /index requests must carry this token.")
113         flag.BoolVar(
114                 &enforce_permissions,
115                 "enforce-permissions",
116                 false,
117                 "Enforce permission signatures on requests.")
118         flag.StringVar(
119                 &listen,
120                 "listen",
121                 DEFAULT_ADDR,
122                 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
123         flag.StringVar(
124                 &permission_key_file,
125                 "permission-key-file",
126                 "",
127                 "File containing the secret key for generating and verifying permission signatures.")
128         flag.IntVar(
129                 &permission_ttl,
130                 "permission-ttl",
131                 300,
132                 "Expiration time (in seconds) for newly generated permission signatures.")
133         flag.BoolVar(
134                 &serialize_io,
135                 "serialize",
136                 false,
137                 "If set, all read and write operations on local Keep volumes will be serialized.")
138         flag.StringVar(
139                 &volumearg,
140                 "volumes",
141                 "",
142                 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
143         flag.Parse()
144
145         // Look for local keep volumes.
146         var keepvols []string
147         if volumearg == "" {
148                 // TODO(twp): decide whether this is desirable default behavior.
149                 // In production we may want to require the admin to specify
150                 // Keep volumes explicitly.
151                 keepvols = FindKeepVolumes()
152         } else {
153                 keepvols = strings.Split(volumearg, ",")
154         }
155
156         // Check that the specified volumes actually exist.
157         var goodvols []Volume = nil
158         for _, v := range keepvols {
159                 if _, err := os.Stat(v); err == nil {
160                         log.Println("adding Keep volume:", v)
161                         newvol := MakeUnixVolume(v, serialize_io)
162                         goodvols = append(goodvols, &newvol)
163                 } else {
164                         log.Printf("bad Keep volume: %s\n", err)
165                 }
166         }
167
168         if len(goodvols) == 0 {
169                 log.Fatal("could not find any keep volumes")
170         }
171
172         // Initialize data manager token and permission key.
173         if data_manager_token_file != "" {
174                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
175                         data_manager_token = strings.TrimSpace(string(buf))
176                 } else {
177                         log.Printf("reading data_manager_token: %s\n", err)
178                 }
179         }
180         if permission_key_file != "" {
181                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
182                         PermissionSecret = bytes.TrimSpace(buf)
183                 } else {
184                         log.Printf("reading data_manager_token: %s\n", err)
185                 }
186         }
187
188         // If --enforce-permissions is true, we must have a permission key to continue.
189         if enforce_permissions && PermissionSecret == nil {
190                 log.Fatal("--enforce-permissions requires a permission key")
191         }
192
193         // Start a round-robin VolumeManager with the volumes we have found.
194         KeepVM = MakeRRVolumeManager(goodvols)
195
196         // Tell the built-in HTTP server to direct all requests to the REST
197         // router.
198         http.Handle("/", NewRESTRouter())
199
200         // Start listening for requests.
201         http.ListenAndServe(listen, nil)
202 }
203
204 // NewRESTRouter
205 //     Returns a mux.Router that passes GET and PUT requests to the
206 //     appropriate handlers.
207 //
208 func NewRESTRouter() *mux.Router {
209         rest := mux.NewRouter()
210         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
211         rest.HandleFunc(`/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`, GetBlockHandler).Methods("GET", "HEAD")
212         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
213         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
214         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
215         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
216         return rest
217 }
218
219 // FindKeepVolumes
220 //     Returns a list of Keep volumes mounted on this system.
221 //
222 //     A Keep volume is a normal or tmpfs volume with a /keep
223 //     directory at the top level of the mount point.
224 //
225 func FindKeepVolumes() []string {
226         vols := make([]string, 0)
227
228         if f, err := os.Open(PROC_MOUNTS); err != nil {
229                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
230         } else {
231                 scanner := bufio.NewScanner(f)
232                 for scanner.Scan() {
233                         args := strings.Fields(scanner.Text())
234                         dev, mount := args[0], args[1]
235                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
236                                 keep := mount + "/keep"
237                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
238                                         vols = append(vols, keep)
239                                 }
240                         }
241                 }
242                 if err := scanner.Err(); err != nil {
243                         log.Fatal(err)
244                 }
245         }
246         return vols
247 }
248
249 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
250         hash := mux.Vars(req)["hash"]
251         signature := mux.Vars(req)["signature"]
252         timestamp := mux.Vars(req)["timestamp"]
253
254         // If permission checking is in effect, verify this
255         // request's permission signature.
256         if enforce_permissions {
257                 if signature == "" || timestamp == "" {
258                         http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
259                         return
260                 } else if IsExpired(timestamp) {
261                         http.Error(w, ExpiredError.Error(), ExpiredError.HTTPCode)
262                         return
263                 } else if signature != MakePermSignature(hash, GetApiToken(req), timestamp) {
264                         http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
265                         return
266                 }
267         }
268
269         block, err := GetBlock(hash)
270         if err != nil {
271                 http.Error(w, err.Error(), err.(*KeepError).HTTPCode)
272                 return
273         }
274
275         _, err = w.Write(block)
276         if err != nil {
277                 log.Printf("GetBlockHandler: writing response: %s", err)
278         }
279
280         return
281 }
282
283 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
284         hash := mux.Vars(req)["hash"]
285
286         // Read the block data to be stored.
287         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
288         //
289         // Note: because req.Body is a buffered Reader, each Read() call will
290         // collect only the data in the network buffer (typically 16384 bytes),
291         // even if it is passed a much larger slice.
292         //
293         // Instead, call ReadAtMost to read data from the socket
294         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
295         //
296         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
297                 if err := PutBlock(buf, hash); err == nil {
298                         // Success; sign the locator and return it to the client.
299                         api_token := GetApiToken(req)
300                         expiry := time.Now().Add( // convert permission_ttl to time.Duration
301                                 time.Duration(permission_ttl) * time.Second)
302                         signed_loc := SignLocator(hash, api_token, expiry)
303                         w.Write([]byte(signed_loc))
304                 } else {
305                         ke := err.(*KeepError)
306                         http.Error(w, ke.Error(), ke.HTTPCode)
307                 }
308         } else {
309                 log.Println("error reading request: ", err)
310                 errmsg := err.Error()
311                 if err == ReadErrorTooLong {
312                         // Use a more descriptive error message that includes
313                         // the maximum request size.
314                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
315                 }
316                 http.Error(w, errmsg, 500)
317         }
318 }
319
320 // IndexHandler
321 //     A HandleFunc to address /index and /index/{prefix} requests.
322 //
323 func IndexHandler(w http.ResponseWriter, req *http.Request) {
324         prefix := mux.Vars(req)["prefix"]
325
326         // Only the data manager may issue unqualified "GET /index" requests,
327         // and only if enforce_permissions is enabled.
328         // If the request is unauthenticated, or does not match the data manager's
329         // API token, return 403 Permission denied.
330         if prefix == "" {
331                 api_token := GetApiToken(req)
332                 if !enforce_permissions ||
333                         api_token == "" ||
334                         data_manager_token != GetApiToken(req) {
335                         http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
336                         return
337                 }
338         }
339         var index string
340         for _, vol := range KeepVM.Volumes() {
341                 index = index + vol.Index(prefix)
342         }
343         w.Write([]byte(index))
344 }
345
346 // StatusHandler
347 //     Responds to /status.json requests with the current node status,
348 //     described in a JSON structure.
349 //
350 //     The data given in a status.json response includes:
351 //        volumes - a list of Keep volumes currently in use by this server
352 //          each volume is an object with the following fields:
353 //            * mount_point
354 //            * device_num (an integer identifying the underlying filesystem)
355 //            * bytes_free
356 //            * bytes_used
357 //
358 type VolumeStatus struct {
359         MountPoint string `json:"mount_point"`
360         DeviceNum  uint64 `json:"device_num"`
361         BytesFree  uint64 `json:"bytes_free"`
362         BytesUsed  uint64 `json:"bytes_used"`
363 }
364
365 type NodeStatus struct {
366         Volumes []*VolumeStatus `json:"volumes"`
367 }
368
369 func StatusHandler(w http.ResponseWriter, req *http.Request) {
370         st := GetNodeStatus()
371         if jstat, err := json.Marshal(st); err == nil {
372                 w.Write(jstat)
373         } else {
374                 log.Printf("json.Marshal: %s\n", err)
375                 log.Printf("NodeStatus = %v\n", st)
376                 http.Error(w, err.Error(), 500)
377         }
378 }
379
380 // GetNodeStatus
381 //     Returns a NodeStatus struct describing this Keep
382 //     node's current status.
383 //
384 func GetNodeStatus() *NodeStatus {
385         st := new(NodeStatus)
386
387         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
388         for i, vol := range KeepVM.Volumes() {
389                 st.Volumes[i] = vol.Status()
390         }
391         return st
392 }
393
394 // GetVolumeStatus
395 //     Returns a VolumeStatus describing the requested volume.
396 //
397 func GetVolumeStatus(volume string) *VolumeStatus {
398         var fs syscall.Statfs_t
399         var devnum uint64
400
401         if fi, err := os.Stat(volume); err == nil {
402                 devnum = fi.Sys().(*syscall.Stat_t).Dev
403         } else {
404                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
405                 return nil
406         }
407
408         err := syscall.Statfs(volume, &fs)
409         if err != nil {
410                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
411                 return nil
412         }
413         // These calculations match the way df calculates disk usage:
414         // "free" space is measured by fs.Bavail, but "used" space
415         // uses fs.Blocks - fs.Bfree.
416         free := fs.Bavail * uint64(fs.Bsize)
417         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
418         return &VolumeStatus{volume, devnum, free, used}
419 }
420
421 func GetBlock(hash string) ([]byte, error) {
422         // Attempt to read the requested hash from a keep volume.
423         for _, vol := range KeepVM.Volumes() {
424                 if buf, err := vol.Get(hash); err != nil {
425                         // IsNotExist is an expected error and may be ignored.
426                         // (If all volumes report IsNotExist, we return a NotFoundError)
427                         // A CorruptError should be returned immediately.
428                         // Any other errors should be logged but we continue trying to
429                         // read.
430                         switch {
431                         case os.IsNotExist(err):
432                                 continue
433                         default:
434                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
435                         }
436                 } else {
437                         // Double check the file checksum.
438                         //
439                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
440                         if filehash != hash {
441                                 // TODO(twp): this condition probably represents a bad disk and
442                                 // should raise major alarm bells for an administrator: e.g.
443                                 // they should be sent directly to an event manager at high
444                                 // priority or logged as urgent problems.
445                                 //
446                                 log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
447                                         vol, hash, filehash)
448                                 return buf, CorruptError
449                         }
450                         // Success!
451                         return buf, nil
452                 }
453         }
454
455         log.Printf("%s: not found on any volumes, giving up\n", hash)
456         return nil, NotFoundError
457 }
458
459 /* PutBlock(block, hash)
460    Stores the BLOCK (identified by the content id HASH) in Keep.
461
462    The MD5 checksum of the block must be identical to the content id HASH.
463    If not, an error is returned.
464
465    PutBlock stores the BLOCK on the first Keep volume with free space.
466    A failure code is returned to the user only if all volumes fail.
467
468    On success, PutBlock returns nil.
469    On failure, it returns a KeepError with one of the following codes:
470
471    400 Collision
472           A different block with the same hash already exists on this
473           Keep server.
474    401 MD5Fail
475           The MD5 hash of the BLOCK does not match the argument HASH.
476    503 Full
477           There was not enough space left in any Keep volume to store
478           the object.
479    500 Fail
480           The object could not be stored for some other reason (e.g.
481           all writes failed). The text of the error message should
482           provide as much detail as possible.
483 */
484
485 func PutBlock(block []byte, hash string) error {
486         // Check that BLOCK's checksum matches HASH.
487         blockhash := fmt.Sprintf("%x", md5.Sum(block))
488         if blockhash != hash {
489                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
490                 return MD5Error
491         }
492
493         // If we already have a block on disk under this identifier, return
494         // success (but check for MD5 collisions).
495         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
496         // In either case, we want to write our new (good) block to disk, so there is
497         // nothing special to do if err != nil.
498         if oldblock, err := GetBlock(hash); err == nil {
499                 if bytes.Compare(block, oldblock) == 0 {
500                         return nil
501                 } else {
502                         return CollisionError
503                 }
504         }
505
506         // Choose a Keep volume to write to.
507         // If this volume fails, try all of the volumes in order.
508         vol := KeepVM.Choose()
509         if err := vol.Put(hash, block); err == nil {
510                 return nil // success!
511         } else {
512                 allFull := true
513                 for _, vol := range KeepVM.Volumes() {
514                         err := vol.Put(hash, block)
515                         if err == nil {
516                                 return nil // success!
517                         }
518                         if err != FullError {
519                                 // The volume is not full but the write did not succeed.
520                                 // Report the error and continue trying.
521                                 allFull = false
522                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
523                         }
524                 }
525
526                 if allFull {
527                         log.Printf("all Keep volumes full")
528                         return FullError
529                 } else {
530                         log.Printf("all Keep volumes failed")
531                         return GenericError
532                 }
533         }
534 }
535
536 // ReadAtMost
537 //     Reads bytes repeatedly from an io.Reader until either
538 //     encountering EOF, or the maxbytes byte limit has been reached.
539 //     Returns a byte slice of the bytes that were read.
540 //
541 //     If the reader contains more than maxbytes, returns a nil slice
542 //     and an error.
543 //
544 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
545         // Attempt to read one more byte than maxbytes.
546         lr := io.LimitReader(r, int64(maxbytes+1))
547         buf, err := ioutil.ReadAll(lr)
548         if len(buf) > maxbytes {
549                 return nil, ReadErrorTooLong
550         }
551         return buf, err
552 }
553
554 // IsValidLocator
555 //     Return true if the specified string is a valid Keep locator.
556 //     When Keep is extended to support hash types other than MD5,
557 //     this should be updated to cover those as well.
558 //
559 func IsValidLocator(loc string) bool {
560         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
561         if err == nil {
562                 return match
563         }
564         log.Printf("IsValidLocator: %s\n", err)
565         return false
566 }
567
568 // GetApiToken returns the OAuth token from the Authorization
569 // header of a HTTP request, or an empty string if no matching
570 // token is found.
571 func GetApiToken(req *http.Request) string {
572         if auth, ok := req.Header["Authorization"]; ok {
573                 if strings.HasPrefix(auth[0], "OAuth ") {
574                         return auth[0][6:]
575                 }
576         }
577         return ""
578 }
579
580 // IsExpired returns true if the given Unix timestamp (expressed as a
581 // hexadecimal string) is in the past.
582 func IsExpired(timestamp_hex string) bool {
583         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
584         if err != nil {
585                 log.Printf("IsExpired: %s\n", err)
586                 return true
587         }
588         return time.Unix(ts, 0).Before(time.Now())
589 }