11 "github.com/gorilla/mux"
// ======================
// Configuration settings
//
// TODO(twp): make all of these configurable via command line flags
// and/or configuration file settings.

const (
	// DEFAULT_ADDR is the TCP address on which to listen for requests
	// when -listen is not given.
	DEFAULT_ADDR = ":25107"

	// BLOCKSIZE is the size of a Keep "block": 64 MiB. It is also the
	// largest request body PutBlockHandler will accept.
	BLOCKSIZE = 64 * 1024 * 1024

	// MIN_FREE_KILOBYTES is how much free space (one block's worth,
	// expressed in KiB) a Keep volume must have available in order to
	// permit writes.
	MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
)

// PROC_MOUNTS is the mount table that FindKeepVolumes scans for volumes
// with a top-level /keep directory. It is a variable rather than a
// constant, presumably so it can be pointed elsewhere (e.g. in tests).
var PROC_MOUNTS = "/proc/mounts"
40 // KeepVolumes is a slice of volumes on which blocks can be stored.
41 var KeepVolumes []Volume
// KeepError is an error that also carries the HTTP status code with
// which it should be reported to a client.
type KeepError struct {
	HTTPCode int
	ErrMsg   string
}

// Errors returned by the block storage functions. Each one is paired
// with the HTTP status code the request handlers send for it.
var (
	CollisionError = &KeepError{HTTPCode: 400, ErrMsg: "Collision"}
	MD5Error       = &KeepError{HTTPCode: 401, ErrMsg: "MD5 Failure"}
	CorruptError   = &KeepError{HTTPCode: 402, ErrMsg: "Corruption"}
	NotFoundError  = &KeepError{HTTPCode: 404, ErrMsg: "Not Found"}
	GenericError   = &KeepError{HTTPCode: 500, ErrMsg: "Fail"}
	FullError      = &KeepError{HTTPCode: 503, ErrMsg: "Full"}
	TooLongError   = &KeepError{HTTPCode: 504, ErrMsg: "Too Long"}
)

// Error implements the error interface by returning the error's
// message text.
func (e *KeepError) Error() string {
	return e.ErrMsg
}

