10 "github.com/gorilla/mux"
// ======================
// Configuration settings
//
// TODO(twp): make all of these configurable via command line flags
// and/or configuration file settings.

const (
	// DEFAULT_ADDR is the default TCP address on which to listen for
	// requests; the --listen flag overrides it. ":port" (no address)
	// listens on all interfaces.
	DEFAULT_ADDR = ":25107"

	// BLOCKSIZE is the size of a Keep "block": 64 MiB. Larger PUT
	// bodies are rejected.
	BLOCKSIZE = 64 * 1024 * 1024

	// MIN_FREE_KILOBYTES is the free space, in KiB, that a Keep volume
	// must have available in order to permit writes. It equals one
	// BLOCKSIZE.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

// PROC_MOUNTS is the mount table that FindKeepVolumes scans. It is a
// var rather than a const, presumably so it can be overridden (e.g. by
// tests) — confirm before relying on that.
var PROC_MOUNTS = "/proc/mounts"
45 // The Keep VolumeManager maintains a list of available volumes.
46 // Initialized by the --volumes flag (or by FindKeepVolumes).
47 var KeepVM VolumeManager
49 // enforce_permissions controls whether permission signatures
50 // should be enforced (affecting GET and DELETE requests).
51 // Initialized by the --enforce-permissions flag.
52 var enforce_permissions bool
54 // permission_ttl is the time duration for which new permission
55 // signatures (returned by PUT requests) will be valid.
56 // Initialized by the --permission-ttl flag.
57 var permission_ttl time.Duration
59 // data_manager_token represents the API token used by the
60 // Data Manager, and is required on certain privileged operations.
61 // Initialized by the --data-manager-token-file flag.
62 var data_manager_token string
// KeepError is an error that also carries the HTTP status code a
// handler should send back to the client.
type KeepError struct {
	HTTPCode int
	ErrMsg   string
}

// Errors returned by Keep operations, each paired with the HTTP status
// code to report. TooLongError is returned when a request body exceeds
// BLOCKSIZE; it uses 413 (Request Entity Too Large) because the
// request itself is unacceptable. The former 504 Gateway Timeout
// misdescribed the failure as a transient upstream timeout.
var (
	BadRequestError  = &KeepError{400, "Bad Request"}
	CollisionError   = &KeepError{500, "Collision"}
	RequestHashError = &KeepError{422, "Hash mismatch in request"}
	PermissionError  = &KeepError{403, "Forbidden"}
	DiskHashError    = &KeepError{500, "Hash mismatch in stored data"}
	ExpiredError     = &KeepError{401, "Expired permission signature"}
	NotFoundError    = &KeepError{404, "Not Found"}
	GenericError     = &KeepError{500, "Fail"}
	FullError        = &KeepError{503, "Full"}
	TooLongError     = &KeepError{413, "Too Long"}
)

// Error returns the error's message, so *KeepError satisfies error.
func (e *KeepError) Error() string {
	return e.ErrMsg
}
// main is the keepstore entry point. In order, it:
//   - parses the command-line flags;
//   - builds the list of Keep volumes, either from -volumes or, when
//     that flag is empty, by calling FindKeepVolumes;
//   - reads the data manager token and permission key files (a read
//     failure is fatal);
//   - derives permission_ttl from the -permission-ttl value (seconds);
//   - installs MakeRESTRouter as the HTTP handler and opens a TCP
//     listener on the -listen address;
//   - closes that listener when SIGTERM arrives;
//   - optionally writes a pid file, then serves requests.
89 // TODO(twp): continue moving as much code as possible out of main
90 // so it can be effectively tested. Esp. handling and postprocessing
91 // of command line flags (identifying Keep volumes and initializing
92 // permission arguments).
95 log.Println("Keep started: pid", os.Getpid())
97 // Parse command-line flags:
99 // -listen=ipaddr:port
100 // Interface on which to listen for requests. Use :port without
101 // an ipaddr to listen on all network interfaces.
103 // -listen=127.0.0.1:4949
104 // -listen=10.0.1.24:8000
105 // -listen=:25107 (to listen to port 25107 on all interfaces)
108 // A comma-separated list of directories to use as Keep volumes.
110 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
112 // If -volumes is empty or is not present, Keep will select volumes
113 // by looking at currently mounted filesystems for /keep top-level
117 data_manager_token_file string
119 permission_key_file string
120 permission_ttl_sec int
126 &data_manager_token_file,
127 "data-manager-token-file",
129 "File with the API token used by the Data Manager. All DELETE "+
130 "requests or GET /index requests must carry this token.")
132 &enforce_permissions,
133 "enforce-permissions",
135 "Enforce permission signatures on requests.")
140 "Interface on which to listen for requests, in the format "+
141 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
142 "to listen on all network interfaces.")
144 &permission_key_file,
145 "permission-key-file",
147 "File containing the secret key for generating and verifying "+
148 "permission signatures.")
153 "Expiration time (in seconds) for newly generated permission "+
159 "If set, all read and write operations on local Keep volumes will "+
165 "Comma-separated list of directories to use for Keep volumes, "+
166 "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
167 "supplied, Keep will scan mounted filesystems for volumes "+
168 "with a /keep top-level directory.")
174 "Path to write pid file")
178 // Look for local keep volumes.
179 var keepvols []string
181 // TODO(twp): decide whether this is desirable default behavior.
182 // In production we may want to require the admin to specify
183 // Keep volumes explicitly.
184 keepvols = FindKeepVolumes()
186 keepvols = strings.Split(volumearg, ",")
// Paths that fail os.Stat are logged and skipped. Startup aborts only
// when no usable volume remains.
189 // Check that the specified volumes actually exist.
190 var goodvols []Volume = nil
191 for _, v := range keepvols {
192 if _, err := os.Stat(v); err == nil {
193 log.Println("adding Keep volume:", v)
// newvol is declared inside the loop body, so each iteration appends
// a pointer to a distinct value.
194 newvol := MakeUnixVolume(v, serialize_io)
195 goodvols = append(goodvols, &newvol)
197 log.Printf("bad Keep volume: %s\n", err)
201 if len(goodvols) == 0 {
202 log.Fatal("could not find any keep volumes")
205 // Initialize data manager token and permission key.
206 // If these tokens are specified but cannot be read,
207 // raise a fatal error.
208 if data_manager_token_file != "" {
209 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
210 data_manager_token = strings.TrimSpace(string(buf))
212 log.Fatalf("reading data manager token: %s\n", err)
215 if permission_key_file != "" {
216 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
217 PermissionSecret = bytes.TrimSpace(buf)
219 log.Fatalf("reading permission key: %s\n", err)
223 // Initialize permission TTL
224 permission_ttl = time.Duration(permission_ttl_sec) * time.Second
// NOTE(review): PermissionSecret is a package-level value declared
// elsewhere; within main it is assigned only from --permission-key-file.
226 // If --enforce-permissions is true, we must have a permission key
228 if PermissionSecret == nil {
229 if enforce_permissions {
230 log.Fatal("--enforce-permissions requires a permission key")
232 log.Println("Running without a PermissionSecret. Block locators " +
233 "returned by this server will not be signed, and will be rejected " +
234 "by a server that enforces permissions.")
235 log.Println("To fix this, run Keep with --permission-key-file=<path> " +
236 "to define the location of a file containing the permission key.")
240 // Start a round-robin VolumeManager with the volumes we have found.
241 KeepVM = MakeRRVolumeManager(goodvols)
243 // Tell the built-in HTTP server to direct all requests to the REST
245 http.Handle("/", MakeRESTRouter())
247 // Set up a TCP listener.
248 listener, err := net.Listen("tcp", listen)
// NOTE(review): only SIGTERM is registered below. SIGINT (Ctrl-C)
// therefore terminates the process without going through this
// listener-closing path.
253 // Shut down the server gracefully (by closing the listener)
254 // if SIGTERM is received.
255 term := make(chan os.Signal, 1)
256 go func(sig <-chan os.Signal) {
258 log.Println("caught signal:", s)
261 signal.Notify(term, syscall.SIGTERM)
// NOTE(review): confirm that the pid file handle is closed after the
// pid is written and that the file is removed on shutdown.
264 f, err := os.Create(pidfile)
266 fmt.Fprint(f, os.Getpid())
269 log.Printf("Error writing pid file (%s): %s", pidfile, err.Error())
273 // Start listening for requests.
// NOTE(review): this http.Server sets no ReadTimeout/WriteTimeout, so a
// slow client can hold a connection open indefinitely.
274 srv := &http.Server{Addr: listen}
277 log.Println("shutting down")
285 // Returns a mux.Router that passes GET and PUT requests to the
286 // appropriate handlers.
288 func MakeRESTRouter() *mux.Router {
289 rest := mux.NewRouter()
292 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
294 `/{hash:[0-9a-f]{32}}+{hints}`,
295 GetBlockHandler).Methods("GET", "HEAD")
297 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
299 // For IndexHandler we support:
300 // /index - returns all locators
301 // /index/{prefix} - returns all locators that begin with {prefix}
302 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
303 // If {prefix} is the empty string, return an index of all locators
304 // (so /index and /index/ behave identically)
305 // A client may supply a full 32-digit locator string, in which
306 // case the server will return an index with either zero or one
307 // entries. This usage allows a client to check whether a block is
308 // present, and its size and upload time, without retrieving the
311 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
313 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
314 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
316 // Any request which does not match any of these routes gets
318 rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
323 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
324 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
328 // Returns a list of Keep volumes mounted on this system.
330 // A Keep volume is a normal or tmpfs volume with a /keep
331 // directory at the top level of the mount point.
333 func FindKeepVolumes() []string {
334 vols := make([]string, 0)
336 if f, err := os.Open(PROC_MOUNTS); err != nil {
337 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
339 scanner := bufio.NewScanner(f)
341 args := strings.Fields(scanner.Text())
342 dev, mount := args[0], args[1]
344 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
345 keep := mount + "/keep"
346 if st, err := os.Stat(keep); err == nil && st.IsDir() {
347 vols = append(vols, keep)
351 if err := scanner.Err(); err != nil {
358 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
359 hash := mux.Vars(req)["hash"]
361 log.Printf("%s %s", req.Method, hash)
363 hints := mux.Vars(req)["hints"]
365 // Parse the locator string and hints from the request.
366 // TODO(twp): implement a Locator type.
367 var signature, timestamp string
369 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
370 for _, hint := range strings.Split(hints, "+") {
371 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
372 // Server ignores size hints
373 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
376 } else if match, _ := regexp.MatchString("^[[:upper:]]", hint); match {
377 // Any unknown hint that starts with an uppercase letter is
378 // presumed to be valid and ignored, to permit forward compatibility.
380 // Unknown format; not a valid locator.
381 http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
387 // If permission checking is in effect, verify this
388 // request's permission signature.
389 if enforce_permissions {
390 if signature == "" || timestamp == "" {
391 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
393 } else if IsExpired(timestamp) {
394 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
397 req_locator := req.URL.Path[1:] // strip leading slash
398 if !VerifySignature(req_locator, GetApiToken(req)) {
399 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
405 block, err := GetBlock(hash)
407 // Garbage collect after each GET. Fixes #2865.
408 // TODO(twp): review Keep memory usage and see if there's
409 // a better way to do this than blindly garbage collecting
410 // after every block.
414 // This type assertion is safe because the only errors
415 // GetBlock can return are DiskHashError or NotFoundError.
416 if err == NotFoundError {
417 log.Printf("%s: not found, giving up\n", hash)
419 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
423 resp.Header().Set("X-Block-Size", fmt.Sprintf("%d", len(block)))
425 _, err = resp.Write(block)
427 log.Printf("GetBlockHandler: writing response: %s", err)
433 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
434 // Garbage collect after each PUT. Fixes #2865.
435 // See also GetBlockHandler.
438 hash := mux.Vars(req)["hash"]
440 log.Printf("%s %s", req.Method, hash)
442 // Read the block data to be stored.
443 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
445 if req.ContentLength > BLOCKSIZE {
446 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
450 buf := make([]byte, req.ContentLength)
451 nread, err := io.ReadFull(req.Body, buf)
453 http.Error(resp, err.Error(), 500)
454 } else if int64(nread) < req.ContentLength {
455 http.Error(resp, "request truncated", 500)
457 if err := PutBlock(buf, hash); err == nil {
458 // Success; add a size hint, sign the locator if
459 // possible, and return it to the client.
460 return_hash := fmt.Sprintf("%s+%d", hash, len(buf))
461 api_token := GetApiToken(req)
462 if PermissionSecret != nil && api_token != "" {
463 expiry := time.Now().Add(permission_ttl)
464 return_hash = SignLocator(return_hash, api_token, expiry)
466 resp.Write([]byte(return_hash + "\n"))
468 ke := err.(*KeepError)
469 http.Error(resp, ke.Error(), ke.HTTPCode)
476 // A HandleFunc to address /index and /index/{prefix} requests.
478 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
479 prefix := mux.Vars(req)["prefix"]
481 // Only the data manager may issue /index requests,
482 // and only if enforce_permissions is enabled.
483 // All other requests return 403 Forbidden.
484 api_token := GetApiToken(req)
485 if !enforce_permissions ||
487 data_manager_token != api_token {
488 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
492 for _, vol := range KeepVM.Volumes() {
493 index = index + vol.Index(prefix)
495 resp.Write([]byte(index))
type (
	// VolumeStatus describes one Keep volume in a /status.json response:
	//   - mount_point: the volume's path;
	//   - device_num: an integer identifying the underlying filesystem;
	//   - bytes_free, bytes_used: that filesystem's free and used space,
	//     in bytes.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the document served at /status.json. Its "volumes"
	// list holds one VolumeStatus per Keep volume in use by this server.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
521 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
522 st := GetNodeStatus()
523 if jstat, err := json.Marshal(st); err == nil {
526 log.Printf("json.Marshal: %s\n", err)
527 log.Printf("NodeStatus = %v\n", st)
528 http.Error(resp, err.Error(), 500)
533 // Returns a NodeStatus struct describing this Keep
534 // node's current status.
536 func GetNodeStatus() *NodeStatus {
537 st := new(NodeStatus)
539 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
540 for i, vol := range KeepVM.Volumes() {
541 st.Volumes[i] = vol.Status()
547 // Returns a VolumeStatus describing the requested volume.
549 func GetVolumeStatus(volume string) *VolumeStatus {
550 var fs syscall.Statfs_t
553 if fi, err := os.Stat(volume); err == nil {
554 devnum = fi.Sys().(*syscall.Stat_t).Dev
556 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
560 err := syscall.Statfs(volume, &fs)
562 log.Printf("GetVolumeStatus: statfs: %s\n", err)
565 // These calculations match the way df calculates disk usage:
566 // "free" space is measured by fs.Bavail, but "used" space
567 // uses fs.Blocks - fs.Bfree.
568 free := fs.Bavail * uint64(fs.Bsize)
569 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
570 return &VolumeStatus{volume, devnum, free, used}
// GetBlock reads the block identified by hash (compared against the hex
// MD5 digest of the data read) from the Keep volumes, trying each volume
// in KeepVM.Volumes() in turn.
//
// Per volume:
//   - a read error satisfying os.IsNotExist is skipped quietly;
//   - any other read error is logged, and the next volume is tried;
//   - data that is read is re-hashed with MD5 and compared with hash.
//     A mismatch is logged and remembered as DiskHashError. A matching
//     copy is returned to the caller; if an earlier volume held a corrupt
//     copy, the function also logs that a good copy was found elsewhere.
//
// If no volume yields a good copy, GetBlock returns nil together with
// NotFoundError, or with DiskHashError when at least one corrupt copy
// was seen. PutBlock relies on these being the only errors returned.
573 func GetBlock(hash string) ([]byte, error) {
574 // Attempt to read the requested hash from a keep volume.
// error_to_caller stays NotFoundError unless a corrupt copy is seen.
575 error_to_caller := NotFoundError
577 for _, vol := range KeepVM.Volumes() {
578 if buf, err := vol.Get(hash); err != nil {
579 // IsNotExist is an expected error and may be ignored.
580 // (If all volumes report IsNotExist, we return a NotFoundError)
581 // All other errors should be logged but we continue trying to
584 case os.IsNotExist(err):
587 log.Printf("GetBlock: reading %s: %s\n", hash, err)
590 // Double check the file checksum.
592 filehash := fmt.Sprintf("%x", md5.Sum(buf))
593 if filehash != hash {
594 // TODO(twp): this condition probably represents a bad disk and
595 // should raise major alarm bells for an administrator: e.g.
596 // they should be sent directly to an event manager at high
597 // priority or logged as urgent problems.
599 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
601 error_to_caller = DiskHashError
// This copy verified. If an earlier volume held a corrupt copy, record
// that a good one was found here.
604 if error_to_caller != NotFoundError {
605 log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned\n",
// Every volume was tried without finding a good copy.
613 if error_to_caller != NotFoundError {
614 log.Printf("%s: checksum mismatch, no good copy found\n", hash)
616 return nil, error_to_caller
619 /* PutBlock(block, hash)
620 Stores the BLOCK (identified by the content id HASH) in Keep.
622 The MD5 checksum of the block must be identical to the content id HASH.
623 If not, an error is returned.
625 PutBlock stores the BLOCK on the first Keep volume with free space.
626 A failure code is returned to the user only if all volumes fail.
628 On success, PutBlock returns nil.
629 On failure, it returns a KeepError with one of the following codes:
632 A different block with the same hash already exists on this
635 The MD5 hash of the BLOCK does not match the argument HASH.
637 There was not enough space left in any Keep volume to store
640 The object could not be stored for some other reason (e.g.
641 all writes failed). The text of the error message should
642 provide as much detail as possible.
645 func PutBlock(block []byte, hash string) error {
646 // Check that BLOCK's checksum matches HASH.
647 blockhash := fmt.Sprintf("%x", md5.Sum(block))
648 if blockhash != hash {
649 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
650 return RequestHashError
653 // If we already have a block on disk under this identifier, return
654 // success (but check for MD5 collisions).
655 // The only errors that GetBlock can return are DiskHashError and NotFoundError.
656 // In either case, we want to write our new (good) block to disk,
657 // so there is nothing special to do if err != nil.
658 if oldblock, err := GetBlock(hash); err == nil {
659 if bytes.Compare(block, oldblock) == 0 {
662 return CollisionError
666 // Choose a Keep volume to write to.
667 // If this volume fails, try all of the volumes in order.
668 vol := KeepVM.Choose()
669 if err := vol.Put(hash, block); err == nil {
670 return nil // success!
673 for _, vol := range KeepVM.Volumes() {
674 err := vol.Put(hash, block)
676 return nil // success!
678 if err != FullError {
679 // The volume is not full but the write did not succeed.
680 // Report the error and continue trying.
682 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
687 log.Printf("all Keep volumes full")
690 log.Printf("all Keep volumes failed")
// validLocatorPattern matches a bare Keep locator: exactly 32 lowercase
// hex digits (an MD5 digest) with no hints. It is compiled once rather
// than on every IsValidLocator call.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorPattern.MatchString(loc)
}
// oauth2TokenPattern captures the token from an "OAuth2 <token>"
// Authorization header value. It is compiled once at package init.
var oauth2TokenPattern = regexp.MustCompile(`^OAuth2\s+(.*)`)

// GetApiToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
func GetApiToken(req *http.Request) string {
	// Header.Get returns "" when the header is absent, and also when it
	// is present with no values. Indexing req.Header["Authorization"][0]
	// directly would panic in the latter case.
	if m := oauth2TokenPattern.FindStringSubmatch(req.Header.Get("Authorization")); m != nil {
		return m[1]
	}
	return ""
}
// IsExpired reports whether the Unix timestamp in timestamp_hex (seconds
// since the epoch, written in hexadecimal) lies in the past. A value that
// does not parse as hexadecimal is logged and treated as expired.
func IsExpired(timestamp_hex string) bool {
	secs, parseErr := strconv.ParseInt(timestamp_hex, 16, 0)
	if parseErr == nil {
		return time.Now().After(time.Unix(secs, 0))
	}
	log.Printf("IsExpired: %s\n", parseErr)
	return true
}