9 "github.com/gorilla/mux"
const (
	// DEFAULT_PORT is the TCP port on which the server listens for requests.
	DEFAULT_PORT = 25107

	// BLOCKSIZE is the size of a Keep "block": 64 MiB.
	BLOCKSIZE = 64 << 20

	// MIN_FREE_KILOBYTES is the amount of free space, in 1K blocks, that a
	// Keep volume must have available in order to permit writes. It equals
	// one full block.
	MIN_FREE_KILOBYTES = BLOCKSIZE >> 10
)

var (
	// PROC_MOUNTS is the mount table scanned for Keep volumes. It is a
	// variable rather than a constant, so it can be overridden.
	PROC_MOUNTS = "/proc/mounts"

	// KeepVolumes holds the Keep volume directories this server uses.
	KeepVolumes []string
)
34 type KeepError struct {
48 func (e *KeepError) Error() string {
49 return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
53 // Look for local keep volumes.
54 KeepVolumes = FindKeepVolumes()
55 if len(KeepVolumes) == 0 {
56 log.Fatal("could not find any keep volumes")
58 for _, v := range KeepVolumes {
59 log.Println("keep volume:", v)
62 // Set up REST handlers.
64 // Start with a router that will route each URL path to an
65 // appropriate handler.
67 rest := mux.NewRouter()
68 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
69 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
71 // Tell the built-in HTTP server to direct all requests to the REST
73 http.Handle("/", rest)
75 // Start listening for requests.
76 port := fmt.Sprintf(":%d", DEFAULT_PORT)
77 http.ListenAndServe(port, nil)
81 // Returns a list of Keep volumes mounted on this system.
83 // A Keep volume is a normal or tmpfs volume with a /keep
84 // directory at the top level of the mount point.
86 func FindKeepVolumes() []string {
87 vols := make([]string, 0)
89 if f, err := os.Open(PROC_MOUNTS); err != nil {
90 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
92 scanner := bufio.NewScanner(f)
94 args := strings.Fields(scanner.Text())
95 dev, mount := args[0], args[1]
96 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
97 keep := mount + "/keep"
98 if st, err := os.Stat(keep); err == nil && st.IsDir() {
99 vols = append(vols, keep)
103 if err := scanner.Err(); err != nil {
110 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
111 hash := mux.Vars(req)["hash"]
113 block, err := GetBlock(hash)
115 http.Error(w, err.Error(), 404)
119 _, err = w.Write(block)
121 log.Printf("GetBlockHandler: writing response: %s", err)
127 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
128 hash := mux.Vars(req)["hash"]
130 // Read the block data to be stored.
131 // TODO(twp): decide what to do when the input stream contains
132 // more than BLOCKSIZE bytes.
134 buf := make([]byte, BLOCKSIZE)
135 if nread, err := req.Body.Read(buf); err == nil {
136 if err := PutBlock(buf[:nread], hash); err == nil {
137 w.WriteHeader(http.StatusOK)
139 ke := err.(*KeepError)
140 http.Error(w, ke.Error(), ke.HTTPCode)
143 log.Println("error reading request: ", err)
144 http.Error(w, err.Error(), 500)
148 func GetBlock(hash string) ([]byte, error) {
149 var buf = make([]byte, BLOCKSIZE)
151 // Attempt to read the requested hash from a keep volume.
152 for _, vol := range KeepVolumes {
157 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
159 f, err = os.Open(blockFilename)
161 if !os.IsNotExist(err) {
162 // A block is stored on only one Keep disk,
163 // so os.IsNotExist is expected. Report any other errors.
164 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
169 nread, err = f.Read(buf)
171 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
175 // Double check the file checksum.
177 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
178 if filehash != hash {
179 // TODO(twp): this condition probably represents a bad disk and
180 // should raise major alarm bells for an administrator: e.g.
181 // they should be sent directly to an event manager at high
182 // priority or logged as urgent problems.
184 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
185 vol, blockFilename, filehash)
186 return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
190 return buf[:nread], nil
193 log.Printf("%s: not found on any volumes, giving up\n", hash)
194 return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
197 /* PutBlock(block, hash)
198 Stores the BLOCK (identified by the content id HASH) in Keep.
200 The MD5 checksum of the block must be identical to the content id HASH.
201 If not, an error is returned.
203 PutBlock stores the BLOCK on the first Keep volume with free space.
204 A failure code is returned to the user only if all volumes fail.
206 On success, PutBlock returns nil.
207 On failure, it returns a KeepError with one of the following codes:
210 A different block with the same hash already exists on this
213 The MD5 hash of the BLOCK does not match the argument HASH.
215 There was not enough space left in any Keep volume to store
218 The object could not be stored for some other reason (e.g.
219 all writes failed). The text of the error message should
220 provide as much detail as possible.
223 func PutBlock(block []byte, hash string) error {
224 // Check that BLOCK's checksum matches HASH.
225 blockhash := fmt.Sprintf("%x", md5.Sum(block))
226 if blockhash != hash {
227 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
228 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
231 // If we already have a block on disk under this identifier, return
232 // success (but check for MD5 collisions).
233 // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
234 // In either case, we want to write our new (good) block to disk, so there is
235 // nothing special to do if err != nil.
236 if oldblock, err := GetBlock(hash); err == nil {
237 if bytes.Compare(block, oldblock) == 0 {
240 return &KeepError{ErrCollision, errors.New("Collision")}
244 // Store the block on the first available Keep volume.
246 for _, vol := range KeepVolumes {
251 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
252 if err := os.MkdirAll(blockDir, 0755); err != nil {
253 log.Printf("%s: could not create directory %s: %s",
258 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
260 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
263 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
265 if _, err := tmpfile.Write(block); err != nil {
266 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
269 if err := tmpfile.Close(); err != nil {
270 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
271 os.Remove(tmpfile.Name())
274 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
275 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
276 os.Remove(tmpfile.Name())
283 log.Printf("all Keep volumes full")
284 return &KeepError{ErrFull, errors.New("Full")}
286 log.Printf("all Keep volumes failed")
287 return &KeepError{ErrOther, errors.New("Fail")}
291 func IsFull(volume string) (isFull bool) {
292 fullSymlink := volume + "/full"
294 // Check if the volume has been marked as full in the last hour.
295 if link, err := os.Readlink(fullSymlink); err == nil {
296 if ts, err := strconv.Atoi(link); err == nil {
297 fulltime := time.Unix(int64(ts), 0)
298 if time.Since(fulltime).Hours() < 1.0 {
304 if avail, err := FreeDiskSpace(volume); err == nil {
305 isFull = avail < MIN_FREE_KILOBYTES
307 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
311 // If the volume is full, timestamp it.
313 now := fmt.Sprintf("%d", time.Now().Unix())
314 os.Symlink(now, fullSymlink)
// FreeDiskSpace returns the disk space available to unprivileged users on
// the filesystem containing volume, measured in 1K blocks. If statfs
// fails, it returns 0 and the error.
func FreeDiskSpace(volume string) (free uint64, err error) {
	var stat syscall.Statfs_t
	if err = syscall.Statfs(volume, &stat); err != nil {
		return 0, err
	}
	// Bavail counts blocks of Bsize bytes, which are not necessarily 1K,
	// so convert to bytes before dividing down to kilobytes.
	blockBytes := uint64(stat.Bsize)
	return stat.Bavail * blockBytes / 1024, nil
}