Merge branch 'master' into 2449-keep-write-blocks
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "fmt"
9         "github.com/gorilla/mux"
10         "log"
11         "net/http"
12         "os"
13         "os/exec"
14         "strconv"
15         "strings"
16         "time"
17 )
18
19 // Default TCP port on which to listen for requests.
20 const DEFAULT_PORT = 25107
21
22 // A Keep "block" is 64MB.
23 const BLOCKSIZE = 64 * 1024 * 1024
24
25 // A Keep volume must have at least MIN_FREE_KILOBYTES available
26 // in order to permit writes.
27 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
28
29 var PROC_MOUNTS = "/proc/mounts"
30
31 var KeepVolumes []string
32
33 type KeepError struct {
34         HTTPCode int
35         Err      error
36 }
37
38 const (
39         ErrCollision = 400
40         ErrMD5Fail   = 401
41         ErrCorrupt   = 402
42         ErrNotFound  = 404
43         ErrOther     = 500
44         ErrFull      = 503
45 )
46
47 func (e *KeepError) Error() string {
48         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
49 }
50
51 func main() {
52         // Look for local keep volumes.
53         KeepVolumes = FindKeepVolumes()
54         if len(KeepVolumes) == 0 {
55                 log.Fatal("could not find any keep volumes")
56         }
57         for _, v := range KeepVolumes {
58                 log.Println("keep volume:", v)
59         }
60
61         // Set up REST handlers.
62         //
63         // Start with a router that will route each URL path to an
64         // appropriate handler.
65         //
66         rest := mux.NewRouter()
67         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
68         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
69
70         // Tell the built-in HTTP server to direct all requests to the REST
71         // router.
72         http.Handle("/", rest)
73
74         // Start listening for requests.
75         port := fmt.Sprintf(":%d", DEFAULT_PORT)
76         http.ListenAndServe(port, nil)
77 }
78
79 // FindKeepVolumes
80 //     Returns a list of Keep volumes mounted on this system.
81 //
82 //     A Keep volume is a normal or tmpfs volume with a /keep
83 //     directory at the top level of the mount point.
84 //
85 func FindKeepVolumes() []string {
86         vols := make([]string, 0)
87
88         if f, err := os.Open(PROC_MOUNTS); err != nil {
89                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
90         } else {
91                 scanner := bufio.NewScanner(f)
92                 for scanner.Scan() {
93                         args := strings.Fields(scanner.Text())
94                         dev, mount := args[0], args[1]
95                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
96                                 keep := mount + "/keep"
97                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
98                                         vols = append(vols, keep)
99                                 }
100                         }
101                 }
102                 if err := scanner.Err(); err != nil {
103                         log.Fatal(err)
104                 }
105         }
106         return vols
107 }
108
109 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
110         hash := mux.Vars(req)["hash"]
111
112         block, err := GetBlock(hash)
113         if err != nil {
114                 http.Error(w, err.Error(), 404)
115                 return
116         }
117
118         _, err = w.Write(block)
119         if err != nil {
120                 log.Printf("GetBlockHandler: writing response: %s", err)
121         }
122
123         return
124 }
125
126 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
127         hash := mux.Vars(req)["hash"]
128
129         // Read the block data to be stored.
130         // TODO(twp): decide what to do when the input stream contains
131         // more than BLOCKSIZE bytes.
132         //
133         buf := make([]byte, BLOCKSIZE)
134         if nread, err := req.Body.Read(buf); err == nil {
135                 if err := PutBlock(buf[:nread], hash); err == nil {
136                         w.WriteHeader(http.StatusOK)
137                 } else {
138                         ke := err.(*KeepError)
139                         http.Error(w, ke.Error(), ke.HTTPCode)
140                 }
141         } else {
142                 log.Println("error reading request: ", err)
143                 http.Error(w, err.Error(), 500)
144         }
145 }
146
147 func GetBlock(hash string) ([]byte, error) {
148         var buf = make([]byte, BLOCKSIZE)
149
150         // Attempt to read the requested hash from a keep volume.
151         for _, vol := range KeepVolumes {
152                 var f *os.File
153                 var err error
154                 var nread int
155
156                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
157
158                 f, err = os.Open(blockFilename)
159                 if err != nil {
160                         if !os.IsNotExist(err) {
161                                 // A block is stored on only one Keep disk,
162                                 // so os.IsNotExist is expected.  Report any other errors.
163                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
164                         }
165                         continue
166                 }
167
168                 nread, err = f.Read(buf)
169                 if err != nil {
170                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
171                         continue
172                 }
173
174                 // Double check the file checksum.
175                 //
176                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
177                 if filehash != hash {
178                         // TODO(twp): this condition probably represents a bad disk and
179                         // should raise major alarm bells for an administrator: e.g.
180                         // they should be sent directly to an event manager at high
181                         // priority or logged as urgent problems.
182                         //
183                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
184                                 vol, blockFilename, filehash)
185                         return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
186                 }
187
188                 // Success!
189                 return buf[:nread], nil
190         }
191
192         log.Printf("%s: not found on any volumes, giving up\n", hash)
193         return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
194 }
195
196 /* PutBlock(block, hash)
197    Stores the BLOCK (identified by the content id HASH) in Keep.
198
199    The MD5 checksum of the block must be identical to the content id HASH.
200    If not, an error is returned.
201
202    PutBlock stores the BLOCK on the first Keep volume with free space.
203    A failure code is returned to the user only if all volumes fail.
204
205    On success, PutBlock returns nil.
206    On failure, it returns a KeepError with one of the following codes:
207
208    400 Collision
209           A different block with the same hash already exists on this
210           Keep server.
211    401 MD5Fail
212           The MD5 hash of the BLOCK does not match the argument HASH.
213    503 Full
214           There was not enough space left in any Keep volume to store
215           the object.
216    500 Fail
217           The object could not be stored for some other reason (e.g.
218           all writes failed). The text of the error message should
219           provide as much detail as possible.
220 */
221
222 func PutBlock(block []byte, hash string) error {
223         // Check that BLOCK's checksum matches HASH.
224         blockhash := fmt.Sprintf("%x", md5.Sum(block))
225         if blockhash != hash {
226                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
227                 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
228         }
229
230         // If we already have a block on disk under this identifier, return
231         // success (but check for MD5 collisions, which may signify on-disk corruption).
232         if oldblock, err := GetBlock(hash); err == nil {
233                 if bytes.Compare(block, oldblock) == 0 {
234                         return nil
235                 } else {
236                         return &KeepError{ErrCollision, errors.New("Collision")}
237                 }
238         } else {
239                 ke := err.(*KeepError)
240                 if ke.HTTPCode == ErrCorrupt {
241                         return &KeepError{ErrCollision, errors.New("Collision")}
242                 }
243         }
244
245         // Store the block on the first available Keep volume.
246         allFull := true
247         for _, vol := range KeepVolumes {
248                 if IsFull(vol) {
249                         continue
250                 }
251                 allFull = false
252                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
253                 if err := os.MkdirAll(blockDir, 0755); err != nil {
254                         log.Printf("%s: could not create directory %s: %s",
255                                 hash, blockDir, err)
256                         continue
257                 }
258
259                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
260
261                 f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
262                 if err != nil {
263                         log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
264                         continue
265                 }
266
267                 if _, err := f.Write(block); err == nil {
268                         f.Close()
269                         return nil
270                 } else {
271                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
272                         continue
273                 }
274         }
275
276         if allFull {
277                 log.Printf("all Keep volumes full")
278                 return &KeepError{ErrFull, errors.New("Full")}
279         } else {
280                 log.Printf("all Keep volumes failed")
281                 return &KeepError{ErrOther, errors.New("Fail")}
282         }
283 }
284
285 func IsFull(volume string) (isFull bool) {
286         fullSymlink := volume + "/full"
287
288         // Check if the volume has been marked as full in the last hour.
289         if link, err := os.Readlink(fullSymlink); err == nil {
290                 if ts, err := strconv.Atoi(link); err == nil {
291                         fulltime := time.Unix(int64(ts), 0)
292                         if time.Since(fulltime).Hours() < 1.0 {
293                                 return true
294                         }
295                 }
296         }
297
298         if avail, err := FreeDiskSpace(volume); err == nil {
299                 isFull = avail < MIN_FREE_KILOBYTES
300         } else {
301                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
302                 isFull = false
303         }
304
305         // If the volume is full, timestamp it.
306         if isFull {
307                 now := fmt.Sprintf("%d", time.Now().Unix())
308                 os.Symlink(now, fullSymlink)
309         }
310         return
311 }
312
313 // FreeDiskSpace(volume)
314 //     Returns the amount of available disk space on VOLUME,
315 //     as a number of 1k blocks.
316 //
317 func FreeDiskSpace(volume string) (free int, err error) {
318         // Run df to find out how much disk space is left.
319         cmd := exec.Command("df", "--block-size=1k", volume)
320         stdout, perr := cmd.StdoutPipe()
321         if perr != nil {
322                 return 0, perr
323         }
324         scanner := bufio.NewScanner(stdout)
325         if perr := cmd.Start(); err != nil {
326                 return 0, perr
327         }
328
329         scanner.Scan() // skip header line of df output
330         scanner.Scan()
331
332         f := strings.Fields(scanner.Text())
333         if avail, err := strconv.Atoi(f[3]); err == nil {
334                 free = avail
335         } else {
336                 err = errors.New("bad df format: " + scanner.Text())
337         }
338
339         // Flush the df output and shut it down cleanly.
340         for scanner.Scan() {
341         }
342         cmd.Wait()
343
344         return
345 }