10 "github.com/gorilla/mux"
21 // Default TCP address on which to listen for requests.
22 const DEFAULT_ADDR = ":25107"
24 // A Keep "block" is 64MB.
25 const BLOCKSIZE = 64 * 1024 * 1024
27 // A Keep volume must have at least MIN_FREE_KILOBYTES available
28 // in order to permit writes.
29 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
31 var PROC_MOUNTS = "/proc/mounts"
33 var KeepVolumes []string
35 type KeepError struct {
49 func (e *KeepError) Error() string {
50 return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
54 // Parse command-line flags:
56 // -listen=ipaddr:port
57 // Interface on which to listen for requests. Use :port without
58 // an ipaddr to listen on all network interfaces.
60 // -listen=127.0.0.1:4949
61 // -listen=10.0.1.24:8000
62 // -listen=:25107 (to listen to port 25107 on all interfaces)
65 // A comma-separated list of directories to use as Keep volumes.
67 // -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
69 // If -volumes is empty or is not present, Keep will select volumes
70 // by looking at currently mounted filesystems for /keep top-level
73 var listen, keepvols string
74 flag.StringVar(&listen, "listen", DEFAULT_ADDR,
75 "interface on which to listen for requests")
76 flag.StringVar(&keepvols, "volumes", "",
77 "comma-separated list of directories to use for Keep volumes")
80 // Look for local keep volumes.
82 KeepVolumes = FindKeepVolumes()
84 KeepVolumes = strings.Split(keepvols, ",")
87 if len(KeepVolumes) == 0 {
88 log.Fatal("could not find any keep volumes")
90 for _, v := range KeepVolumes {
91 log.Println("keep volume:", v)
94 // Set up REST handlers.
96 // Start with a router that will route each URL path to an
97 // appropriate handler.
99 rest := mux.NewRouter()
100 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
101 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
103 // Tell the built-in HTTP server to direct all requests to the REST
105 http.Handle("/", rest)
107 // Start listening for requests.
108 http.ListenAndServe(listen, nil)
112 // Returns a list of Keep volumes mounted on this system.
114 // A Keep volume is a normal or tmpfs volume with a /keep
115 // directory at the top level of the mount point.
117 func FindKeepVolumes() []string {
118 vols := make([]string, 0)
120 if f, err := os.Open(PROC_MOUNTS); err != nil {
121 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
123 scanner := bufio.NewScanner(f)
125 args := strings.Fields(scanner.Text())
126 dev, mount := args[0], args[1]
127 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
128 keep := mount + "/keep"
129 if st, err := os.Stat(keep); err == nil && st.IsDir() {
130 vols = append(vols, keep)
134 if err := scanner.Err(); err != nil {
141 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
142 hash := mux.Vars(req)["hash"]
144 block, err := GetBlock(hash)
146 http.Error(w, err.Error(), 404)
150 _, err = w.Write(block)
152 log.Printf("GetBlockHandler: writing response: %s", err)
158 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
159 hash := mux.Vars(req)["hash"]
161 // Read the block data to be stored.
162 // TODO(twp): decide what to do when the input stream contains
163 // more than BLOCKSIZE bytes.
165 buf := make([]byte, BLOCKSIZE)
166 if nread, err := req.Body.Read(buf); err == nil {
167 if err := PutBlock(buf[:nread], hash); err == nil {
168 w.WriteHeader(http.StatusOK)
170 ke := err.(*KeepError)
171 http.Error(w, ke.Error(), ke.HTTPCode)
174 log.Println("error reading request: ", err)
175 http.Error(w, err.Error(), 500)
179 func GetBlock(hash string) ([]byte, error) {
180 var buf = make([]byte, BLOCKSIZE)
182 // Attempt to read the requested hash from a keep volume.
183 for _, vol := range KeepVolumes {
188 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
190 f, err = os.Open(blockFilename)
192 if !os.IsNotExist(err) {
193 // A block is stored on only one Keep disk,
194 // so os.IsNotExist is expected. Report any other errors.
195 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
200 nread, err = f.Read(buf)
202 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
206 // Double check the file checksum.
208 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
209 if filehash != hash {
210 // TODO(twp): this condition probably represents a bad disk and
211 // should raise major alarm bells for an administrator: e.g.
212 // they should be sent directly to an event manager at high
213 // priority or logged as urgent problems.
215 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
216 vol, blockFilename, filehash)
217 return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
221 return buf[:nread], nil
224 log.Printf("%s: not found on any volumes, giving up\n", hash)
225 return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
228 /* PutBlock(block, hash)
229 Stores the BLOCK (identified by the content id HASH) in Keep.
231 The MD5 checksum of the block must be identical to the content id HASH.
232 If not, an error is returned.
234 PutBlock stores the BLOCK on the first Keep volume with free space.
235 A failure code is returned to the user only if all volumes fail.
237 On success, PutBlock returns nil.
238 On failure, it returns a KeepError with one of the following codes:
241 A different block with the same hash already exists on this
244 The MD5 hash of the BLOCK does not match the argument HASH.
246 There was not enough space left in any Keep volume to store
249 The object could not be stored for some other reason (e.g.
250 all writes failed). The text of the error message should
251 provide as much detail as possible.
254 func PutBlock(block []byte, hash string) error {
255 // Check that BLOCK's checksum matches HASH.
256 blockhash := fmt.Sprintf("%x", md5.Sum(block))
257 if blockhash != hash {
258 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
259 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
262 // If we already have a block on disk under this identifier, return
263 // success (but check for MD5 collisions).
264 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
265 // In either case, we want to write our new (good) block to disk, so there is
266 // nothing special to do if err != nil.
267 if oldblock, err := GetBlock(hash); err == nil {
268 if bytes.Compare(block, oldblock) == 0 {
271 return &KeepError{ErrCollision, errors.New("Collision")}
275 // Store the block on the first available Keep volume.
277 for _, vol := range KeepVolumes {
282 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
283 if err := os.MkdirAll(blockDir, 0755); err != nil {
284 log.Printf("%s: could not create directory %s: %s",
289 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
291 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
294 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
296 if _, err := tmpfile.Write(block); err != nil {
297 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
300 if err := tmpfile.Close(); err != nil {
301 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
302 os.Remove(tmpfile.Name())
305 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
306 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
307 os.Remove(tmpfile.Name())
314 log.Printf("all Keep volumes full")
315 return &KeepError{ErrFull, errors.New("Full")}
317 log.Printf("all Keep volumes failed")
318 return &KeepError{ErrOther, errors.New("Fail")}
322 func IsFull(volume string) (isFull bool) {
323 fullSymlink := volume + "/full"
325 // Check if the volume has been marked as full in the last hour.
326 if link, err := os.Readlink(fullSymlink); err == nil {
327 if ts, err := strconv.Atoi(link); err == nil {
328 fulltime := time.Unix(int64(ts), 0)
329 if time.Since(fulltime).Hours() < 1.0 {
335 if avail, err := FreeDiskSpace(volume); err == nil {
336 isFull = avail < MIN_FREE_KILOBYTES
338 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
342 // If the volume is full, timestamp it.
344 now := fmt.Sprintf("%d", time.Now().Unix())
345 os.Symlink(now, fullSymlink)
350 // FreeDiskSpace(volume)
351 // Returns the amount of available disk space on VOLUME,
352 // as a number of 1k blocks.
354 func FreeDiskSpace(volume string) (free uint64, err error) {
355 var fs syscall.Statfs_t
356 err = syscall.Statfs(volume, &fs)
358 // Statfs output is not guaranteed to measure free
359 // space in terms of 1K blocks.
360 free = fs.Bavail * uint64(fs.Bsize) / 1024