"github.com/gorilla/mux"
"io"
// Server-wide settings.
const (
	// DEFAULT_PORT is the TCP port the HTTP server listens on.
	DEFAULT_PORT = 25107

	// BLOCKSIZE is the size in bytes (64 MiB) of the buffer used when
	// reading a block from a request or from disk.
	BLOCKSIZE = 64 * 1024 * 1024
)

var (
	// PROC_MOUNTS is the path of the mount table that FindKeepVolumes
	// scans. It is a variable rather than a constant, so it can be
	// reassigned.
	PROC_MOUNTS = "/proc/mounts"

	// KeepVolumes lists the Keep volume directories that GetBlock and
	// PutBlock operate on.
	KeepVolumes []string
)
// KeepError pairs an error with the HTTP status code that should be
// reported to the client for it.
type KeepError struct {
	HTTPCode int   // HTTP status code to send with the error
	Err      error // underlying cause; may be nil
}

// Error implements the error interface. The message has the form
// "Error <code>: <cause>"; when no underlying cause is set, only
// "Error <code>" is returned instead of dereferencing a nil error.
func (e *KeepError) Error() string {
	if e.Err == nil {
		return fmt.Sprintf("Error %d", e.HTTPCode)
	}
	return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
}
33 // Look for local keep volumes.
34 KeepVolumes = FindKeepVolumes()
35 if len(KeepVolumes) == 0 {
36 log.Fatal("could not find any keep volumes")
38 for _, v := range KeepVolumes {
39 log.Println("keep volume:", v)
42 // Set up REST handlers.
44 // Start with a router that will route each URL path to an
45 // appropriate handler.
47 rest := mux.NewRouter()
48 rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
49 rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
51 // Tell the built-in HTTP server to direct all requests to the REST
53 http.Handle("/", rest)
55 // Start listening for requests.
56 port := fmt.Sprintf(":%d", DEFAULT_PORT)
57 http.ListenAndServe(port, nil)
61 // Returns a list of Keep volumes mounted on this system.
63 // A Keep volume is a normal or tmpfs volume with a /keep
64 // directory at the top level of the mount point.
66 func FindKeepVolumes() []string {
67 vols := make([]string, 0)
69 if f, err := os.Open(PROC_MOUNTS); err != nil {
70 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
72 scanner := bufio.NewScanner(f)
74 args := strings.Fields(scanner.Text())
75 dev, mount := args[0], args[1]
76 if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
77 keep := mount + "/keep"
78 if st, err := os.Stat(keep); err == nil && st.IsDir() {
79 vols = append(vols, keep)
83 if err := scanner.Err(); err != nil {
90 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
91 hash := mux.Vars(req)["hash"]
93 block, err := GetBlock(hash)
95 http.Error(w, err.Error(), 404)
99 _, err = w.Write(block)
101 log.Printf("GetBlockHandler: writing response: %s", err)
107 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
108 hash := mux.Vars(req)["hash"]
110 // Read the block data to be stored.
111 // TODO(twp): decide what to do when the input stream contains
112 // more than BLOCKSIZE bytes.
114 buf := make([]byte, BLOCKSIZE)
115 if nread, err := req.Body.Read(buf); err == nil {
116 if err := PutBlock(buf[:nread], hash); err == nil {
117 w.WriteHeader(http.StatusOK)
119 http.Error(w, err.Error(), err.HTTPCode)
122 log.Println("error reading request: ", err)
123 http.Error(w, err.Error(), 500)
127 func GetBlock(hash string) ([]byte, error) {
128 var buf = make([]byte, BLOCKSIZE)
130 // Attempt to read the requested hash from a keep volume.
131 for _, vol := range KeepVolumes {
136 path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
138 f, err = os.Open(path)
140 log.Printf("%s: opening %s: %s\n", vol, path, err)
144 nread, err = f.Read(buf)
146 log.Printf("%s: reading %s: %s\n", vol, path, err)
150 // Double check the file checksum.
152 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
153 if filehash != hash {
154 // TODO(twp): this condition probably represents a bad disk and
155 // should raise major alarm bells for an administrator: e.g.
156 // they should be sent directly to an event manager at high
157 // priority or logged as urgent problems.
159 log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
165 return buf[:nread], nil
168 log.Printf("%s: not found on any volumes, giving up\n", hash)
169 return buf, &KeepError{404, errors.New("not found: " + hash)}
172 /* PutBlock(block, hash)
173 Stores the BLOCK (identified by the content id HASH) in Keep.
175 The MD5 checksum of the block must be identical to the content id HASH.
176 If not, an error is returned.
178 PutBlock stores the BLOCK in each of the available Keep volumes.
179 If any volume fails, an error is signaled on the back end. A write
180 error is returned only if all volumes fail.
182 On success, PutBlock returns nil.
183 On failure, it returns a KeepError with one of the following codes:
186 -- The MD5 hash of the BLOCK does not match the argument HASH.
188 -- There was not enough space left in any Keep volume to store
191 -- The object could not be stored for some other reason (e.g.
192 all writes failed). The text of the error message should
193 provide as much detail as possible.
196 func PutBlock(block []byte, hash string) *KeepError {
197 // Check that BLOCK's checksum matches HASH.
198 blockhash := fmt.Sprintf("%x", md5.Sum(block))
199 if blockhash != hash {
200 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
201 return &KeepError{401, errors.New("MD5Fail")}
204 // any_success will be set to true upon a successful block write.
206 for _, vol := range KeepVolumes {
208 bFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
209 if err := os.MkdirAll(path.Dir(bFilename), 0755); err != nil {
210 log.Printf("%s: could not create directory %s: %s",
211 hash, path.Dir(bFilename), err)
215 f, err := os.OpenFile(bFilename, os.O_CREATE|os.O_WRONLY, 0644)
217 // if the block already exists, just skip to the next volume.
222 // Open failed for some other reason.
223 log.Printf("%s: creating %s: %s\n", vol, bFilename, err)
228 if _, err := f.Write(block); err == nil {
233 log.Printf("%s: writing to %s: %s\n", vol, bFilename, err)
241 log.Printf("all Keep volumes failed")
242 return &KeepError{500, errors.New("Fail")}