// ReadErrorTooLong is returned by ReadAtMost when the reader holds more
// than the permitted number of bytes (BLOCKSIZE for PUT requests).
var ReadErrorTooLong = errors.New("Too long")
70 // Parse command-line flags:
72 // -listen=ipaddr:port
73 // Interface on which to listen for requests. Use :port without
74 // an ipaddr to listen on all network interfaces.
76 // -listen=127.0.0.1:4949
77 // -listen=10.0.1.24:8000
78 // -listen=:25107 (to listen to port 25107 on all interfaces)
81 // A comma-separated list of directories to use as Keep volumes.
83 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
85 // If -volumes is empty or is not present, Keep will select volumes
86 // by looking at currently mounted filesystems for /keep top-level
89 var listen, volumearg string
90 flag.StringVar(&listen, "listen", DEFAULT_ADDR,
91 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
92 flag.StringVar(&volumearg, "volumes", "",
93 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
96 // Look for local keep volumes.
99 // TODO(twp): decide whether this is desirable default behavior.
100 // In production we may want to require the admin to specify
101 // Keep volumes explicitly.
102 keepvols = FindKeepVolumes()
104 keepvols = strings.Split(volumearg, ",")
107 // Check that the specified volumes actually exist.
108 KeepVolumes = []Volume(nil)
109 for _, v := range keepvols {
110 if _, err := os.Stat(v); err == nil {
111 log.Println("adding Keep volume:", v)
112 KeepVolumes = append(KeepVolumes, &UnixVolume{v})
114 log.Printf("bad Keep volume: %s\n", err)
118 if len(KeepVolumes) == 0 {
119 log.Fatal("could not find any keep volumes")
122 // Set up REST handlers.
124 // Start with a router that will route each URL path to an
125 // appropriate handler.
127 rest := mux.NewRouter()
128 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
129 rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
130 rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
131 rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
132 rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
134 // Tell the built-in HTTP server to direct all requests to the REST
136 http.Handle("/", rest)
138 // Start listening for requests.
139 http.ListenAndServe(listen, nil)
143 // Returns a list of Keep volumes mounted on this system.
145 // A Keep volume is a normal or tmpfs volume with a /keep
146 // directory at the top level of the mount point.
148 func FindKeepVolumes() []string {
149 vols := make([]string, 0)
151 if f, err := os.Open(PROC_MOUNTS); err != nil {
152 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
154 scanner := bufio.NewScanner(f)
156 args := strings.Fields(scanner.Text())
157 dev, mount := args[0], args[1]
158 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
159 keep := mount + "/keep"
160 if st, err := os.Stat(keep); err == nil && st.IsDir() {
161 vols = append(vols, keep)
165 if err := scanner.Err(); err != nil {
172 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
173 hash := mux.Vars(req)["hash"]
175 block, err := GetBlock(hash)
177 http.Error(w, err.Error(), 404)
181 _, err = w.Write(block)
183 log.Printf("GetBlockHandler: writing response: %s", err)
189 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
190 hash := mux.Vars(req)["hash"]
192 // Read the block data to be stored.
193 // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
195 // Note: because req.Body is a buffered Reader, each Read() call will
196 // collect only the data in the network buffer (typically 16384 bytes),
197 // even if it is passed a much larger slice.
199 // Instead, call ReadAtMost to read data from the socket
200 // repeatedly until either EOF or BLOCKSIZE bytes have been read.
202 if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
203 if err := PutBlock(buf, hash); err == nil {
204 w.WriteHeader(http.StatusOK)
206 ke := err.(*KeepError)
207 http.Error(w, ke.Error(), ke.HTTPCode)
210 log.Println("error reading request: ", err)
211 errmsg := err.Error()
212 if err == ReadErrorTooLong {
213 // Use a more descriptive error message that includes
214 // the maximum request size.
215 errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
217 http.Error(w, errmsg, 500)
222 // A HandleFunc to address /index and /index/{prefix} requests.
224 func IndexHandler(w http.ResponseWriter, req *http.Request) {
225 prefix := mux.Vars(req)["prefix"]
228 for _, vol := range KeepVolumes {
229 index = index + vol.Index(prefix)
231 w.Write([]byte(index))
// VolumeStatus and NodeStatus define the JSON document that
// StatusHandler serves in response to /status.json requests.
//
// The document contains:
//
//	volumes - a list of Keep volumes currently in use by this server;
//	  each volume is an object with the following fields:
//	    * mount_point
//	    * device_num (an integer identifying the underlying filesystem)
//	    * bytes_free
//	    * bytes_used
type (
	// VolumeStatus describes one Keep volume.
	VolumeStatus struct {
		MountPoint string `json:"mount_point"`
		DeviceNum  uint64 `json:"device_num"`
		BytesFree  uint64 `json:"bytes_free"`
		BytesUsed  uint64 `json:"bytes_used"`
	}

	// NodeStatus is the top-level status.json object.
	NodeStatus struct {
		Volumes []*VolumeStatus `json:"volumes"`
	}
)
257 func StatusHandler(w http.ResponseWriter, req *http.Request) {
258 st := GetNodeStatus()
259 if jstat, err := json.Marshal(st); err == nil {
262 log.Printf("json.Marshal: %s\n", err)
263 log.Printf("NodeStatus = %v\n", st)
264 http.Error(w, err.Error(), 500)
269 // Returns a NodeStatus struct describing this Keep
270 // node's current status.
272 func GetNodeStatus() *NodeStatus {
273 st := new(NodeStatus)
275 st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
276 for i, vol := range KeepVolumes {
277 st.Volumes[i] = vol.Status()
283 // Returns a VolumeStatus describing the requested volume.
285 func GetVolumeStatus(volume string) *VolumeStatus {
286 var fs syscall.Statfs_t
289 if fi, err := os.Stat(volume); err == nil {
290 devnum = fi.Sys().(*syscall.Stat_t).Dev
292 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
296 err := syscall.Statfs(volume, &fs)
298 log.Printf("GetVolumeStatus: statfs: %s\n", err)
301 // These calculations match the way df calculates disk usage:
302 // "free" space is measured by fs.Bavail, but "used" space
303 // uses fs.Blocks - fs.Bfree.
304 free := fs.Bavail * uint64(fs.Bsize)
305 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
306 return &VolumeStatus{volume, devnum, free, used}
309 func GetBlock(hash string) ([]byte, error) {
310 // Attempt to read the requested hash from a keep volume.
311 for _, vol := range KeepVolumes {
312 if buf, err := vol.Read(hash); err != nil {
313 // IsNotExist is an expected error and may be ignored.
314 // (If all volumes report IsNotExist, we return a NotFoundError)
315 // A CorruptError should be returned immediately.
316 // Any other errors should be logged but we continue trying to
319 case os.IsNotExist(err):
321 case err == CorruptError:
324 log.Printf("GetBlock: reading %s: %s\n", hash, err)
332 log.Printf("%s: not found on any volumes, giving up\n", hash)
333 return nil, NotFoundError
336 /* PutBlock(block, hash)
337 Stores the BLOCK (identified by the content id HASH) in Keep.
339 The MD5 checksum of the block must be identical to the content id HASH.
340 If not, an error is returned.
342 PutBlock stores the BLOCK on the first Keep volume with free space.
343 A failure code is returned to the user only if all volumes fail.
345 On success, PutBlock returns nil.
346 On failure, it returns a KeepError with one of the following codes:
349 A different block with the same hash already exists on this
352 The MD5 hash of the BLOCK does not match the argument HASH.
354 There was not enough space left in any Keep volume to store
357 The object could not be stored for some other reason (e.g.
358 all writes failed). The text of the error message should
359 provide as much detail as possible.
362 func PutBlock(block []byte, hash string) error {
363 // Check that BLOCK's checksum matches HASH.
364 blockhash := fmt.Sprintf("%x", md5.Sum(block))
365 if blockhash != hash {
366 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
370 // If we already have a block on disk under this identifier, return
371 // success (but check for MD5 collisions).
372 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
373 // In either case, we want to write our new (good) block to disk, so there is
374 // nothing special to do if err != nil.
375 if oldblock, err := GetBlock(hash); err == nil {
376 if bytes.Compare(block, oldblock) == 0 {
379 return CollisionError
383 // Store the block on the first available Keep volume.
385 for _, vol := range KeepVolumes {
386 err := vol.Write(hash, block)
388 return nil // success!
390 if err != FullError {
391 // The volume is not full but the write did not succeed.
392 // Report the error and continue trying.
394 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
399 log.Printf("all Keep volumes full")
402 log.Printf("all Keep volumes failed")
408 // Reads bytes repeatedly from an io.Reader until either
409 // encountering EOF, or the maxbytes byte limit has been reached.
410 // Returns a byte slice of the bytes that were read.
412 // If the reader contains more than maxbytes, returns a nil slice
415 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
416 // Attempt to read one more byte than maxbytes.
417 lr := io.LimitReader(r, int64(maxbytes+1))
418 buf, err := ioutil.ReadAll(lr)
419 if len(buf) > maxbytes {
420 return nil, ReadErrorTooLong
// validLocatorPattern matches a bare MD5 Keep locator: exactly 32
// lowercase hexadecimal digits. It is compiled once at package
// initialization instead of on every IsValidLocator call.
var validLocatorPattern = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator.
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
//
// The error result is always nil, because the pattern is precompiled. It
// is kept so existing callers continue to compile.
func IsValidLocator(loc string) (bool, error) {
	return validLocatorPattern.MatchString(loc), nil
}