11 "github.com/gorilla/mux"
22 // ======================
23 // Configuration settings
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
28 // Default TCP address on which to listen for requests.
29 const DEFAULT_ADDR = ":25107"
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
38 var PROC_MOUNTS = "/proc/mounts"
40 // The Keep VolumeManager maintains a list of available volumes.
41 var KeepVM VolumeManager
// KeepError is an error that carries the HTTP status code a request
// handler should respond with, together with a short message.
type KeepError struct {
	HTTPCode int
	ErrMsg   string
}

// Predefined KeepError values, plus the sentinel returned by ReadAtMost.
var (
	CollisionError = &KeepError{HTTPCode: 400, ErrMsg: "Collision"}
	MD5Error       = &KeepError{HTTPCode: 401, ErrMsg: "MD5 Failure"}
	CorruptError   = &KeepError{HTTPCode: 402, ErrMsg: "Corruption"}
	NotFoundError  = &KeepError{HTTPCode: 404, ErrMsg: "Not Found"}
	GenericError   = &KeepError{HTTPCode: 500, ErrMsg: "Fail"}
	FullError      = &KeepError{HTTPCode: 503, ErrMsg: "Full"}
	TooLongError   = &KeepError{HTTPCode: 504, ErrMsg: "Too Long"}

	// ReadErrorTooLong is returned by ReadAtMost when the reader holds more
	// data than the requested limit (BLOCKSIZE bytes for block uploads).
	ReadErrorTooLong = errors.New("Too long")
)

