11 "github.com/gorilla/mux"
// ======================
// Configuration settings
// ======================
//
// TODO(twp): make all of these configurable via command line flags
// and/or configuration file settings.

const (
	// DEFAULT_ADDR is the TCP address on which to listen for requests
	// when no other is given; ":port" means all network interfaces.
	// Presumably the default value of the --listen flag.
	DEFAULT_ADDR = ":25107"

	// BLOCKSIZE is the size limit of a Keep block (64 MiB); PUT request
	// bodies larger than this are rejected.
	BLOCKSIZE = 64 * 1024 * 1024

	// MIN_FREE_KILOBYTES is the free space a Keep volume must have
	// available in order to permit writes: one block's worth, in KiB.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

// PROC_MOUNTS is the mount table consulted by FindKeepVolumes. It is a
// variable rather than a constant so it can be reassigned (presumably by
// tests).
var PROC_MOUNTS = "/proc/mounts"
43 // The Keep VolumeManager maintains a list of available volumes.
44 // Initialized by the --volumes flag (or by FindKeepVolumes).
45 var KeepVM VolumeManager
47 // enforce_permissions controls whether permission signatures
48 // should be enforced (affecting GET and DELETE requests).
49 // Initialized by the --enforce-permissions flag.
50 var enforce_permissions bool
52 // permission_ttl is the time duration for which new permission
53 // signatures (returned by PUT requests) will be valid.
54 // Initialized by the --permission-ttl flag.
55 var permission_ttl time.Duration
57 // data_manager_token represents the API token used by the
58 // Data Manager, and is required on certain privileged operations.
59 // Initialized by the --data-manager-token-file flag.
60 var data_manager_token string
65 type KeepError struct {
71 CollisionError = &KeepError{400, "Collision"}
72 MD5Error = &KeepError{401, "MD5 Failure"}
73 PermissionError = &KeepError{401, "Permission denied"}
74 CorruptError = &KeepError{402, "Corruption"}
75 ExpiredError = &KeepError{403, "Expired permission signature"}
76 NotFoundError = &KeepError{404, "Not Found"}
77 GenericError = &KeepError{500, "Fail"}
78 FullError = &KeepError{503, "Full"}
79 TooLongError = &KeepError{504, "Too Long"}
82 func (e *KeepError) Error() string {
// ReadErrorTooLong is returned by ReadAtMost when the reader holds more
// than the permitted maximum number of bytes (BLOCKSIZE, for PUT bodies).
var ReadErrorTooLong error = errors.New("Too long")
90 // TODO(twp): continue moving as much code as possible out of main
91 // so it can be effectively tested. Esp. handling and postprocessing
92 // of command line flags (identifying Keep volumes and initializing
93 // permission arguments).
96 // Parse command-line flags:
98 // -listen=ipaddr:port
99 // Interface on which to listen for requests. Use :port without
100 // an ipaddr to listen on all network interfaces.
102 // -listen=127.0.0.1:4949
103 // -listen=10.0.1.24:8000
104 // -listen=:25107 (to listen to port 25107 on all interfaces)
107 // A comma-separated list of directories to use as Keep volumes.
109 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
111 // If -volumes is empty or is not present, Keep will select volumes
112 // by looking at currently mounted filesystems for /keep top-level
116 data_manager_token_file string
118 permission_key_file string
119 permission_ttl_sec int
124 &data_manager_token_file,
125 "data-manager-token-file",
127 "File with the API token used by the Data Manager. All DELETE "+
128 "requests or GET /index requests must carry this token.")
130 &enforce_permissions,
131 "enforce-permissions",
133 "Enforce permission signatures on requests.")
138 "Interface on which to listen for requests, in the format "+
139 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
140 "to listen on all network interfaces.")
142 &permission_key_file,
143 "permission-key-file",
145 "File containing the secret key for generating and verifying "+
146 "permission signatures.")
151 "Expiration time (in seconds) for newly generated permission "+
157 "If set, all read and write operations on local Keep volumes will "+
163 "Comma-separated list of directories to use for Keep volumes, "+
164 "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
165 "supplied, Keep will scan mounted filesystems for volumes "+
166 "with a /keep top-level directory.")
169 // Look for local keep volumes.
170 var keepvols []string
172 // TODO(twp): decide whether this is desirable default behavior.
173 // In production we may want to require the admin to specify
174 // Keep volumes explicitly.
175 keepvols = FindKeepVolumes()
177 keepvols = strings.Split(volumearg, ",")
180 // Check that the specified volumes actually exist.
181 var goodvols []Volume = nil
182 for _, v := range keepvols {
183 if _, err := os.Stat(v); err == nil {
184 log.Println("adding Keep volume:", v)
185 newvol := MakeUnixVolume(v, serialize_io)
186 goodvols = append(goodvols, &newvol)
188 log.Printf("bad Keep volume: %s\n", err)
192 if len(goodvols) == 0 {
193 log.Fatal("could not find any keep volumes")
196 // Initialize data manager token and permission key.
197 // If these tokens are specified but cannot be read,
198 // raise a fatal error.
199 if data_manager_token_file != "" {
200 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
201 data_manager_token = strings.TrimSpace(string(buf))
203 log.Fatalf("reading data manager token: %s\n", err)
206 if permission_key_file != "" {
207 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
208 PermissionSecret = bytes.TrimSpace(buf)
210 log.Fatalf("reading permission key: %s\n", err)
214 // Initialize permission TTL
215 permission_ttl = time.Duration(permission_ttl_sec) * time.Second
217 // If --enforce-permissions is true, we must have a permission key
219 if PermissionSecret == nil {
220 if enforce_permissions {
221 log.Fatal("--enforce-permissions requires a permission key")
223 log.Warning("Running without a PermissionSecret. Block locators " +
224 "returned by this server will not be signed, and will be rejected " +
225 "by a server that enforces permissions.")
226 log.Warning("To fix this, run Keep with --permission-key-file=<path> " +
227 "to define the location of a file containing the permission key.")
231 // Start a round-robin VolumeManager with the volumes we have found.
232 KeepVM = MakeRRVolumeManager(goodvols)
234 // Tell the built-in HTTP server to direct all requests to the REST
236 http.Handle("/", MakeRESTRouter())
238 // Start listening for requests.
239 http.ListenAndServe(listen, nil)
243 // Returns a mux.Router that passes GET and PUT requests to the
244 // appropriate handlers.
246 func MakeRESTRouter() *mux.Router {
247 rest := mux.NewRouter()
249 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
251 `/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`,
252 GetBlockHandler).Methods("GET", "HEAD")
253 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
255 // For IndexHandler we support:
256 // /index - returns all locators
257 // /index/{prefix} - returns all locators that begin with {prefix}
258 // {prefix} is a string of hexadecimal digits between 0 and 32 digits.
259 // If {prefix} is the empty string, return an index of all locators
260 // (so /index and /index/ behave identically)
261 // A client may supply a full 32-digit locator string, in which
262 // case the server will return an index with either zero or one
263 // entries. This usage allows a client to check whether a block is
264 // present, and its size and upload time, without retrieving the
267 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
269 `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
270 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
275 // Returns a list of Keep volumes mounted on this system.
277 // A Keep volume is a normal or tmpfs volume with a /keep
278 // directory at the top level of the mount point.
280 func FindKeepVolumes() []string {
281 vols := make([]string, 0)
283 if f, err := os.Open(PROC_MOUNTS); err != nil {
284 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
286 scanner := bufio.NewScanner(f)
288 args := strings.Fields(scanner.Text())
289 dev, mount := args[0], args[1]
291 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
292 keep := mount + "/keep"
293 if st, err := os.Stat(keep); err == nil && st.IsDir() {
294 vols = append(vols, keep)
298 if err := scanner.Err(); err != nil {
305 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
306 hash := mux.Vars(req)["hash"]
307 signature := mux.Vars(req)["signature"]
308 timestamp := mux.Vars(req)["timestamp"]
310 // If permission checking is in effect, verify this
311 // request's permission signature.
312 if enforce_permissions {
313 if signature == "" || timestamp == "" {
314 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
316 } else if IsExpired(timestamp) {
317 http.Error(resp, ExpiredError.Error(), ExpiredError.HTTPCode)
320 validsig := MakePermSignature(hash, GetApiToken(req), timestamp)
321 if signature != validsig {
322 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
328 block, err := GetBlock(hash)
330 // This type assertion is safe because the only errors
331 // GetBlock can return are CorruptError or NotFoundError.
332 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
336 _, err = resp.Write(block)
338 log.Printf("GetBlockHandler: writing response: %s", err)
344 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
345 hash := mux.Vars(req)["hash"]
347 // Read the block data to be stored.
348 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
350 // Note: because req.Body is a buffered Reader, each Read() call will
351 // collect only the data in the network buffer (typically 16384 bytes),
352 // even if it is passed a much larger slice.
354 // Instead, call ReadAtMost to read data from the socket
355 // repeatedly until either EOF or BLOCKSIZE bytes have been read.
357 if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
358 if err := PutBlock(buf, hash); err == nil {
359 // Success; sign the locator and return it to the client.
360 api_token := GetApiToken(req)
361 expiry := time.Now().Add(permission_ttl)
362 signed_loc := SignLocator(hash, api_token, expiry)
363 resp.Write([]byte(signed_loc))
365 ke := err.(*KeepError)
366 http.Error(resp, ke.Error(), ke.HTTPCode)
369 log.Println("error reading request: ", err)
370 errmsg := err.Error()
371 if err == ReadErrorTooLong {
372 // Use a more descriptive error message that includes
373 // the maximum request size.
374 errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
376 http.Error(resp, errmsg, 500)
381 // A HandleFunc to address /index and /index/{prefix} requests.
383 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
384 prefix := mux.Vars(req)["prefix"]
386 // Only the data manager may issue /index requests,
387 // and only if enforce_permissions is enabled.
388 // All other requests return 403 Permission denied.
389 api_token := GetApiToken(req)
390 if !enforce_permissions ||
392 data_manager_token != api_token {
393 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
397 for _, vol := range KeepVM.Volumes() {
398 index = index + vol.Index(prefix)
400 resp.Write([]byte(index))
// The /status.json response (served by StatusHandler) is a NodeStatus
// encoded as JSON. Its "volumes" field lists the Keep volumes currently in
// use by this server. Each entry is a VolumeStatus object with the fields:
//
//	mount_point  the volume's directory path
//	device_num   an integer identifying the underlying filesystem
//	bytes_free   free bytes (measured from fs.Bavail, as df does)
//	bytes_used   used bytes (fs.Blocks - fs.Bfree, as df does)
type (
	// VolumeStatus describes one Keep volume. Field order matters:
	// instances are built with positional composite literals.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus describes this Keep node: one entry per volume.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
426 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
427 st := GetNodeStatus()
428 if jstat, err := json.Marshal(st); err == nil {
431 log.Printf("json.Marshal: %s\n", err)
432 log.Printf("NodeStatus = %v\n", st)
433 http.Error(resp, err.Error(), 500)
438 // Returns a NodeStatus struct describing this Keep
439 // node's current status.
441 func GetNodeStatus() *NodeStatus {
442 st := new(NodeStatus)
444 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
445 for i, vol := range KeepVM.Volumes() {
446 st.Volumes[i] = vol.Status()
452 // Returns a VolumeStatus describing the requested volume.
454 func GetVolumeStatus(volume string) *VolumeStatus {
455 var fs syscall.Statfs_t
458 if fi, err := os.Stat(volume); err == nil {
459 devnum = fi.Sys().(*syscall.Stat_t).Dev
461 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
465 err := syscall.Statfs(volume, &fs)
467 log.Printf("GetVolumeStatus: statfs: %s\n", err)
470 // These calculations match the way df calculates disk usage:
471 // "free" space is measured by fs.Bavail, but "used" space
472 // uses fs.Blocks - fs.Bfree.
473 free := fs.Bavail * uint64(fs.Bsize)
474 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
475 return &VolumeStatus{volume, devnum, free, used}
478 func GetBlock(hash string) ([]byte, error) {
479 // Attempt to read the requested hash from a keep volume.
480 for _, vol := range KeepVM.Volumes() {
481 if buf, err := vol.Get(hash); err != nil {
482 // IsNotExist is an expected error and may be ignored.
483 // (If all volumes report IsNotExist, we return a NotFoundError)
484 // A CorruptError should be returned immediately.
485 // Any other errors should be logged but we continue trying to
488 case os.IsNotExist(err):
491 log.Printf("GetBlock: reading %s: %s\n", hash, err)
494 // Double check the file checksum.
496 filehash := fmt.Sprintf("%x", md5.Sum(buf))
497 if filehash != hash {
498 // TODO(twp): this condition probably represents a bad disk and
499 // should raise major alarm bells for an administrator: e.g.
500 // they should be sent directly to an event manager at high
501 // priority or logged as urgent problems.
503 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
505 return buf, CorruptError
512 log.Printf("%s: not found on any volumes, giving up\n", hash)
513 return nil, NotFoundError
516 /* PutBlock(block, hash)
517 Stores the BLOCK (identified by the content id HASH) in Keep.
519 The MD5 checksum of the block must be identical to the content id HASH.
520 If not, an error is returned.
522 PutBlock stores the BLOCK on the first Keep volume with free space.
523 A failure code is returned to the user only if all volumes fail.
525 On success, PutBlock returns nil.
526 On failure, it returns a KeepError with one of the following codes:
529 A different block with the same hash already exists on this
532 The MD5 hash of the BLOCK does not match the argument HASH.
534 There was not enough space left in any Keep volume to store
537 The object could not be stored for some other reason (e.g.
538 all writes failed). The text of the error message should
539 provide as much detail as possible.
542 func PutBlock(block []byte, hash string) error {
543 // Check that BLOCK's checksum matches HASH.
544 blockhash := fmt.Sprintf("%x", md5.Sum(block))
545 if blockhash != hash {
546 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
550 // If we already have a block on disk under this identifier, return
551 // success (but check for MD5 collisions).
552 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
553 // In either case, we want to write our new (good) block to disk,
554 // so there is nothing special to do if err != nil.
555 if oldblock, err := GetBlock(hash); err == nil {
556 if bytes.Compare(block, oldblock) == 0 {
559 return CollisionError
563 // Choose a Keep volume to write to.
564 // If this volume fails, try all of the volumes in order.
565 vol := KeepVM.Choose()
566 if err := vol.Put(hash, block); err == nil {
567 return nil // success!
570 for _, vol := range KeepVM.Volumes() {
571 err := vol.Put(hash, block)
573 return nil // success!
575 if err != FullError {
576 // The volume is not full but the write did not succeed.
577 // Report the error and continue trying.
579 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
584 log.Printf("all Keep volumes full")
587 log.Printf("all Keep volumes failed")
594 // Reads bytes repeatedly from an io.Reader until either
595 // encountering EOF, or the maxbytes byte limit has been reached.
596 // Returns a byte slice of the bytes that were read.
598 // If the reader contains more than maxbytes, returns a nil slice
601 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
602 // Attempt to read one more byte than maxbytes.
603 lr := io.LimitReader(r, int64(maxbytes+1))
604 buf, err := ioutil.ReadAll(lr)
605 if len(buf) > maxbytes {
606 return nil, ReadErrorTooLong
// validLocatorPattern matches a bare Keep locator: exactly 32 lowercase
// hexadecimal digits (an MD5 hash). It is compiled once at package
// initialization rather than on every call.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorPattern.MatchString(loc)
}
// GetApiToken returns the token from an "Authorization: OAuth <token>"
// header on req. It returns "" if the header is absent, empty, or uses a
// different scheme. Only the first Authorization value is examined.
func GetApiToken(req *http.Request) string {
	// Header.Get yields "" for a missing key or an empty value list, where
	// indexing req.Header["Authorization"][0] would panic on the latter.
	auth := req.Header.Get("Authorization")
	if strings.HasPrefix(auth, "OAuth ") {
		return auth[len("OAuth "):]
	}
	return ""
}
// IsExpired returns true if the given Unix timestamp (expressed as a
// hexadecimal string) is in the past, or if timestamp_hex cannot be
// parsed as a hexadecimal string. Parse failures are logged.
func IsExpired(timestamp_hex string) bool {
	// Parse as 64-bit on every platform. With bitSize 0 (the size of int),
	// a 32-bit build could not represent timestamps from 2038 onward and
	// would treat every such signature as expired.
	ts, err := strconv.ParseInt(timestamp_hex, 16, 64)
	if err != nil {
		log.Printf("IsExpired: %s\n", err)
		return true
	}
	return time.Unix(ts, 0).Before(time.Now())
}