Merge branch 'master' into 1971-show-image-thumbnails
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "flag"
9         "fmt"
10         "github.com/gorilla/mux"
11         "io"
12         "io/ioutil"
13         "log"
14         "net/http"
15         "os"
16         "strconv"
17         "strings"
18         "syscall"
19         "time"
20 )
21
22 // ======================
23 // Configuration settings
24 //
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
27
28 // Default TCP address on which to listen for requests.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 var KeepVolumes []string
41
42 // ==========
43 // Error types.
44 //
45 type KeepError struct {
46         HTTPCode int
47         ErrMsg   string
48 }
49
50 var (
51         CollisionError = &KeepError{400, "Collision"}
52         MD5Error       = &KeepError{401, "MD5 Failure"}
53         CorruptError   = &KeepError{402, "Corruption"}
54         NotFoundError  = &KeepError{404, "Not Found"}
55         GenericError   = &KeepError{500, "Fail"}
56         FullError      = &KeepError{503, "Full"}
57         TooLongError   = &KeepError{504, "Too Long"}
58 )
59
60 func (e *KeepError) Error() string {
61         return e.ErrMsg
62 }
63
64 // This error is returned by ReadAtMost if the available
65 // data exceeds BLOCKSIZE bytes.
66 var ReadErrorTooLong = errors.New("Too long")
67
68 func main() {
69         // Parse command-line flags:
70         //
71         // -listen=ipaddr:port
72         //    Interface on which to listen for requests. Use :port without
73         //    an ipaddr to listen on all network interfaces.
74         //    Examples:
75         //      -listen=127.0.0.1:4949
76         //      -listen=10.0.1.24:8000
77         //      -listen=:25107 (to listen to port 25107 on all interfaces)
78         //
79         // -volumes
80         //    A comma-separated list of directories to use as Keep volumes.
81         //    Example:
82         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
83         //
84         //    If -volumes is empty or is not present, Keep will select volumes
85         //    by looking at currently mounted filesystems for /keep top-level
86         //    directories.
87
88         var listen, keepvols string
89         flag.StringVar(&listen, "listen", DEFAULT_ADDR,
90                 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
91         flag.StringVar(&keepvols, "volumes", "",
92                 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
93         flag.Parse()
94
95         // Look for local keep volumes.
96         if keepvols == "" {
97                 // TODO(twp): decide whether this is desirable default behavior.
98                 // In production we may want to require the admin to specify
99                 // Keep volumes explicitly.
100                 KeepVolumes = FindKeepVolumes()
101         } else {
102                 KeepVolumes = strings.Split(keepvols, ",")
103         }
104
105         if len(KeepVolumes) == 0 {
106                 log.Fatal("could not find any keep volumes")
107         }
108         for _, v := range KeepVolumes {
109                 log.Println("keep volume:", v)
110         }
111
112         // Set up REST handlers.
113         //
114         // Start with a router that will route each URL path to an
115         // appropriate handler.
116         //
117         rest := mux.NewRouter()
118         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
119         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
120
121         // Tell the built-in HTTP server to direct all requests to the REST
122         // router.
123         http.Handle("/", rest)
124
125         // Start listening for requests.
126         http.ListenAndServe(listen, nil)
127 }
128
129 // FindKeepVolumes
130 //     Returns a list of Keep volumes mounted on this system.
131 //
132 //     A Keep volume is a normal or tmpfs volume with a /keep
133 //     directory at the top level of the mount point.
134 //
135 func FindKeepVolumes() []string {
136         vols := make([]string, 0)
137
138         if f, err := os.Open(PROC_MOUNTS); err != nil {
139                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
140         } else {
141                 scanner := bufio.NewScanner(f)
142                 for scanner.Scan() {
143                         args := strings.Fields(scanner.Text())
144                         dev, mount := args[0], args[1]
145                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
146                                 keep := mount + "/keep"
147                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
148                                         vols = append(vols, keep)
149                                 }
150                         }
151                 }
152                 if err := scanner.Err(); err != nil {
153                         log.Fatal(err)
154                 }
155         }
156         return vols
157 }
158
159 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
160         hash := mux.Vars(req)["hash"]
161
162         block, err := GetBlock(hash)
163         if err != nil {
164                 http.Error(w, err.Error(), 404)
165                 return
166         }
167
168         _, err = w.Write(block)
169         if err != nil {
170                 log.Printf("GetBlockHandler: writing response: %s", err)
171         }
172
173         return
174 }
175
176 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
177         hash := mux.Vars(req)["hash"]
178
179         // Read the block data to be stored.
180         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
181         //
182         // Note: because req.Body is a buffered Reader, each Read() call will
183         // collect only the data in the network buffer (typically 16384 bytes),
184         // even if it is passed a much larger slice.
185         //
186         // Instead, call ReadAtMost to read data from the socket
187         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
188         //
189         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
190                 if err := PutBlock(buf, hash); err == nil {
191                         w.WriteHeader(http.StatusOK)
192                 } else {
193                         ke := err.(*KeepError)
194                         http.Error(w, ke.Error(), ke.HTTPCode)
195                 }
196         } else {
197                 log.Println("error reading request: ", err)
198                 errmsg := err.Error()
199                 if err == ReadErrorTooLong {
200                         // Use a more descriptive error message that includes
201                         // the maximum request size.
202                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
203                 }
204                 http.Error(w, errmsg, 500)
205         }
206 }
207
208 func GetBlock(hash string) ([]byte, error) {
209         var buf = make([]byte, BLOCKSIZE)
210
211         // Attempt to read the requested hash from a keep volume.
212         for _, vol := range KeepVolumes {
213                 var f *os.File
214                 var err error
215                 var nread int
216
217                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
218
219                 f, err = os.Open(blockFilename)
220                 if err != nil {
221                         if !os.IsNotExist(err) {
222                                 // A block is stored on only one Keep disk,
223                                 // so os.IsNotExist is expected.  Report any other errors.
224                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
225                         }
226                         continue
227                 }
228
229                 nread, err = f.Read(buf)
230                 if err != nil {
231                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
232                         continue
233                 }
234
235                 // Double check the file checksum.
236                 //
237                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
238                 if filehash != hash {
239                         // TODO(twp): this condition probably represents a bad disk and
240                         // should raise major alarm bells for an administrator: e.g.
241                         // they should be sent directly to an event manager at high
242                         // priority or logged as urgent problems.
243                         //
244                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
245                                 vol, blockFilename, filehash)
246                         return buf, CorruptError
247                 }
248
249                 // Success!
250                 return buf[:nread], nil
251         }
252
253         log.Printf("%s: not found on any volumes, giving up\n", hash)
254         return buf, NotFoundError
255 }
256
257 /* PutBlock(block, hash)
258    Stores the BLOCK (identified by the content id HASH) in Keep.
259
260    The MD5 checksum of the block must be identical to the content id HASH.
261    If not, an error is returned.
262
263    PutBlock stores the BLOCK on the first Keep volume with free space.
264    A failure code is returned to the user only if all volumes fail.
265
266    On success, PutBlock returns nil.
267    On failure, it returns a KeepError with one of the following codes:
268
269    400 Collision
270           A different block with the same hash already exists on this
271           Keep server.
272    401 MD5Fail
273           The MD5 hash of the BLOCK does not match the argument HASH.
274    503 Full
275           There was not enough space left in any Keep volume to store
276           the object.
277    500 Fail
278           The object could not be stored for some other reason (e.g.
279           all writes failed). The text of the error message should
280           provide as much detail as possible.
281 */
282
283 func PutBlock(block []byte, hash string) error {
284         // Check that BLOCK's checksum matches HASH.
285         blockhash := fmt.Sprintf("%x", md5.Sum(block))
286         if blockhash != hash {
287                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
288                 return MD5Error
289         }
290
291         // If we already have a block on disk under this identifier, return
292         // success (but check for MD5 collisions).
293         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
294         // In either case, we want to write our new (good) block to disk, so there is
295         // nothing special to do if err != nil.
296         if oldblock, err := GetBlock(hash); err == nil {
297                 if bytes.Compare(block, oldblock) == 0 {
298                         return nil
299                 } else {
300                         return CollisionError
301                 }
302         }
303
304         // Store the block on the first available Keep volume.
305         allFull := true
306         for _, vol := range KeepVolumes {
307                 if IsFull(vol) {
308                         continue
309                 }
310                 allFull = false
311                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
312                 if err := os.MkdirAll(blockDir, 0755); err != nil {
313                         log.Printf("%s: could not create directory %s: %s",
314                                 hash, blockDir, err)
315                         continue
316                 }
317
318                 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
319                 if tmperr != nil {
320                         log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
321                         continue
322                 }
323                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
324
325                 if _, err := tmpfile.Write(block); err != nil {
326                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
327                         continue
328                 }
329                 if err := tmpfile.Close(); err != nil {
330                         log.Printf("closing %s: %s\n", tmpfile.Name(), err)
331                         os.Remove(tmpfile.Name())
332                         continue
333                 }
334                 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
335                         log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
336                         os.Remove(tmpfile.Name())
337                         continue
338                 }
339                 return nil
340         }
341
342         if allFull {
343                 log.Printf("all Keep volumes full")
344                 return FullError
345         } else {
346                 log.Printf("all Keep volumes failed")
347                 return GenericError
348         }
349 }
350
351 func IsFull(volume string) (isFull bool) {
352         fullSymlink := volume + "/full"
353
354         // Check if the volume has been marked as full in the last hour.
355         if link, err := os.Readlink(fullSymlink); err == nil {
356                 if ts, err := strconv.Atoi(link); err == nil {
357                         fulltime := time.Unix(int64(ts), 0)
358                         if time.Since(fulltime).Hours() < 1.0 {
359                                 return true
360                         }
361                 }
362         }
363
364         if avail, err := FreeDiskSpace(volume); err == nil {
365                 isFull = avail < MIN_FREE_KILOBYTES
366         } else {
367                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
368                 isFull = false
369         }
370
371         // If the volume is full, timestamp it.
372         if isFull {
373                 now := fmt.Sprintf("%d", time.Now().Unix())
374                 os.Symlink(now, fullSymlink)
375         }
376         return
377 }
378
379 // FreeDiskSpace(volume)
380 //     Returns the amount of available disk space on VOLUME,
381 //     as a number of 1k blocks.
382 //
383 func FreeDiskSpace(volume string) (free uint64, err error) {
384         var fs syscall.Statfs_t
385         err = syscall.Statfs(volume, &fs)
386         if err == nil {
387                 // Statfs output is not guaranteed to measure free
388                 // space in terms of 1K blocks.
389                 free = fs.Bavail * uint64(fs.Bsize) / 1024
390         }
391
392         return
393 }
394
395 // ReadAtMost
396 //     Reads bytes repeatedly from an io.Reader until either
397 //     encountering EOF, or the maxbytes byte limit has been reached.
398 //     Returns a byte slice of the bytes that were read.
399 //
400 //     If the reader contains more than maxbytes, returns a nil slice
401 //     and an error.
402 //
403 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
404         // Attempt to read one more byte than maxbytes.
405         lr := io.LimitReader(r, int64(maxbytes+1))
406         buf, err := ioutil.ReadAll(lr)
407         if len(buf) > maxbytes {
408                 return nil, ReadErrorTooLong
409         }
410         return buf, err
411 }