// Error implements the error interface by returning the message text.
func (e *KeepError) Error() string {
	return e.ErrMsg
}
70 // Parse command-line flags:
72 // -listen=ipaddr:port
73 // Interface on which to listen for requests. Use :port without
74 // an ipaddr to listen on all network interfaces.
76 // -listen=127.0.0.1:4949
77 // -listen=10.0.1.24:8000
78 // -listen=:25107 (to listen to port 25107 on all interfaces)
81 // A comma-separated list of directories to use as Keep volumes.
83 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
85 // If -volumes is empty or is not present, Keep will select volumes
86 // by looking at currently mounted filesystems for /keep top-level
89 var listen, volumearg string
91 flag.StringVar(&listen, "listen", DEFAULT_ADDR,
92 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
93 flag.StringVar(&volumearg, "volumes", "",
94 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
95 flag.BoolVar(&serialize_io, "serialize", false,
96 "If set, all read and write operations on local Keep volumes will be serialized.")
99 // Look for local keep volumes.
100 var keepvols []string
102 // TODO(twp): decide whether this is desirable default behavior.
103 // In production we may want to require the admin to specify
104 // Keep volumes explicitly.
105 keepvols = FindKeepVolumes()
107 keepvols = strings.Split(volumearg, ",")
110 // Check that the specified volumes actually exist.
111 var goodvols []Volume = nil
112 for _, v := range keepvols {
113 if _, err := os.Stat(v); err == nil {
114 log.Println("adding Keep volume:", v)
115 newvol := MakeUnixVolume(v, serialize_io)
116 goodvols = append(goodvols, &newvol)
118 log.Printf("bad Keep volume: %s\n", err)
122 if len(goodvols) == 0 {
123 log.Fatal("could not find any keep volumes")
126 // Start a round-robin VolumeManager with the volumes we have found.
127 KeepVM = MakeRRVolumeManager(goodvols)
129 // Set up REST handlers.
131 // Start with a router that will route each URL path to an
132 // appropriate handler.
134 rest := mux.NewRouter()
135 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
136 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
137 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
138 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
139 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
141 // Tell the built-in HTTP server to direct all requests to the REST
143 http.Handle("/", rest)
145 // Start listening for requests.
146 http.ListenAndServe(listen, nil)
150 // Returns a list of Keep volumes mounted on this system.
152 // A Keep volume is a normal or tmpfs volume with a /keep
153 // directory at the top level of the mount point.
155 func FindKeepVolumes() []string {
156 vols := make([]string, 0)
158 if f, err := os.Open(PROC_MOUNTS); err != nil {
159 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
161 scanner := bufio.NewScanner(f)
163 args := strings.Fields(scanner.Text())
164 dev, mount := args[0], args[1]
165 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
166 keep := mount + "/keep"
167 if st, err := os.Stat(keep); err == nil && st.IsDir() {
168 vols = append(vols, keep)
172 if err := scanner.Err(); err != nil {
179 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
180 hash := mux.Vars(req)["hash"]
182 block, err := GetBlock(hash)
184 http.Error(w, err.Error(), 404)
188 _, err = w.Write(block)
190 log.Printf("GetBlockHandler: writing response: %s", err)
196 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
197 hash := mux.Vars(req)["hash"]
199 // Read the block data to be stored.
200 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
202 // Note: because req.Body is a buffered Reader, each Read() call will
203 // collect only the data in the network buffer (typically 16384 bytes),
204 // even if it is passed a much larger slice.
206 // Instead, call ReadAtMost to read data from the socket
207 // repeatedly until either EOF or BLOCKSIZE bytes have been read.
209 if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
210 if err := PutBlock(buf, hash); err == nil {
211 w.WriteHeader(http.StatusOK)
213 ke := err.(*KeepError)
214 http.Error(w, ke.Error(), ke.HTTPCode)
217 log.Println("error reading request: ", err)
218 errmsg := err.Error()
219 if err == ReadErrorTooLong {
220 // Use a more descriptive error message that includes
221 // the maximum request size.
222 errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
224 http.Error(w, errmsg, 500)
229 // A HandleFunc to address /index and /index/{prefix} requests.
231 func IndexHandler(w http.ResponseWriter, req *http.Request) {
232 prefix := mux.Vars(req)["prefix"]
235 for _, vol := range KeepVM.Volumes() {
236 index = index + vol.Index(prefix)
238 w.Write([]byte(index))
// The /status.json endpoint describes this node's current status as a JSON
// object built from the types below. Its "volumes" field lists the Keep
// volumes currently in use by this server; each volume is an object with
// the following fields:
//
//   - mount_point
//   - device_num (an integer identifying the underlying filesystem)
//   - bytes_free
//   - bytes_used
type (
	// VolumeStatus describes a single Keep volume. Field order matters:
	// GetVolumeStatus builds values with a positional composite literal.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level object that StatusHandler encodes.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
264 func StatusHandler(w http.ResponseWriter, req *http.Request) {
265 st := GetNodeStatus()
266 if jstat, err := json.Marshal(st); err == nil {
269 log.Printf("json.Marshal: %s\n", err)
270 log.Printf("NodeStatus = %v\n", st)
271 http.Error(w, err.Error(), 500)
276 // Returns a NodeStatus struct describing this Keep
277 // node's current status.
279 func GetNodeStatus() *NodeStatus {
280 st := new(NodeStatus)
282 st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
283 for i, vol := range KeepVM.Volumes() {
284 st.Volumes[i] = vol.Status()
290 // Returns a VolumeStatus describing the requested volume.
292 func GetVolumeStatus(volume string) *VolumeStatus {
293 var fs syscall.Statfs_t
296 if fi, err := os.Stat(volume); err == nil {
297 devnum = fi.Sys().(*syscall.Stat_t).Dev
299 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
303 err := syscall.Statfs(volume, &fs)
305 log.Printf("GetVolumeStatus: statfs: %s\n", err)
308 // These calculations match the way df calculates disk usage:
309 // "free" space is measured by fs.Bavail, but "used" space
310 // uses fs.Blocks - fs.Bfree.
311 free := fs.Bavail * uint64(fs.Bsize)
312 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
313 return &VolumeStatus{volume, devnum, free, used}
316 func GetBlock(hash string) ([]byte, error) {
317 // Attempt to read the requested hash from a keep volume.
318 for _, vol := range KeepVM.Volumes() {
319 if buf, err := vol.Get(hash); err != nil {
320 // IsNotExist is an expected error and may be ignored.
321 // (If all volumes report IsNotExist, we return a NotFoundError)
322 // A CorruptError should be returned immediately.
323 // Any other errors should be logged but we continue trying to
326 case os.IsNotExist(err):
329 log.Printf("GetBlock: reading %s: %s\n", hash, err)
332 // Double check the file checksum.
334 filehash := fmt.Sprintf("%x", md5.Sum(buf))
335 if filehash != hash {
336 // TODO(twp): this condition probably represents a bad disk and
337 // should raise major alarm bells for an administrator: e.g.
338 // they should be sent directly to an event manager at high
339 // priority or logged as urgent problems.
341 log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
343 return buf, CorruptError
350 log.Printf("%s: not found on any volumes, giving up\n", hash)
351 return nil, NotFoundError
354 /* PutBlock(block, hash)
355 Stores the BLOCK (identified by the content id HASH) in Keep.
357 The MD5 checksum of the block must be identical to the content id HASH.
358 If not, an error is returned.
360 PutBlock stores the BLOCK on the first Keep volume with free space.
361 A failure code is returned to the user only if all volumes fail.
363 On success, PutBlock returns nil.
364 On failure, it returns a KeepError with one of the following codes:
367 A different block with the same hash already exists on this
370 The MD5 hash of the BLOCK does not match the argument HASH.
372 There was not enough space left in any Keep volume to store
375 The object could not be stored for some other reason (e.g.
376 all writes failed). The text of the error message should
377 provide as much detail as possible.
380 func PutBlock(block []byte, hash string) error {
381 // Check that BLOCK's checksum matches HASH.
382 blockhash := fmt.Sprintf("%x", md5.Sum(block))
383 if blockhash != hash {
384 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
388 // If we already have a block on disk under this identifier, return
389 // success (but check for MD5 collisions).
390 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
391 // In either case, we want to write our new (good) block to disk, so there is
392 // nothing special to do if err != nil.
393 if oldblock, err := GetBlock(hash); err == nil {
394 if bytes.Compare(block, oldblock) == 0 {
397 return CollisionError
401 // Choose a Keep volume to write to.
402 // If this volume fails, try all of the volumes in order.
403 vol := KeepVM.Choose()
404 if err := vol.Put(hash, block); err == nil {
405 return nil // success!
408 for _, vol := range KeepVM.Volumes() {
409 err := vol.Put(hash, block)
411 return nil // success!
413 if err != FullError {
414 // The volume is not full but the write did not succeed.
415 // Report the error and continue trying.
417 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
422 log.Printf("all Keep volumes full")
425 log.Printf("all Keep volumes failed")
432 // Reads bytes repeatedly from an io.Reader until either
433 // encountering EOF, or the maxbytes byte limit has been reached.
434 // Returns a byte slice of the bytes that were read.
436 // If the reader contains more than maxbytes, returns a nil slice
439 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
440 // Attempt to read one more byte than maxbytes.
441 lr := io.LimitReader(r, int64(maxbytes+1))
442 buf, err := ioutil.ReadAll(lr)
443 if len(buf) > maxbytes {
444 return nil, ReadErrorTooLong
450 // Return true if the specified string is a valid Keep locator.
451 // When Keep is extended to support hash types other than MD5,
452 // this should be updated to cover those as well.
454 func IsValidLocator(loc string) bool {
455 match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
459 log.Printf("IsValidLocator: %s\n", err)