2328: more 80-column fixes
[arvados.git] / services / keep / src / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "encoding/json"
8         "errors"
9         "flag"
10         "fmt"
11         "github.com/gorilla/mux"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // ======================
25 // Configuration settings
26 //
27 // TODO(twp): make all of these configurable via command line flags
28 // and/or configuration file settings.
29
30 // Default TCP address on which to listen for requests.
31 // Initialized by the --listen flag.
32 const DEFAULT_ADDR = ":25107"
33
34 // A Keep "block" is 64MB.
35 const BLOCKSIZE = 64 * 1024 * 1024
36
37 // A Keep volume must have at least MIN_FREE_KILOBYTES available
38 // in order to permit writes.
39 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
40
41 var PROC_MOUNTS = "/proc/mounts"
42
43 // The Keep VolumeManager maintains a list of available volumes.
44 // Initialized by the --volumes flag (or by FindKeepVolumes).
45 var KeepVM VolumeManager
46
47 // enforce_permissions controls whether permission signatures
48 // should be enforced (affecting GET and DELETE requests).
49 // Initialized by the --enforce-permissions flag.
50 var enforce_permissions bool
51
52 // permission_ttl is the time duration for which new permission
53 // signatures (returned by PUT requests) will be valid.
54 // Initialized by the --permission-ttl flag.
55 var permission_ttl time.Duration
56
57 // data_manager_token represents the API token used by the
58 // Data Manager, and is required on certain privileged operations.
59 // Initialized by the --data-manager-token-file flag.
60 var data_manager_token string
61
62 // ==========
63 // Error types.
64 //
65 type KeepError struct {
66         HTTPCode int
67         ErrMsg   string
68 }
69
70 var (
71         CollisionError  = &KeepError{400, "Collision"}
72         MD5Error        = &KeepError{401, "MD5 Failure"}
73         PermissionError = &KeepError{401, "Permission denied"}
74         CorruptError    = &KeepError{402, "Corruption"}
75         ExpiredError    = &KeepError{403, "Expired permission signature"}
76         NotFoundError   = &KeepError{404, "Not Found"}
77         GenericError    = &KeepError{500, "Fail"}
78         FullError       = &KeepError{503, "Full"}
79         TooLongError    = &KeepError{504, "Too Long"}
80 )
81
82 func (e *KeepError) Error() string {
83         return e.ErrMsg
84 }
85
86 // This error is returned by ReadAtMost if the available
87 // data exceeds BLOCKSIZE bytes.
88 var ReadErrorTooLong = errors.New("Too long")
89
90 func main() {
91         // Parse command-line flags:
92         //
93         // -listen=ipaddr:port
94         //    Interface on which to listen for requests. Use :port without
95         //    an ipaddr to listen on all network interfaces.
96         //    Examples:
97         //      -listen=127.0.0.1:4949
98         //      -listen=10.0.1.24:8000
99         //      -listen=:25107 (to listen to port 25107 on all interfaces)
100         //
101         // -volumes
102         //    A comma-separated list of directories to use as Keep volumes.
103         //    Example:
104         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
105         //
106         //    If -volumes is empty or is not present, Keep will select volumes
107         //    by looking at currently mounted filesystems for /keep top-level
108         //    directories.
109
110         var (
111                 data_manager_token_file string
112                 listen                  string
113                 permission_key_file     string
114                 permission_ttl_sec      int
115                 serialize_io            bool
116                 volumearg               string
117         )
118         flag.StringVar(
119                 &data_manager_token_file,
120                 "data-manager-token-file",
121                 "",
122                 "File with the API token used by the Data Manager. All DELETE "+
123                         "requests or GET /index requests must carry this token.")
124         flag.BoolVar(
125                 &enforce_permissions,
126                 "enforce-permissions",
127                 false,
128                 "Enforce permission signatures on requests.")
129         flag.StringVar(
130                 &listen,
131                 "listen",
132                 DEFAULT_ADDR,
133                 "Interface on which to listen for requests, in the format "+
134                         "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
135                         "to listen on all network interfaces.")
136         flag.StringVar(
137                 &permission_key_file,
138                 "permission-key-file",
139                 "",
140                 "File containing the secret key for generating and verifying "+
141                         "permission signatures.")
142         flag.IntVar(
143                 &permission_ttl_sec,
144                 "permission-ttl",
145                 300,
146                 "Expiration time (in seconds) for newly generated permission "+
147                         "signatures.")
148         flag.BoolVar(
149                 &serialize_io,
150                 "serialize",
151                 false,
152                 "If set, all read and write operations on local Keep volumes will "+
153                         "be serialized.")
154         flag.StringVar(
155                 &volumearg,
156                 "volumes",
157                 "",
158                 "Comma-separated list of directories to use for Keep volumes, "+
159                         "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
160                         "supplied, Keep will scan mounted filesystems for volumes "+
161                         "with a /keep top-level directory.")
162         flag.Parse()
163
164         // Look for local keep volumes.
165         var keepvols []string
166         if volumearg == "" {
167                 // TODO(twp): decide whether this is desirable default behavior.
168                 // In production we may want to require the admin to specify
169                 // Keep volumes explicitly.
170                 keepvols = FindKeepVolumes()
171         } else {
172                 keepvols = strings.Split(volumearg, ",")
173         }
174
175         // Check that the specified volumes actually exist.
176         var goodvols []Volume = nil
177         for _, v := range keepvols {
178                 if _, err := os.Stat(v); err == nil {
179                         log.Println("adding Keep volume:", v)
180                         newvol := MakeUnixVolume(v, serialize_io)
181                         goodvols = append(goodvols, &newvol)
182                 } else {
183                         log.Printf("bad Keep volume: %s\n", err)
184                 }
185         }
186
187         if len(goodvols) == 0 {
188                 log.Fatal("could not find any keep volumes")
189         }
190
191         // Initialize data manager token and permission key.
192         // If these tokens are specified but cannot be read,
193         // raise a fatal error.
194         if data_manager_token_file != "" {
195                 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
196                         data_manager_token = strings.TrimSpace(string(buf))
197                 } else {
198                         log.Fatalf("reading data manager token: %s\n", err)
199                 }
200         }
201         if permission_key_file != "" {
202                 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
203                         PermissionSecret = bytes.TrimSpace(buf)
204                 } else {
205                         log.Fatalf("reading permission key: %s\n", err)
206                 }
207         }
208
209         // Initialize permission TTL
210         permission_ttl = time.Duration(permission_ttl_sec) * time.Second
211
212         // If --enforce-permissions is true, we must have a permission key
213         // to continue.
214         if enforce_permissions && PermissionSecret == nil {
215                 log.Fatal("--enforce-permissions requires a permission key")
216         }
217
218         // Start a round-robin VolumeManager with the volumes we have found.
219         KeepVM = MakeRRVolumeManager(goodvols)
220
221         // Tell the built-in HTTP server to direct all requests to the REST
222         // router.
223         http.Handle("/", NewRESTRouter())
224
225         // Start listening for requests.
226         http.ListenAndServe(listen, nil)
227 }
228
229 // NewRESTRouter
230 //     Returns a mux.Router that passes GET and PUT requests to the
231 //     appropriate handlers.
232 //
233 func NewRESTRouter() *mux.Router {
234         rest := mux.NewRouter()
235         rest.HandleFunc(
236                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
237         rest.HandleFunc(
238                 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
239                 GetBlockHandler).Methods("GET", "HEAD")
240         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
241         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
242         rest.HandleFunc(
243                 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
244         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
245         return rest
246 }
247
248 // FindKeepVolumes
249 //     Returns a list of Keep volumes mounted on this system.
250 //
251 //     A Keep volume is a normal or tmpfs volume with a /keep
252 //     directory at the top level of the mount point.
253 //
254 func FindKeepVolumes() []string {
255         vols := make([]string, 0)
256
257         if f, err := os.Open(PROC_MOUNTS); err != nil {
258                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
259         } else {
260                 scanner := bufio.NewScanner(f)
261                 for scanner.Scan() {
262                         args := strings.Fields(scanner.Text())
263                         dev, mount := args[0], args[1]
264                         if mount != "/" &&
265                                 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
266                                 keep := mount + "/keep"
267                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
268                                         vols = append(vols, keep)
269                                 }
270                         }
271                 }
272                 if err := scanner.Err(); err != nil {
273                         log.Fatal(err)
274                 }
275         }
276         return vols
277 }
278
279 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
280         hash := mux.Vars(req)["hash"]
281         signature := mux.Vars(req)["signature"]
282         timestamp := mux.Vars(req)["timestamp"]
283
284         // If permission checking is in effect, verify this
285         // request's permission signature.
286         if enforce_permissions {
287                 if signature == "" || timestamp == "" {
288                         http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
289                         return
290                 } else if IsExpired(timestamp) {
291                         http.Error(w, ExpiredError.Error(), ExpiredError.HTTPCode)
292                         return
293                 } else {
294                         validsig := MakePermSignature(hash, GetApiToken(req), timestamp)
295                         if signature != validsig {
296                                 http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
297                                 return
298                         }
299                 }
300         }
301
302         block, err := GetBlock(hash)
303         if err != nil {
304                 http.Error(w, err.Error(), err.(*KeepError).HTTPCode)
305                 return
306         }
307
308         _, err = w.Write(block)
309         if err != nil {
310                 log.Printf("GetBlockHandler: writing response: %s", err)
311         }
312
313         return
314 }
315
316 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
317         hash := mux.Vars(req)["hash"]
318
319         // Read the block data to be stored.
320         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
321         //
322         // Note: because req.Body is a buffered Reader, each Read() call will
323         // collect only the data in the network buffer (typically 16384 bytes),
324         // even if it is passed a much larger slice.
325         //
326         // Instead, call ReadAtMost to read data from the socket
327         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
328         //
329         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
330                 if err := PutBlock(buf, hash); err == nil {
331                         // Success; sign the locator and return it to the client.
332                         api_token := GetApiToken(req)
333                         expiry := time.Now().Add(permission_ttl)
334                         signed_loc := SignLocator(hash, api_token, expiry)
335                         w.Write([]byte(signed_loc))
336                 } else {
337                         ke := err.(*KeepError)
338                         http.Error(w, ke.Error(), ke.HTTPCode)
339                 }
340         } else {
341                 log.Println("error reading request: ", err)
342                 errmsg := err.Error()
343                 if err == ReadErrorTooLong {
344                         // Use a more descriptive error message that includes
345                         // the maximum request size.
346                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
347                 }
348                 http.Error(w, errmsg, 500)
349         }
350 }
351
352 // IndexHandler
353 //     A HandleFunc to address /index and /index/{prefix} requests.
354 //
355 func IndexHandler(w http.ResponseWriter, req *http.Request) {
356         prefix := mux.Vars(req)["prefix"]
357
358         // Only the data manager may issue /index requests,
359         // and only if enforce_permissions is enabled.
360         // All other requests return 403 Permission denied.
361         api_token := GetApiToken(req)
362         if !enforce_permissions ||
363                 api_token == "" ||
364                 data_manager_token != GetApiToken(req) {
365                 http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
366                 return
367         }
368         var index string
369         for _, vol := range KeepVM.Volumes() {
370                 index = index + vol.Index(prefix)
371         }
372         w.Write([]byte(index))
373 }
374
375 // StatusHandler
376 //     Responds to /status.json requests with the current node status,
377 //     described in a JSON structure.
378 //
379 //     The data given in a status.json response includes:
380 //        volumes - a list of Keep volumes currently in use by this server
381 //          each volume is an object with the following fields:
382 //            * mount_point
383 //            * device_num (an integer identifying the underlying filesystem)
384 //            * bytes_free
385 //            * bytes_used
386 //
387 type VolumeStatus struct {
388         MountPoint string `json:"mount_point"`
389         DeviceNum  uint64 `json:"device_num"`
390         BytesFree  uint64 `json:"bytes_free"`
391         BytesUsed  uint64 `json:"bytes_used"`
392 }
393
394 type NodeStatus struct {
395         Volumes []*VolumeStatus `json:"volumes"`
396 }
397
398 func StatusHandler(w http.ResponseWriter, req *http.Request) {
399         st := GetNodeStatus()
400         if jstat, err := json.Marshal(st); err == nil {
401                 w.Write(jstat)
402         } else {
403                 log.Printf("json.Marshal: %s\n", err)
404                 log.Printf("NodeStatus = %v\n", st)
405                 http.Error(w, err.Error(), 500)
406         }
407 }
408
409 // GetNodeStatus
410 //     Returns a NodeStatus struct describing this Keep
411 //     node's current status.
412 //
413 func GetNodeStatus() *NodeStatus {
414         st := new(NodeStatus)
415
416         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
417         for i, vol := range KeepVM.Volumes() {
418                 st.Volumes[i] = vol.Status()
419         }
420         return st
421 }
422
423 // GetVolumeStatus
424 //     Returns a VolumeStatus describing the requested volume.
425 //
426 func GetVolumeStatus(volume string) *VolumeStatus {
427         var fs syscall.Statfs_t
428         var devnum uint64
429
430         if fi, err := os.Stat(volume); err == nil {
431                 devnum = fi.Sys().(*syscall.Stat_t).Dev
432         } else {
433                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
434                 return nil
435         }
436
437         err := syscall.Statfs(volume, &fs)
438         if err != nil {
439                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
440                 return nil
441         }
442         // These calculations match the way df calculates disk usage:
443         // "free" space is measured by fs.Bavail, but "used" space
444         // uses fs.Blocks - fs.Bfree.
445         free := fs.Bavail * uint64(fs.Bsize)
446         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
447         return &VolumeStatus{volume, devnum, free, used}
448 }
449
450 func GetBlock(hash string) ([]byte, error) {
451         // Attempt to read the requested hash from a keep volume.
452         for _, vol := range KeepVM.Volumes() {
453                 if buf, err := vol.Get(hash); err != nil {
454                         // IsNotExist is an expected error and may be ignored.
455                         // (If all volumes report IsNotExist, we return a NotFoundError)
456                         // A CorruptError should be returned immediately.
457                         // Any other errors should be logged but we continue trying to
458                         // read.
459                         switch {
460                         case os.IsNotExist(err):
461                                 continue
462                         default:
463                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
464                         }
465                 } else {
466                         // Double check the file checksum.
467                         //
468                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
469                         if filehash != hash {
470                                 // TODO(twp): this condition probably represents a bad disk and
471                                 // should raise major alarm bells for an administrator: e.g.
472                                 // they should be sent directly to an event manager at high
473                                 // priority or logged as urgent problems.
474                                 //
475                                 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
476                                         vol, hash, filehash)
477                                 return buf, CorruptError
478                         }
479                         // Success!
480                         return buf, nil
481                 }
482         }
483
484         log.Printf("%s: not found on any volumes, giving up\n", hash)
485         return nil, NotFoundError
486 }
487
488 /* PutBlock(block, hash)
489    Stores the BLOCK (identified by the content id HASH) in Keep.
490
491    The MD5 checksum of the block must be identical to the content id HASH.
492    If not, an error is returned.
493
494    PutBlock stores the BLOCK on the first Keep volume with free space.
495    A failure code is returned to the user only if all volumes fail.
496
497    On success, PutBlock returns nil.
498    On failure, it returns a KeepError with one of the following codes:
499
500    400 Collision
501           A different block with the same hash already exists on this
502           Keep server.
503    401 MD5Fail
504           The MD5 hash of the BLOCK does not match the argument HASH.
505    503 Full
506           There was not enough space left in any Keep volume to store
507           the object.
508    500 Fail
509           The object could not be stored for some other reason (e.g.
510           all writes failed). The text of the error message should
511           provide as much detail as possible.
512 */
513
514 func PutBlock(block []byte, hash string) error {
515         // Check that BLOCK's checksum matches HASH.
516         blockhash := fmt.Sprintf("%x", md5.Sum(block))
517         if blockhash != hash {
518                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
519                 return MD5Error
520         }
521
522         // If we already have a block on disk under this identifier, return
523         // success (but check for MD5 collisions).
524         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
525         // In either case, we want to write our new (good) block to disk,
526         // so there is nothing special to do if err != nil.
527         if oldblock, err := GetBlock(hash); err == nil {
528                 if bytes.Compare(block, oldblock) == 0 {
529                         return nil
530                 } else {
531                         return CollisionError
532                 }
533         }
534
535         // Choose a Keep volume to write to.
536         // If this volume fails, try all of the volumes in order.
537         vol := KeepVM.Choose()
538         if err := vol.Put(hash, block); err == nil {
539                 return nil // success!
540         } else {
541                 allFull := true
542                 for _, vol := range KeepVM.Volumes() {
543                         err := vol.Put(hash, block)
544                         if err == nil {
545                                 return nil // success!
546                         }
547                         if err != FullError {
548                                 // The volume is not full but the write did not succeed.
549                                 // Report the error and continue trying.
550                                 allFull = false
551                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
552                         }
553                 }
554
555                 if allFull {
556                         log.Printf("all Keep volumes full")
557                         return FullError
558                 } else {
559                         log.Printf("all Keep volumes failed")
560                         return GenericError
561                 }
562         }
563 }
564
565 // ReadAtMost
566 //     Reads bytes repeatedly from an io.Reader until either
567 //     encountering EOF, or the maxbytes byte limit has been reached.
568 //     Returns a byte slice of the bytes that were read.
569 //
570 //     If the reader contains more than maxbytes, returns a nil slice
571 //     and an error.
572 //
573 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
574         // Attempt to read one more byte than maxbytes.
575         lr := io.LimitReader(r, int64(maxbytes+1))
576         buf, err := ioutil.ReadAll(lr)
577         if len(buf) > maxbytes {
578                 return nil, ReadErrorTooLong
579         }
580         return buf, err
581 }
582
583 // IsValidLocator
584 //     Return true if the specified string is a valid Keep locator.
585 //     When Keep is extended to support hash types other than MD5,
586 //     this should be updated to cover those as well.
587 //
588 func IsValidLocator(loc string) bool {
589         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
590         if err == nil {
591                 return match
592         }
593         log.Printf("IsValidLocator: %s\n", err)
594         return false
595 }
596
597 // GetApiToken returns the OAuth token from the Authorization
598 // header of a HTTP request, or an empty string if no matching
599 // token is found.
600 func GetApiToken(req *http.Request) string {
601         if auth, ok := req.Header["Authorization"]; ok {
602                 if strings.HasPrefix(auth[0], "OAuth ") {
603                         return auth[0][6:]
604                 }
605         }
606         return ""
607 }
608
609 // IsExpired returns true if the given Unix timestamp (expressed as a
610 // hexadecimal string) is in the past.
611 func IsExpired(timestamp_hex string) bool {
612         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
613         if err != nil {
614                 log.Printf("IsExpired: %s\n", err)
615                 return true
616         }
617         return time.Unix(ts, 0).Before(time.Now())
618 }