Typo fix.
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "crypto/md5"
6         "errors"
7         "fmt"
8         "github.com/gorilla/mux"
9         "log"
10         "net/http"
11         "os"
12         "path"
13         "strings"
14 )
15
16 const DEFAULT_PORT = 25107
17 const BLOCKSIZE = 64 * 1024 * 1024
18
19 var PROC_MOUNTS = "/proc/mounts"
20
21 var KeepVolumes []string
22
23 type KeepError struct {
24         HTTPCode int
25         Err      error
26 }
27
28 func (e *KeepError) Error() string {
29         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
30 }
31
32 func main() {
33         // Look for local keep volumes.
34         KeepVolumes = FindKeepVolumes()
35         if len(KeepVolumes) == 0 {
36                 log.Fatal("could not find any keep volumes")
37         }
38         for _, v := range KeepVolumes {
39                 log.Println("keep volume:", v)
40         }
41
42         // Set up REST handlers.
43         //
44         // Start with a router that will route each URL path to an
45         // appropriate handler.
46         //
47         rest := mux.NewRouter()
48         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
49         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
50
51         // Tell the built-in HTTP server to direct all requests to the REST
52         // router.
53         http.Handle("/", rest)
54
55         // Start listening for requests.
56         port := fmt.Sprintf(":%d", DEFAULT_PORT)
57         http.ListenAndServe(port, nil)
58 }
59
60 // FindKeepVolumes
61 //     Returns a list of Keep volumes mounted on this system.
62 //
63 //     A Keep volume is a normal or tmpfs volume with a /keep
64 //     directory at the top level of the mount point.
65 //
66 func FindKeepVolumes() []string {
67         vols := make([]string, 0)
68
69         if f, err := os.Open(PROC_MOUNTS); err != nil {
70                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
71         } else {
72                 scanner := bufio.NewScanner(f)
73                 for scanner.Scan() {
74                         args := strings.Fields(scanner.Text())
75                         dev, mount := args[0], args[1]
76                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
77                                 keep := mount + "/keep"
78                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
79                                         vols = append(vols, keep)
80                                 }
81                         }
82                 }
83                 if err := scanner.Err(); err != nil {
84                         log.Fatal(err)
85                 }
86         }
87         return vols
88 }
89
90 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
91         hash := mux.Vars(req)["hash"]
92
93         block, err := GetBlock(hash)
94         if err != nil {
95                 http.Error(w, err.Error(), 404)
96                 return
97         }
98
99         _, err = w.Write(block)
100         if err != nil {
101                 log.Printf("GetBlockHandler: writing response: %s", err)
102         }
103
104         return
105 }
106
107 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
108         hash := mux.Vars(req)["hash"]
109
110         // Read the block data to be stored.
111         // TODO(twp): decide what to do when the input stream contains
112         // more than BLOCKSIZE bytes.
113         //
114         buf := make([]byte, BLOCKSIZE)
115         if nread, err := req.Body.Read(buf); err == nil {
116                 if err := PutBlock(buf[:nread], hash); err == nil {
117                         w.WriteHeader(http.StatusOK)
118                 } else {
119                         http.Error(w, err.Error(), err.HTTPCode)
120                 }
121         } else {
122                 log.Println("error reading request: ", err)
123                 http.Error(w, err.Error(), 500)
124         }
125 }
126
127 func GetBlock(hash string) ([]byte, error) {
128         var buf = make([]byte, BLOCKSIZE)
129
130         // Attempt to read the requested hash from a keep volume.
131         for _, vol := range KeepVolumes {
132                 var f *os.File
133                 var err error
134                 var nread int
135
136                 path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
137
138                 f, err = os.Open(path)
139                 if err != nil {
140                         log.Printf("%s: opening %s: %s\n", vol, path, err)
141                         continue
142                 }
143
144                 nread, err = f.Read(buf)
145                 if err != nil {
146                         log.Printf("%s: reading %s: %s\n", vol, path, err)
147                         continue
148                 }
149
150                 // Double check the file checksum.
151                 //
152                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
153                 if filehash != hash {
154                         // TODO(twp): this condition probably represents a bad disk and
155                         // should raise major alarm bells for an administrator: e.g.
156                         // they should be sent directly to an event manager at high
157                         // priority or logged as urgent problems.
158                         //
159                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
160                                 vol, path, filehash)
161                         continue
162                 }
163
164                 // Success!
165                 return buf[:nread], nil
166         }
167
168         log.Printf("%s: not found on any volumes, giving up\n", hash)
169         return buf, &KeepError{404, errors.New("not found: " + hash)}
170 }
171
172 /* PutBlock(block, hash)
173    Stores the BLOCK (identified by the content id HASH) in Keep.
174
175    The MD5 checksum of the block must be identical to the content id HASH.
176    If not, an error is returned.
177
178    PutBlock stores the BLOCK in each of the available Keep volumes.
179    If any volume fails, an error is signaled on the back end.  A write
180    error is returned only if all volumes fail.
181
182    On success, PutBlock returns nil.
183    On failure, it returns a KeepError with one of the following codes:
184
185    401 MD5Fail
186          -- The MD5 hash of the BLOCK does not match the argument HASH.
187    503 Full
188          -- There was not enough space left in any Keep volume to store
189             the object.
190    500 Fail
191          -- The object could not be stored for some other reason (e.g.
192             all writes failed). The text of the error message should
193             provide as much detail as possible.
194 */
195
196 func PutBlock(block []byte, hash string) *KeepError {
197         // Check that BLOCK's checksum matches HASH.
198         blockhash := fmt.Sprintf("%x", md5.Sum(block))
199         if blockhash != hash {
200                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
201                 return &KeepError{401, errors.New("MD5Fail")}
202         }
203
204         // any_success will be set to true upon a successful block write.
205         any_success := false
206         for _, vol := range KeepVolumes {
207
208                 bFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
209                 if err := os.MkdirAll(path.Dir(bFilename), 0755); err != nil {
210                         log.Printf("%s: could not create directory %s: %s",
211                                 hash, path.Dir(bFilename), err)
212                         continue
213                 }
214
215                 f, err := os.OpenFile(bFilename, os.O_CREATE|os.O_WRONLY, 0644)
216                 if err != nil {
217                         // if the block already exists, just skip to the next volume.
218                         if os.IsExist(err) {
219                                 any_success = true
220                                 continue
221                         } else {
222                                 // Open failed for some other reason.
223                                 log.Printf("%s: creating %s: %s\n", vol, bFilename, err)
224                                 continue
225                         }
226                 }
227
228                 if _, err := f.Write(block); err == nil {
229                         f.Close()
230                         any_success = true
231                         continue
232                 } else {
233                         log.Printf("%s: writing to %s: %s\n", vol, bFilename, err)
234                         continue
235                 }
236         }
237
238         if any_success {
239                 return nil
240         } else {
241                 log.Printf("all Keep volumes failed")
242                 return &KeepError{500, errors.New("Fail")}
243         }
244 }