8 "github.com/gorilla/mux"
18 // Default TCP port on which to listen for requests.
19 const DEFAULT_PORT = 25107
21 // A Keep "block" is 64MB.
22 const BLOCKSIZE = 64 * 1024 * 1024
24 // A Keep volume must have at least MIN_FREE_KILOBYTES available
25 // in order to permit writes.
26 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
28 var PROC_MOUNTS = "/proc/mounts"
30 var KeepVolumes []string
32 type KeepError struct {
37 func (e *KeepError) Error() string {
38 return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
42 // Look for local keep volumes.
43 KeepVolumes = FindKeepVolumes()
44 if len(KeepVolumes) == 0 {
45 log.Fatal("could not find any keep volumes")
47 for _, v := range KeepVolumes {
48 log.Println("keep volume:", v)
51 // Set up REST handlers.
53 // Start with a router that will route each URL path to an
54 // appropriate handler.
56 rest := mux.NewRouter()
57 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
58 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
60 // Tell the built-in HTTP server to direct all requests to the REST
62 http.Handle("/", rest)
64 // Start listening for requests.
65 port := fmt.Sprintf(":%d", DEFAULT_PORT)
66 http.ListenAndServe(port, nil)
70 // Returns a list of Keep volumes mounted on this system.
72 // A Keep volume is a normal or tmpfs volume with a /keep
73 // directory at the top level of the mount point.
75 func FindKeepVolumes() []string {
76 vols := make([]string, 0)
78 if f, err := os.Open(PROC_MOUNTS); err != nil {
79 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
81 scanner := bufio.NewScanner(f)
83 args := strings.Fields(scanner.Text())
84 dev, mount := args[0], args[1]
85 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
86 keep := mount + "/keep"
87 if st, err := os.Stat(keep); err == nil && st.IsDir() {
88 vols = append(vols, keep)
92 if err := scanner.Err(); err != nil {
99 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
100 hash := mux.Vars(req)["hash"]
102 block, err := GetBlock(hash)
104 http.Error(w, err.Error(), 404)
108 _, err = w.Write(block)
110 log.Printf("GetBlockHandler: writing response: %s", err)
116 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
117 hash := mux.Vars(req)["hash"]
119 // Read the block data to be stored.
120 // TODO(twp): decide what to do when the input stream contains
121 // more than BLOCKSIZE bytes.
123 buf := make([]byte, BLOCKSIZE)
124 if nread, err := req.Body.Read(buf); err == nil {
125 if err := PutBlock(buf[:nread], hash); err == nil {
126 w.WriteHeader(http.StatusOK)
128 ke := err.(*KeepError)
129 http.Error(w, ke.Error(), ke.HTTPCode)
132 log.Println("error reading request: ", err)
133 http.Error(w, err.Error(), 500)
137 func GetBlock(hash string) ([]byte, error) {
138 var buf = make([]byte, BLOCKSIZE)
140 // Attempt to read the requested hash from a keep volume.
141 for _, vol := range KeepVolumes {
146 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
148 f, err = os.Open(blockFilename)
150 if !os.IsNotExist(err) {
151 // A block is stored on only one Keep disk,
152 // so os.IsNotExist is expected. Report any other errors.
153 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
158 nread, err = f.Read(buf)
160 log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
164 // Double check the file checksum.
166 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
167 if filehash != hash {
168 // TODO(twp): this condition probably represents a bad disk and
169 // should raise major alarm bells for an administrator: e.g.
170 // they should be sent directly to an event manager at high
171 // priority or logged as urgent problems.
173 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
174 vol, blockFilename, filehash)
179 return buf[:nread], nil
182 log.Printf("%s: not found on any volumes, giving up\n", hash)
183 return buf, &KeepError{404, errors.New("not found: " + hash)}
186 /* PutBlock(block, hash)
187 Stores the BLOCK (identified by the content id HASH) in Keep.
189 The MD5 checksum of the block must be identical to the content id HASH.
190 If not, an error is returned.
192 PutBlock stores the BLOCK on the first Keep volume with free space.
193 A failure code is returned to the user only if all volumes fail.
195 On success, PutBlock returns nil.
196 On failure, it returns a KeepError with one of the following codes:
199 -- The MD5 hash of the BLOCK does not match the argument HASH.
201 -- There was not enough space left in any Keep volume to store
204 -- The object could not be stored for some other reason (e.g.
205 all writes failed). The text of the error message should
206 provide as much detail as possible.
209 func PutBlock(block []byte, hash string) error {
210 // Check that BLOCK's checksum matches HASH.
211 blockhash := fmt.Sprintf("%x", md5.Sum(block))
212 if blockhash != hash {
213 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
214 return &KeepError{401, errors.New("MD5Fail")}
218 for _, vol := range KeepVolumes {
223 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
224 if err := os.MkdirAll(blockDir, 0755); err != nil {
225 log.Printf("%s: could not create directory %s: %s",
230 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
231 f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
233 // if the block already exists, just return success.
234 // TODO(twp): should we check here whether the file on disk
235 // matches the file we were asked to store?
239 // Open failed for some other reason.
240 log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
245 if _, err := f.Write(block); err == nil {
249 log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
255 log.Printf("all Keep volumes full")
256 return &KeepError{503, errors.New("Full")}
258 log.Printf("all Keep volumes failed")
259 return &KeepError{500, errors.New("Fail")}
263 func IsFull(volume string) (isFull bool) {
264 fullSymlink := volume + "/full"
266 // Check if the volume has been marked as full in the last hour.
267 if link, err := os.Readlink(fullSymlink); err == nil {
268 if ts, err := strconv.Atoi(link); err == nil {
269 fulltime := time.Unix(int64(ts), 0)
270 if time.Since(fulltime).Hours() < 1.0 {
276 if avail, err := FreeDiskSpace(volume); err == nil {
277 isFull = avail < MIN_FREE_KILOBYTES
279 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
283 // If the volume is full, timestamp it.
285 now := fmt.Sprintf("%d", time.Now().Unix())
286 os.Symlink(now, fullSymlink)
291 // FreeDiskSpace(volume)
292 // Returns the amount of available disk space on VOLUME,
293 // as a number of 1k blocks.
295 func FreeDiskSpace(volume string) (free int, err error) {
296 // Run df to find out how much disk space is left.
297 cmd := exec.Command("df", "--block-size=1k", volume)
298 stdout, perr := cmd.StdoutPipe()
302 scanner := bufio.NewScanner(stdout)
303 if perr := cmd.Start(); err != nil {
307 scanner.Scan() // skip header line of df output
310 f := strings.Fields(scanner.Text())
311 if avail, err := strconv.Atoi(f[3]); err == nil {
314 err = errors.New("bad df format: " + scanner.Text())
317 // Flush the df output and shut it down cleanly.