11 "github.com/gorilla/mux"
24 // ======================
25 // Configuration settings
27 // TODO(twp): make all of these configurable via command line flags
28 // and/or configuration file settings.
// NOTE(review): several settings are already set from command-line flags
// registered in the startup code (-listen, -volumes, -enforce-permissions,
// -data-manager-token-file, -permission-key-file); this TODO now mainly
// concerns the remaining constants.
30 // Default TCP address on which to listen for requests.
31 const DEFAULT_ADDR = ":25107"
33 // A Keep "block" is at most 64 MiB (64 * 1024 * 1024 bytes);
// PutBlockHandler rejects request bodies larger than this.
34 const BLOCKSIZE = 64 * 1024 * 1024
36 // A Keep volume must have at least MIN_FREE_KILOBYTES available
37 // in order to permit writes. BLOCKSIZE / 1024 = 65536 KiB, i.e.
// room for one maximum-size block.
38 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
// PROC_MOUNTS is the mount table that FindKeepVolumes scans when no
// -volumes list is given. It is a var rather than a const, presumably so
// tests can substitute a fixture file.
40 var PROC_MOUNTS = "/proc/mounts"
42 // The Keep VolumeManager maintains a list of available volumes.
// The startup code assigns it (via MakeRRVolumeManager) after checking
// which configured volume directories exist.
43 var KeepVM VolumeManager
45 // enforce_permissions controls whether permission signatures
46 // should be enforced (affecting GET and DELETE requests)
// NOTE(review): in this file it is read by GetBlockHandler and
// IndexHandler; NewRESTRouter registers no DELETE route.
47 var enforce_permissions bool
49 // permission_ttl is the time duration (in seconds) for which
50 // new permission signatures (returned by PUT requests) will be
// valid; PutBlockHandler adds it to time.Now() to compute the expiry.
52 var permission_ttl int
54 // data_manager_token represents the API token used by the
55 // Data Manager, and is required on certain privileged operations.
// It is loaded from -data-manager-token-file at startup (and stays ""
// if that file cannot be read); IndexHandler compares it with the
// request's API token.
56 var data_manager_token string
// KeepError is an error that carries the HTTP status code handlers send
// back to the client (read as its HTTPCode field, e.g. by PutBlockHandler).
61 type KeepError struct {
// Predefined KeepError values; each literal is presumably
// {HTTPCode, message}.
// NOTE(review): several codes do not follow standard HTTP meanings
// (401 for an MD5 mismatch, 402 Payment Required for corruption,
// 504 Gateway Timeout for "Too Long"); confirm what clients expect before
// changing them.
67 CollisionError = &KeepError{400, "Collision"}
68 MD5Error = &KeepError{401, "MD5 Failure"}
69 PermissionError = &KeepError{401, "Permission denied"}
70 CorruptError = &KeepError{402, "Corruption"}
71 ExpiredError = &KeepError{403, "Expired permission signature"}
72 NotFoundError = &KeepError{404, "Not Found"}
73 GenericError = &KeepError{500, "Fail"}
74 FullError = &KeepError{503, "Full"}
75 TooLongError = &KeepError{504, "Too Long"}
// Error implements the error interface for *KeepError.
78 func (e *KeepError) Error() string {
82 // This error is returned by ReadAtMost if the available
83 // data exceeds the maxbytes limit it was given (PutBlockHandler
// passes BLOCKSIZE).
84 var ReadErrorTooLong = errors.New("Too long")
87 // Parse command-line flags:
89 // -listen=ipaddr:port
90 // Interface on which to listen for requests. Use :port without
91 // an ipaddr to listen on all network interfaces.
93 // -listen=127.0.0.1:4949
94 // -listen=10.0.1.24:8000
95 // -listen=:25107 (to listen to port 25107 on all interfaces)
98 // A comma-separated list of directories to use as Keep volumes.
100 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
102 // If -volumes is empty or is not present, Keep will select volumes
103 // by looking at currently mounted filesystems for /keep top-level
// directories (see FindKeepVolumes).
106 var data_manager_token_file, listen, permission_key_file, volumearg string
107 var serialize_io bool
109 &data_manager_token_file,
110 "data-manager-token-file",
112 "File with the API token used by the Data Manager. All DELETE requests or unqualified GET /index requests must carry this token.")
114 &enforce_permissions,
115 "enforce-permissions",
117 "Enforce permission signatures on requests.")
122 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
124 &permission_key_file,
125 "permission-key-file",
127 "File containing the secret key for generating and verifying permission signatures.")
132 "Expiration time (in seconds) for newly generated permission signatures.")
137 "If set, all read and write operations on local Keep volumes will be serialized.")
142 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
145 // Look for local keep volumes.
146 var keepvols []string
148 // TODO(twp): decide whether this is desirable default behavior.
149 // In production we may want to require the admin to specify
150 // Keep volumes explicitly.
151 keepvols = FindKeepVolumes()
153 keepvols = strings.Split(volumearg, ",")
156 // Check that the specified volumes actually exist.
// Paths that os.Stat cannot reach are logged and skipped rather than
// treated as fatal; only an empty result (checked below) aborts startup.
157 var goodvols []Volume = nil
158 for _, v := range keepvols {
159 if _, err := os.Stat(v); err == nil {
160 log.Println("adding Keep volume:", v)
161 newvol := MakeUnixVolume(v, serialize_io)
// newvol is declared inside the loop body, so each &newvol points at a
// distinct value.
162 goodvols = append(goodvols, &newvol)
// os.Stat's *os.PathError message already names the offending path.
164 log.Printf("bad Keep volume: %s\n", err)
168 if len(goodvols) == 0 {
169 log.Fatal("could not find any keep volumes")
172 // Initialize data manager token and permission key.
// An unreadable token or key file is only logged, not fatal.
173 if data_manager_token_file != "" {
174 if buf, err := ioutil.ReadFile(data_manager_token_file); err == nil {
175 data_manager_token = strings.TrimSpace(string(buf))
177 log.Printf("reading data_manager_token: %s\n", err)
180 if permission_key_file != "" {
181 if buf, err := ioutil.ReadFile(permission_key_file); err == nil {
182 PermissionSecret = bytes.TrimSpace(buf)
// NOTE(review): copy-paste error — this branch failed to read
// permission_key_file, but the message says data_manager_token. It should
// read "reading permission_key_file: %s\n".
184 log.Printf("reading data_manager_token: %s\n", err)
188 // If --enforce-permissions is true, we must have a permission key to continue.
// PermissionSecret is declared elsewhere; this check relies on it being nil
// unless a key file was read successfully above.
189 if enforce_permissions && PermissionSecret == nil {
190 log.Fatal("--enforce-permissions requires a permission key")
193 // Start a round-robin VolumeManager with the volumes we have found.
194 KeepVM = MakeRRVolumeManager(goodvols)
196 // Tell the built-in HTTP server to direct all requests to the REST
// router. http.Handle registers on http.DefaultServeMux, which
// ListenAndServe uses because its handler argument below is nil.
198 http.Handle("/", NewRESTRouter())
200 // Start listening for requests.
// NOTE(review): http.ListenAndServe always returns a non-nil error (e.g.
// address already in use), and it is discarded here, so a failure to start
// the server goes unreported. Consider
// log.Fatal(http.ListenAndServe(listen, nil)).
201 http.ListenAndServe(listen, nil)
205 // Returns a mux.Router that passes GET and PUT requests to the
206 // appropriate handlers.
// Routes registered:
//   GET, HEAD  /{hash}                           -> GetBlockHandler
//   GET, HEAD  /{hash}+A{signature}@{timestamp}  -> GetBlockHandler (signed locator)
//   PUT        /{hash}                           -> PutBlockHandler
//   GET, HEAD  /index, /index/{prefix}           -> IndexHandler
//   GET, HEAD  /status.json                      -> StatusHandler
// hash is exactly 32 lowercase hex digits; signature and timestamp are one
// or more lowercase hex digits; prefix is 0 to 32 lowercase hex digits.
208 func NewRESTRouter() *mux.Router {
209 rest := mux.NewRouter()
210 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
211 rest.HandleFunc(`/{hash:[0-9a-f]{32}}+A{signature:[0-9a-f]+}@{timestamp:[0-9a-f]+}`, GetBlockHandler).Methods("GET", "HEAD")
212 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
213 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
214 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
215 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
220 // Returns a list of Keep volumes mounted on this system.
222 // A Keep volume is a normal or tmpfs volume with a /keep
223 // directory at the top level of the mount point.
// Concretely, each PROC_MOUNTS line whose device field is "tmpfs" or
// starts with "/dev/", and whose mount point is not "/", contributes
// "<mount point>/keep" if that path is an existing directory.
// Exits the process via log.Fatalf if PROC_MOUNTS cannot be opened.
225 func FindKeepVolumes() []string {
226 vols := make([]string, 0)
228 if f, err := os.Open(PROC_MOUNTS); err != nil {
229 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
// NOTE(review): f does not appear to be closed; consider defer f.Close()
// once the open has succeeded.
231 scanner := bufio.NewScanner(f)
// Each line's fields are: device, mount point, fs type, options, ...
// NOTE(review): a line with fewer than two fields would make args[1]
// panic; a len(args) < 2 guard would be cheap insurance.
233 args := strings.Fields(scanner.Text())
234 dev, mount := args[0], args[1]
235 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
// NOTE(review): /proc/mounts encodes spaces and some other characters in
// mount points as octal escapes (e.g. "\040"); they are not decoded here,
// so such mount points will not be found.
236 keep := mount + "/keep"
237 if st, err := os.Stat(keep); err == nil && st.IsDir() {
238 vols = append(vols, keep)
242 if err := scanner.Err(); err != nil {
// GetBlockHandler serves GET and HEAD requests for a block. The route
// supplies hash and, for signed locators, signature and timestamp; on the
// unsigned route those two variables are empty strings.
249 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
250 hash := mux.Vars(req)["hash"]
251 signature := mux.Vars(req)["signature"]
252 timestamp := mux.Vars(req)["timestamp"]
254 // If permission checking is in effect, verify this
255 // request's permission signature.
// Checks run in this order: missing signature/timestamp, expiry, then the
// signature itself, which must equal MakePermSignature(hash, <caller's API
// token>, timestamp). When enforce_permissions is false, none of these
// checks are made.
256 if enforce_permissions {
257 if signature == "" || timestamp == "" {
258 http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
260 } else if IsExpired(timestamp) {
261 http.Error(w, ExpiredError.Error(), ExpiredError.HTTPCode)
// NOTE(review): != is not a constant-time comparison; hmac.Equal or
// subtle.ConstantTimeCompare would avoid leaking timing information about
// the expected signature.
263 } else if signature != MakePermSignature(hash, GetApiToken(req), timestamp) {
264 http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
269 block, err := GetBlock(hash)
// NOTE(review): this type assertion panics if GetBlock ever returns an
// error that is not a *KeepError.
271 http.Error(w, err.Error(), err.(*KeepError).HTTPCode)
275 _, err = w.Write(block)
277 log.Printf("GetBlockHandler: writing response: %s", err)
// PutBlockHandler stores the request body as the block named by the
// route's {hash}. On success it responds with the locator produced by
// SignLocator for the caller's API token, expiring permission_ttl seconds
// from now.
283 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
284 hash := mux.Vars(req)["hash"]
286 // Read the block data to be stored.
287 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
289 // Note: because req.Body is a buffered Reader, each Read() call will
290 // collect only the data in the network buffer (typically 16384 bytes),
291 // even if it is passed a much larger slice.
293 // Instead, call ReadAtMost to read data from the socket
294 // repeatedly until either EOF or BLOCKSIZE bytes have been read.
296 if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
297 if err := PutBlock(buf, hash); err == nil {
298 // Success; sign the locator and return it to the client.
299 api_token := GetApiToken(req)
300 expiry := time.Now().Add( // convert permission_ttl to time.Duration
301 time.Duration(permission_ttl) * time.Second)
302 signed_loc := SignLocator(hash, api_token, expiry)
// NOTE(review): the error from w.Write is ignored here, whereas
// GetBlockHandler logs its write errors.
303 w.Write([]byte(signed_loc))
// NOTE(review): this type assertion panics if PutBlock returns an error
// that is not a *KeepError.
305 ke := err.(*KeepError)
306 http.Error(w, ke.Error(), ke.HTTPCode)
309 log.Println("error reading request: ", err)
310 errmsg := err.Error()
311 if err == ReadErrorTooLong {
312 // Use a more descriptive error message that includes
313 // the maximum request size.
314 errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
// NOTE(review): 413 Request Entity Too Large would describe an oversized
// body more precisely than 500; check client expectations before changing.
316 http.Error(w, errmsg, 500)
321 // A HandleFunc to address /index and /index/{prefix} requests.
323 func IndexHandler(w http.ResponseWriter, req *http.Request) {
// prefix is "" on the unqualified /index route, which has no {prefix}.
324 prefix := mux.Vars(req)["prefix"]
326 // Only the data manager may issue unqualified "GET /index" requests,
327 // and only if enforce_permissions is enabled.
328 // If the request is unauthenticated, or does not match the data manager's
329 // API token, return PermissionError (HTTP 401 "Permission denied").
// NOTE(review): the condition below does not reference prefix; confirm
// that /index/{prefix} requests are meant to be restricted the same way.
// NOTE(review): GetApiToken(req) is called again on the last line of the
// condition even though api_token already holds its result.
331 api_token := GetApiToken(req)
332 if !enforce_permissions ||
334 data_manager_token != GetApiToken(req) {
335 http.Error(w, PermissionError.Error(), PermissionError.HTTPCode)
// Concatenate every volume's index listing for prefix.
// NOTE(review): repeated string concatenation copies the accumulated index
// on every iteration; a strings.Builder would avoid that.
340 for _, vol := range KeepVM.Volumes() {
341 index = index + vol.Index(prefix)
343 w.Write([]byte(index))
347 // Responds to /status.json requests with the current node status,
348 // described in a JSON structure.
350 // The data given in a status.json response includes:
351 // volumes - a list of Keep volumes currently in use by this server
352 // each volume is an object with the following fields:
354 // * device_num (an integer identifying the underlying filesystem)
// VolumeStatus is the per-volume entry in a status.json response, as
// populated by GetVolumeStatus.
358 type VolumeStatus struct {
// Path of the volume directory passed to GetVolumeStatus.
359 MountPoint string `json:"mount_point"`
// Device number reported by os.Stat (syscall.Stat_t.Dev).
360 DeviceNum uint64 `json:"device_num"`
// Bytes available to unprivileged users: fs.Bavail * fs.Bsize.
361 BytesFree uint64 `json:"bytes_free"`
// Bytes in use, computed as df does: (fs.Blocks - fs.Bfree) * fs.Bsize.
362 BytesUsed uint64 `json:"bytes_used"`
// NodeStatus is the top-level status.json object: one VolumeStatus per
// volume in KeepVM.Volumes() (see GetNodeStatus).
365 type NodeStatus struct {
366 Volumes []*VolumeStatus `json:"volumes"`
// StatusHandler serves GET and HEAD /status.json with GetNodeStatus()
// marshalled as JSON. A marshalling failure is logged, together with the
// status value, and reported to the client as HTTP 500.
369 func StatusHandler(w http.ResponseWriter, req *http.Request) {
370 st := GetNodeStatus()
371 if jstat, err := json.Marshal(st); err == nil {
374 log.Printf("json.Marshal: %s\n", err)
375 log.Printf("NodeStatus = %v\n", st)
376 http.Error(w, err.Error(), 500)
381 // Returns a NodeStatus struct describing this Keep
382 // node's current status.
// st.Volumes[i] is the Status() of the i-th entry of KeepVM.Volumes().
// NOTE(review): Volumes() is called twice; this assumes the volume list
// does not change length between the two calls.
384 func GetNodeStatus() *NodeStatus {
385 st := new(NodeStatus)
387 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
388 for i, vol := range KeepVM.Volumes() {
389 st.Volumes[i] = vol.Status()
395 // Returns a VolumeStatus describing the requested volume.
// volume is a filesystem path. DeviceNum comes from os.Stat; the free and
// used byte counts come from statfs(2). Errors from either call are logged.
// NOTE(review): fi.Sys().(*syscall.Stat_t) is Unix-specific; the unchecked
// type assertion panics on platforms where Sys() returns another type.
397 func GetVolumeStatus(volume string) *VolumeStatus {
398 var fs syscall.Statfs_t
401 if fi, err := os.Stat(volume); err == nil {
402 devnum = fi.Sys().(*syscall.Stat_t).Dev
404 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
408 err := syscall.Statfs(volume, &fs)
410 log.Printf("GetVolumeStatus: statfs: %s\n", err)
413 // These calculations match the way df calculates disk usage:
414 // "free" space is measured by fs.Bavail, but "used" space
415 // uses fs.Blocks - fs.Bfree.
416 free := fs.Bavail * uint64(fs.Bsize)
417 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
418 return &VolumeStatus{volume, devnum, free, used}
// GetBlock looks for the block named hash on each volume in
// KeepVM.Volumes(), in order, and checks the MD5 of whatever it reads.
// On a checksum mismatch it returns the mismatching data together with
// CorruptError, so callers must not use the returned bytes when err is
// non-nil. If no volume yields a readable copy, it returns NotFoundError.
421 func GetBlock(hash string) ([]byte, error) {
422 // Attempt to read the requested hash from a keep volume.
423 for _, vol := range KeepVM.Volumes() {
424 if buf, err := vol.Get(hash); err != nil {
425 // IsNotExist is an expected error and may be ignored.
426 // (If all volumes report IsNotExist, we return a NotFoundError)
427 // A CorruptError should be returned immediately.
428 // Any other errors should be logged but we continue trying to
// read the block from the remaining volumes.
431 case os.IsNotExist(err):
434 log.Printf("GetBlock: reading %s: %s\n", hash, err)
437 // Double check the file checksum.
439 filehash := fmt.Sprintf("%x", md5.Sum(buf))
440 if filehash != hash {
441 // TODO(twp): this condition probably represents a bad disk and
442 // should raise major alarm bells for an administrator: e.g.
443 // they should be sent directly to an event manager at high
444 // priority or logged as urgent problems.
446 log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
// Return at the first mismatch rather than trying the other volumes.
448 return buf, CorruptError
455 log.Printf("%s: not found on any volumes, giving up\n", hash)
456 return nil, NotFoundError
459 /* PutBlock(block, hash)
460 Stores the BLOCK (identified by the content id HASH) in Keep.
462 The MD5 checksum of the block must be identical to the content id HASH.
463 If not, an error is returned.
465 PutBlock stores the BLOCK on the volume returned by KeepVM.Choose();
if that write fails, it tries each volume in KeepVM.Volumes() in turn.
466 A failure code is returned to the user only if all volumes fail.
468 On success, PutBlock returns nil.
469 On failure, it returns a KeepError with one of the following codes:
472 A different block with the same hash already exists on this
475 The MD5 hash of the BLOCK does not match the argument HASH.
477 There was not enough space left in any Keep volume to store
480 The object could not be stored for some other reason (e.g.
481 all writes failed). The text of the error message should
482 provide as much detail as possible.
485 func PutBlock(block []byte, hash string) error {
486 // Check that BLOCK's checksum matches HASH.
487 blockhash := fmt.Sprintf("%x", md5.Sum(block))
488 if blockhash != hash {
489 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
493 // If we already have a block on disk under this identifier, return
494 // success (but check for MD5 collisions).
495 // The only errors that GetBlock can return are CorruptError and NotFoundError.
496 // In either case, we want to write our new (good) block to disk, so there is
497 // nothing special to do if err != nil.
// NOTE(review): bytes.Equal(block, oldblock) is the idiomatic form of the
// bytes.Compare(...) == 0 test below.
498 if oldblock, err := GetBlock(hash); err == nil {
499 if bytes.Compare(block, oldblock) == 0 {
502 return CollisionError
506 // Choose a Keep volume to write to.
507 // If this volume fails, try all of the volumes in order.
508 vol := KeepVM.Choose()
509 if err := vol.Put(hash, block); err == nil {
510 return nil // success!
// NOTE(review): this loop does not skip the volume already tried above, so
// that volume is written to a second time.
513 for _, vol := range KeepVM.Volumes() {
514 err := vol.Put(hash, block)
516 return nil // success!
// This is an identity comparison: a volume must return the FullError value
// itself (not an equivalent error) to be treated as full.
518 if err != FullError {
519 // The volume is not full but the write did not succeed.
520 // Report the error and continue trying.
522 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
527 log.Printf("all Keep volumes full")
530 log.Printf("all Keep volumes failed")
537 // Reads bytes repeatedly from an io.Reader until either
538 // encountering EOF, or the maxbytes byte limit has been reached.
539 // Returns a byte slice of the bytes that were read.
541 // If the reader contains more than maxbytes, returns a nil slice
// and ReadErrorTooLong. Up to maxbytes+1 bytes are buffered in memory.
544 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
545 // Attempt to read one more byte than maxbytes.
// Getting that extra byte is how oversized input is detected without
// consuming the rest of the stream.
546 lr := io.LimitReader(r, int64(maxbytes+1))
547 buf, err := ioutil.ReadAll(lr)
548 if len(buf) > maxbytes {
549 return nil, ReadErrorTooLong
555 // Return true if the specified string is a valid Keep locator.
556 // When Keep is extended to support hash types other than MD5,
557 // this should be updated to cover those as well.
// Currently "valid" means exactly 32 lowercase hex digits, with no size
// hint or signature suffix (the pattern is anchored at both ends).
// NOTE(review): regexp.MatchString compiles the pattern on every call; a
// package-level regexp.MustCompile would avoid that. Its error can only
// report an invalid pattern, which this constant pattern is not.
559 func IsValidLocator(loc string) bool {
560 match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
564 log.Printf("IsValidLocator: %s\n", err)
568 // GetApiToken returns the OAuth token from the Authorization
569 // header of a HTTP request, or an empty string if no matching
// "OAuth <token>" value is found. Only the first Authorization value is
// examined, and the "OAuth " prefix match is case-sensitive.
571 func GetApiToken(req *http.Request) string {
572 if auth, ok := req.Header["Authorization"]; ok {
573 if strings.HasPrefix(auth[0], "OAuth ") {
580 // IsExpired returns true if the given Unix timestamp (expressed as a
581 // hexadecimal string) is in the past.
// The timestamp is parsed with strconv.ParseInt(..., 16, 0), so it must
// fit in an int. A parse failure is logged.
// NOTE(review): confirm that unparseable timestamps are treated as expired.
582 func IsExpired(timestamp_hex string) bool {
583 ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
585 log.Printf("IsExpired: %s\n", err)
588 return time.Unix(ts, 0).Before(time.Now())