"crypto/subtle"
11 "github.com/gorilla/mux"
// ======================
// Configuration settings
// ======================
//
// TODO(twp): make all of these configurable via command line flags
// and/or configuration file settings.

const (
	// DEFAULT_ADDR is the TCP address on which to listen for requests
	// unless the --listen flag says otherwise.
	DEFAULT_ADDR = ":25107"

	// BLOCKSIZE is the size of a Keep "block": 64 MiB.
	BLOCKSIZE = 64 * 1024 * 1024

	// MIN_FREE_KILOBYTES is the free space (in KiB) a Keep volume must
	// have available in order to permit writes: one block's worth.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

// PROC_MOUNTS is the mount table that FindKeepVolumes scans.
var PROC_MOUNTS = "/proc/mounts"
43 // The Keep VolumeManager maintains a list of available volumes.
44 // Initialized by the --volumes flag (or by FindKeepVolumes).
45 var KeepVM VolumeManager
47 // enforce_permissions controls whether permission signatures
48 // should be enforced (affecting GET and DELETE requests).
49 // Initialized by the --enforce-permissions flag.
50 var enforce_permissions bool
52 // permission_ttl is the time duration for which new permission
53 // signatures (returned by PUT requests) will be valid.
54 // Initialized by the --permission-ttl flag.
55 var permission_ttl time.Duration
57 // data_manager_token represents the API token used by the
58 // Data Manager, and is required on certain privileged operations.
59 // Initialized by the --data-manager-token-file flag.
60 var data_manager_token string
// KeepError is the error type for failures reported to HTTP clients.
// Its HTTPCode field is the status code that the request handlers pass
// to http.Error together with the error's message.
65 type KeepError struct {
// Predefined KeepError values, each pairing an HTTP status code with a
// short message.
// NOTE(review): several codes do not follow standard HTTP semantics
// (401 for an MD5 mismatch, 402 for corruption, 504 for an over-long
// request); confirm no client depends on them before changing them.
71 CollisionError = &KeepError{400, "Collision"}
72 MD5Error = &KeepError{401, "MD5 Failure"}
73 PermissionError = &KeepError{401, "Permission denied"}
74 CorruptError = &KeepError{402, "Corruption"}
75 ExpiredError = &KeepError{403, "Expired permission signature"}
76 NotFoundError = &KeepError{404, "Not Found"}
77 GenericError = &KeepError{500, "Fail"}
78 FullError = &KeepError{503, "Full"}
79 TooLongError = &KeepError{504, "Too Long"}
// Error implements the error interface, so a *KeepError can be returned
// as a plain error and recovered by callers with a type assertion.
82 func (e *KeepError) Error() string {
86 // ReadErrorTooLong is returned by ReadAtMost when the reader holds
87 // more than maxbytes bytes (PutBlockHandler passes BLOCKSIZE).
88 var ReadErrorTooLong = errors.New("Too long")
91 // Parse command-line flags:
93 // -listen=ipaddr:port
94 // Interface on which to listen for requests. Use :port without
95 // an ipaddr to listen on all network interfaces.
97 // -listen=127.0.0.1:4949
98 // -listen=10.0.1.24:8000
99 // -listen=:25107 (to listen on port 25107 on all interfaces)
102 // A comma-separated list of directories to use as Keep volumes.
104 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
106 // If -volumes is empty or is not present, Keep will select volumes
107 // by looking at currently mounted filesystems for /keep top-level
111 data_manager_token_file string
113 permission_key_file string
114 permission_ttl_sec int
119 &data_manager_token_file,
120 "data-manager-token-file",
122 "File with the API token used by the Data Manager. All DELETE "+
123 "requests or GET /index requests must carry this token.")
125 &enforce_permissions,
126 "enforce-permissions",
128 "Enforce permission signatures on requests.")
133 "Interface on which to listen for requests, in the format "+
134 "ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port "+
135 "to listen on all network interfaces.")
137 &permission_key_file,
138 "permission-key-file",
140 "File containing the secret key for generating and verifying "+
141 "permission signatures.")
146 "Expiration time (in seconds) for newly generated permission "+
152 "If set, all read and write operations on local Keep volumes will "+
158 "Comma-separated list of directories to use for Keep volumes, "+
159 "e.g. -volumes=/var/keep1,/var/keep2. If empty or not "+
160 "supplied, Keep will scan mounted filesystems for volumes "+
161 "with a /keep top-level directory.")
164 // Look for local keep volumes.
165 var keepvols []string
// NOTE(review): the condition selecting between the two assignments
// below is not visible in this view; per the -volumes documentation,
// mounted filesystems are scanned only when --volumes is empty.
167 // TODO(twp): decide whether this is desirable default behavior.
168 // In production we may want to require the admin to specify
169 // Keep volumes explicitly.
170 keepvols = FindKeepVolumes()
172 keepvols = strings.Split(volumearg, ",")
175 // Check that the specified volumes actually exist.
// Every path that os.Stat accepts becomes a UnixVolume (serialize_io is
// passed through to MakeUnixVolume); paths that fail os.Stat are logged
// and skipped.
176 var goodvols []Volume = nil
177 for _, v := range keepvols {
178 if _, err := os.Stat(v); err == nil {
179 log.Println("adding Keep volume:", v)
180 newvol := MakeUnixVolume(v, serialize_io)
181 goodvols = append(goodvols, &newvol)
183 log.Printf("bad Keep volume: %s\n", err)
187 if len(goodvols) == 0 {
188 log.Fatal("could not find any keep volumes")
191 // Initialize data manager token and permission key.
192 // If these tokens are specified but cannot be read,
193 // raise a fatal error.
// Surrounding whitespace (such as a trailing newline) is trimmed from
// both secrets before use.
194 if data_manager_token_file != "" {
195 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
196 data_manager_token = strings.TrimSpace(string(buf))
198 log.Fatalf("reading data manager token: %s\n", err)
201 if permission_key_file != "" {
202 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
203 PermissionSecret = bytes.TrimSpace(buf)
205 log.Fatalf("reading permission key: %s\n", err)
209 // Initialize permission TTL
// --permission-ttl is given in seconds; convert it to a time.Duration.
210 permission_ttl = time.Duration(permission_ttl_sec) * time.Second
212 // If --enforce-permissions is true, we must have a permission key
214 if enforce_permissions && PermissionSecret == nil {
215 log.Fatal("--enforce-permissions requires a permission key")
218 // Start a round-robin VolumeManager with the volumes we have found.
219 KeepVM = MakeRRVolumeManager(goodvols)
221 // Tell the built-in HTTP server to direct all requests to the REST
223 http.Handle("/", NewRESTRouter())
225 // Start listening for requests.
// NOTE(review): the error returned by ListenAndServe appears to be
// discarded, so a failure to bind `listen` would end main without any
// message; consider log.Fatal(http.ListenAndServe(listen, nil)).
226 http.ListenAndServe(listen, nil)
// NewRESTRouter returns a mux.Router that dispatches the Keep REST API
// to its handlers:
//
//	GET/HEAD /{hash}                           -> GetBlockHandler
//	GET/HEAD /{hash}+A{signature}@{timestamp}  -> GetBlockHandler
//	PUT      /{hash}                           -> PutBlockHandler
//	GET/HEAD /index and /index/{prefix}        -> IndexHandler
//	GET/HEAD /status.json                      -> StatusHandler
//
// Routes are registered in the order listed above.
func NewRESTRouter() *mux.Router {
	readMethods := []string{"GET", "HEAD"}

	routes := []struct {
		pattern string
		handler func(http.ResponseWriter, *http.Request)
		methods []string
	}{
		{`/{hash:[0-9a-f]{32}}`, GetBlockHandler, readMethods},
		{`/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`, GetBlockHandler, readMethods},
		{`/{hash:[0-9a-f]{32}}`, PutBlockHandler, []string{"PUT"}},
		{`/index`, IndexHandler, readMethods},
		{`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler, readMethods},
		{`/status.json`, StatusHandler, readMethods},
	}

	router := mux.NewRouter()
	for _, rt := range routes {
		router.HandleFunc(rt.pattern, rt.handler).Methods(rt.methods...)
	}
	return router
}
249 // Returns a list of Keep volumes mounted on this system.
251 // A Keep volume is a normal or tmpfs volume with a /keep
252 // directory at the top level of the mount point.
// The mount table is read from PROC_MOUNTS, and the returned paths are
// the "<mount point>/keep" directories themselves, not the mount points.
// Failing to open PROC_MOUNTS is fatal.
254 func FindKeepVolumes() []string {
// vols is non-nil, so callers get an empty slice when nothing matches.
255 vols := make([]string, 0)
257 if f, err := os.Open(PROC_MOUNTS); err != nil {
258 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
// NOTE(review): no f.Close() is visible in this view; confirm the
// mount table file is closed after scanning.
260 scanner := bufio.NewScanner(f)
// Each line is split on whitespace: field 0 is the device and field 1
// is the mount point.
// NOTE(review): args[0] and args[1] are indexed without a length check,
// so a line with fewer than two fields would panic.
262 args := strings.Fields(scanner.Text())
263 dev, mount := args[0], args[1]
// Only tmpfs mounts and /dev/* devices qualify (a further condition on
// the line before this one is not visible in this view).
265 (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) {
266 keep := mount + "/keep"
267 if st, err := os.Stat(keep); err == nil && st.IsDir() {
268 vols = append(vols, keep)
272 if err := scanner.Err(); err != nil {
// GetBlockHandler serves GET and HEAD requests for a single block. It is
// routed both for a bare locator (/{hash}) and for a signed locator
// (/{hash}+A{signature}@{timestamp}).
//
// When enforce_permissions is set, the request must carry a signature
// and a timestamp:
//   - a missing signature or timestamp yields PermissionError;
//   - an expired timestamp yields ExpiredError;
//   - a signature that does not equal
//     MakePermSignature(hash, api token, timestamp) yields PermissionError.
//
// Otherwise the block returned by GetBlock is written to the response.
// GetBlock failures are reported with the KeepError's HTTP status code.
func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	hash := vars["hash"]
	signature := vars["signature"]
	timestamp := vars["timestamp"]

	// If permission checking is in effect, verify this
	// request's permission signature.
	if enforce_permissions {
		if signature == "" || timestamp == "" {
			http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
			return
		}
		if IsExpired(timestamp) {
			http.Error(w, ExpiredError.Error(), ExpiredError.HTTPCode)
			return
		}
		validsig := MakePermSignature(hash, GetApiToken(req), timestamp)
		// Compare in constant time. A plain != stops at the first
		// differing byte, which leaks through response timing how much
		// of a forged signature is already correct.
		if subtle.ConstantTimeCompare([]byte(signature), []byte(validsig)) != 1 {
			http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
			return
		}
	}

	block, err := GetBlock(hash)
	if err != nil {
		// GetBlock reports failures as *KeepError values. Fall back to 500
		// instead of panicking if some other error type ever appears.
		status := http.StatusInternalServerError
		if ke, ok := err.(*KeepError); ok {
			status = ke.HTTPCode
		}
		http.Error(w, err.Error(), status)
		return
	}

	if _, err := w.Write(block); err != nil {
		log.Printf("GetBlockHandler: writing response: %s", err)
	}
}
// PutBlockHandler handles PUT /{hash}. It stores the request body as the
// block identified by hash and responds with the locator signed by
// SignLocator, valid for permission_ttl from now.
//
// A body larger than BLOCKSIZE bytes is rejected with HTTP 500 and a
// message stating the maximum request size. Errors from PutBlock are
// reported with the status code of the returned KeepError.
func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
	hash := mux.Vars(req)["hash"]

	// Read the block data to be stored.
	// If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
	//
	// Note: because req.Body is a buffered Reader, each Read() call will
	// collect only the data in the network buffer (typically 16384 bytes),
	// even if it is passed a much larger slice.
	//
	// Instead, call ReadAtMost to read data from the socket
	// repeatedly until either EOF or BLOCKSIZE bytes have been read.
	buf, err := ReadAtMost(req.Body, BLOCKSIZE)
	if err != nil {
		log.Println("error reading request: ", err)
		errmsg := err.Error()
		if err == ReadErrorTooLong {
			// Use a more descriptive error message that includes
			// the maximum request size.
			errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
		}
		http.Error(w, errmsg, 500)
		return
	}

	if err := PutBlock(buf, hash); err != nil {
		// PutBlock reports failures as *KeepError values. Fall back to 500
		// instead of panicking if some other error type ever appears.
		status := http.StatusInternalServerError
		if ke, ok := err.(*KeepError); ok {
			status = ke.HTTPCode
		}
		http.Error(w, err.Error(), status)
		return
	}

	// Success; sign the locator and return it to the client.
	apiToken := GetApiToken(req)
	expiry := time.Now().Add(permission_ttl)
	signedLoc := SignLocator(hash, apiToken, expiry)
	if _, err := w.Write([]byte(signedLoc)); err != nil {
		log.Printf("PutBlockHandler: writing response: %s", err)
	}
}
353 // A HandleFunc to address /index and /index/{prefix} requests.
355 func IndexHandler(w http.ResponseWriter, req *http.Request) {
356 prefix := mux.Vars(req)["prefix"]
358 // Only the data manager may issue /index requests,
359 // and only if enforce_permissions is enabled.
360 // All other requests return 403 Permission denied.
361 api_token := GetApiToken(req)
362 if !enforce_permissions ||
364 data_manager_token != GetApiToken(req) {
365 http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
369 for _, vol := range KeepVM.Volumes() {
370 index = index + vol.Index(prefix)
372 w.Write([]byte(index))
// Responds to /status.json requests with the current node status,
// described in a JSON structure.
//
// The data given in a status.json response includes:
//
//	volumes - a list of Keep volumes currently in use by this server;
//	          each volume is an object with the following fields:
//	  * mount_point
//	  * device_num (an integer identifying the underlying filesystem)
//	  * bytes_free
//	  * bytes_used
type (
	// VolumeStatus reports one volume's mount point, device number, and
	// free and used space in bytes. GetVolumeStatus builds it with a
	// positional literal, so the field order must not change.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level object of a status.json response.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
// StatusHandler responds to /status.json requests with the JSON
// encoding of GetNodeStatus(). The response is labelled
// application/json. If marshaling fails, the error and the status value
// are logged and HTTP 500 is returned.
func StatusHandler(w http.ResponseWriter, req *http.Request) {
	st := GetNodeStatus()
	jstat, err := json.Marshal(st)
	if err != nil {
		log.Printf("json.Marshal: %s\n", err)
		log.Printf("NodeStatus = %v\n", st)
		http.Error(w, err.Error(), 500)
		return
	}
	// Without an explicit type, net/http sniffs the body and serves JSON
	// as text/plain.
	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(jstat); err != nil {
		log.Printf("StatusHandler: writing response: %s", err)
	}
}
410 // Returns a NodeStatus struct describing this Keep
411 // node's current status.
413 func GetNodeStatus() *NodeStatus {
414 st := new(NodeStatus)
416 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
417 for i, vol := range KeepVM.Volumes() {
418 st.Volumes[i] = vol.Status()
424 // Returns a VolumeStatus describing the requested volume.
// volume is a filesystem path. DeviceNum is taken from os.Stat (the
// Dev field of the underlying syscall.Stat_t), and the byte counts come
// from syscall.Statfs on the same path.
426 func GetVolumeStatus(volume string) *VolumeStatus {
427 var fs syscall.Statfs_t
// NOTE(review): the assertion to *syscall.Stat_t ties this function to
// Unix platforms.
430 if fi, err := os.Stat(volume); err == nil {
431 devnum = fi.Sys().(*syscall.Stat_t).Dev
433 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
// NOTE(review): what happens after the os.Stat / statfs failures are
// logged (return nil vs. continuing with zero values) is not visible in
// this view; confirm callers handle that outcome.
437 err := syscall.Statfs(volume, &fs)
439 log.Printf("GetVolumeStatus: statfs: %s\n", err)
442 // These calculations match the way df calculates disk usage:
443 // "free" space is measured by fs.Bavail, but "used" space
444 // uses fs.Blocks - fs.Bfree.
// Bsize is converted to uint64 so it can be multiplied with the block
// counts.
445 free := fs.Bavail * uint64(fs.Bsize)
446 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
447 return &VolumeStatus{volume, devnum, free, used}
// GetBlock returns the contents of the block whose MD5 hex digest is
// hash. It tries each volume in KeepVM.Volumes() in order.
// Read errors that satisfy os.IsNotExist are ignored; other read errors
// are logged, per the comment below.
// If a volume returns data whose MD5 does not match hash, GetBlock
// returns that data together with CorruptError. If no volume supplies
// the block, it returns nil and NotFoundError.
// NOTE(review): the success return after the checksum check is not
// visible in this view.
450 func GetBlock(hash string) ([]byte, error) {
451 // Attempt to read the requested hash from a keep volume.
452 for _, vol := range KeepVM.Volumes() {
453 if buf, err := vol.Get(hash); err != nil {
454 // IsNotExist is an expected error and may be ignored.
455 // (If all volumes report IsNotExist, we return a NotFoundError)
456 // A CorruptError should be returned immediately.
457 // Any other errors should be logged but we continue trying to
460 case os.IsNotExist(err):
463 log.Printf("GetBlock: reading %s: %s\n", hash, err)
466 // Double check the file checksum.
468 filehash := fmt.Sprintf("%x", md5.Sum(buf))
469 if filehash != hash {
470 // TODO(twp): this condition probably represents a bad disk and
471 // should raise major alarm bells for an administrator: e.g.
472 // they should be sent directly to an event manager at high
473 // priority or logged as urgent problems.
475 log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
// NOTE(review): returning here means a good copy on a later volume is
// never consulted, so a GET for this hash fails with CorruptError even
// if another volume holds intact data.
477 return buf, CorruptError
484 log.Printf("%s: not found on any volumes, giving up\n", hash)
485 return nil, NotFoundError
488 /* PutBlock(block, hash)
489 Stores the BLOCK (identified by the content id HASH) in Keep.
491 The MD5 checksum of the block must be identical to the content id HASH.
492 If not, an error is returned.
494 PutBlock first tries the volume returned by KeepVM.Choose(); if that
write fails, it tries every volume in KeepVM.Volumes() in order.
495 A failure code is returned to the user only if all volumes fail.
497 On success, PutBlock returns nil.
498 On failure, it returns a KeepError with one of the following codes:
501 A different block with the same hash already exists on this
504 The MD5 hash of the BLOCK does not match the argument HASH.
506 There was not enough space left in any Keep volume to store
509 The object could not be stored for some other reason (e.g.
510 all writes failed). The text of the error message should
511 provide as much detail as possible.
514 func PutBlock(block []byte, hash string) error {
515 // Check that BLOCK's checksum matches HASH.
516 blockhash := fmt.Sprintf("%x", md5.Sum(block))
517 if blockhash != hash {
518 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
522 // If we already have a block on disk under this identifier, return
523 // success (but check for MD5 collisions).
524 // The only errors that GetBlock can return are CorruptError and NotFoundError.
525 // In either case, we want to write our new (good) block to disk,
526 // so there is nothing special to do if err != nil.
527 if oldblock, err := GetBlock(hash); err == nil {
// Same hash with different content is reported as CollisionError.
528 if bytes.Compare(block, oldblock) == 0 {
531 return CollisionError
535 // Choose a Keep volume to write to.
536 // If this volume fails, try all of the volumes in order.
537 vol := KeepVM.Choose()
538 if err := vol.Put(hash, block); err == nil {
539 return nil // success!
// The fallback loop covers every volume, presumably including the one
// Choose() returned above, which is then attempted a second time.
542 for _, vol := range KeepVM.Volumes() {
543 err := vol.Put(hash, block)
545 return nil // success!
// FullError is compared by identity, so this relies on Put returning
// the FullError value itself rather than a wrapped or new error.
547 if err != FullError {
548 // The volume is not full but the write did not succeed.
549 // Report the error and continue trying.
551 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
// NOTE(review): the errors returned after the two messages below are
// not visible in this view (presumably FullError and GenericError, per
// the header comment).
556 log.Printf("all Keep volumes full")
559 log.Printf("all Keep volumes failed")
// ReadAtMost reads from r until it reaches EOF or until more than
// maxbytes bytes have arrived, whichever happens first.
//
// It returns the bytes read together with any read error. If r holds
// more than maxbytes bytes, it returns a nil slice and ReadErrorTooLong.
func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
	// Allow exactly one byte beyond the limit, so an over-long input can
	// be told apart from one that is exactly maxbytes long.
	limit := int64(maxbytes + 1)
	data, readErr := ioutil.ReadAll(io.LimitReader(r, limit))
	if len(data) <= maxbytes {
		return data, readErr
	}
	return nil, ReadErrorTooLong
}
// validLocatorRE matches a bare Keep locator: exactly 32 lowercase hex
// digits (an MD5 digest). It is compiled once at package initialization
// instead of on every IsValidLocator call.
var validLocatorRE = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if loc is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRE.MatchString(loc)
}
// GetApiToken returns the OAuth token from the Authorization header of a
// HTTP request. It returns an empty string if the request has no
// Authorization header or the header does not start with "OAuth ".
func GetApiToken(req *http.Request) string {
	// Header.Get yields "" for a missing header and for a header present
	// with no values, where indexing the value slice directly would panic.
	auth := req.Header.Get("Authorization")
	if !strings.HasPrefix(auth, "OAuth ") {
		return ""
	}
	return auth[len("OAuth "):]
}
609 // IsExpired returns true if the given Unix timestamp (expressed as a
610 // hexadecimal string) is in the past.
// GetBlockHandler uses it to reject expired permission signatures.
// NOTE(review): the value returned when the timestamp fails to parse is
// not visible in this view. It should fail closed (return true) so that
// a malformed timestamp cannot bypass the expiry check.
// NOTE(review): bitSize 0 limits ParseInt to the platform's int size, so
// large timestamps would fail to parse on 32-bit builds; 64 is portable.
611 func IsExpired(timestamp_hex string) bool {
612 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
614 log.Printf("IsExpired: %s\n", err)
617 return time.Unix(ts, 0).Before(time.Now())