7981bb9db4340b4d0bfaf808918834890f66cc70
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "flag"
9         "fmt"
10         "github.com/gorilla/mux"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "os"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // Default TCP address on which to listen for requests.
22 const DEFAULT_ADDR = ":25107"
23
24 // A Keep "block" is 64MB.
25 const BLOCKSIZE = 64 * 1024 * 1024
26
27 // A Keep volume must have at least MIN_FREE_KILOBYTES available
28 // in order to permit writes.
29 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
30
31 var PROC_MOUNTS = "/proc/mounts"
32
33 var KeepVolumes []string
34
35 type KeepError struct {
36         HTTPCode int
37         Err      error
38 }
39
40 const (
41         ErrCollision = 400
42         ErrMD5Fail   = 401
43         ErrCorrupt   = 402
44         ErrNotFound  = 404
45         ErrOther     = 500
46         ErrFull      = 503
47 )
48
49 func (e *KeepError) Error() string {
50         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
51 }
52
53 func main() {
54         // Parse command-line flags:
55         //
56         // -listen=ipaddr:port
57         //    Interface on which to listen for requests. Use :port without
58         //    an ipaddr to listen on all network interfaces.
59         //    Examples:
60         //      -listen=127.0.0.1:4949
61         //      -listen=10.0.1.24:8000
62         //      -listen=:25107 (to listen to port 25107 on all interfaces)
63         //
64         // -volumes
65         //    A comma-separated list of directories to use as Keep volumes.
66         //    Example:
67         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
68         //
69         //    If -volumes is empty or is not present, Keep will select volumes
70         //    by looking at currently mounted filesystems for /keep top-level
71         //    directories.
72
73         var listen, keepvols string
74         flag.StringVar(&listen, "listen", DEFAULT_ADDR,
75                 "interface on which to listen for requests")
76         flag.StringVar(&keepvols, "volumes", "",
77                 "comma-separated list of directories to use for Keep volumes")
78         flag.Parse()
79
80         // Look for local keep volumes.
81         if keepvols == "" {
82                 KeepVolumes = FindKeepVolumes()
83         } else {
84                 KeepVolumes = strings.Split(keepvols, ",")
85         }
86
87         if len(KeepVolumes) == 0 {
88                 log.Fatal("could not find any keep volumes")
89         }
90         for _, v := range KeepVolumes {
91                 log.Println("keep volume:", v)
92         }
93
94         // Set up REST handlers.
95         //
96         // Start with a router that will route each URL path to an
97         // appropriate handler.
98         //
99         rest := mux.NewRouter()
100         rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
101         rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
102
103         // Tell the built-in HTTP server to direct all requests to the REST
104         // router.
105         http.Handle("/", rest)
106
107         // Start listening for requests.
108         http.ListenAndServe(listen, nil)
109 }
110
111 // FindKeepVolumes
112 //     Returns a list of Keep volumes mounted on this system.
113 //
114 //     A Keep volume is a normal or tmpfs volume with a /keep
115 //     directory at the top level of the mount point.
116 //
117 func FindKeepVolumes() []string {
118         vols := make([]string, 0)
119
120         if f, err := os.Open(PROC_MOUNTS); err != nil {
121                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
122         } else {
123                 scanner := bufio.NewScanner(f)
124                 for scanner.Scan() {
125                         args := strings.Fields(scanner.Text())
126                         dev, mount := args[0], args[1]
127                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
128                                 keep := mount + "/keep"
129                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
130                                         vols = append(vols, keep)
131                                 }
132                         }
133                 }
134                 if err := scanner.Err(); err != nil {
135                         log.Fatal(err)
136                 }
137         }
138         return vols
139 }
140
141 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
142         hash := mux.Vars(req)["hash"]
143
144         block, err := GetBlock(hash)
145         if err != nil {
146                 http.Error(w, err.Error(), 404)
147                 return
148         }
149
150         _, err = w.Write(block)
151         if err != nil {
152                 log.Printf("GetBlockHandler: writing response: %s", err)
153         }
154
155         return
156 }
157
158 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
159         hash := mux.Vars(req)["hash"]
160
161         // Read the block data to be stored.
162         // TODO(twp): decide what to do when the input stream contains
163         // more than BLOCKSIZE bytes.
164         //
165         buf := make([]byte, BLOCKSIZE)
166         if nread, err := req.Body.Read(buf); err == nil {
167                 if err := PutBlock(buf[:nread], hash); err == nil {
168                         w.WriteHeader(http.StatusOK)
169                 } else {
170                         ke := err.(*KeepError)
171                         http.Error(w, ke.Error(), ke.HTTPCode)
172                 }
173         } else {
174                 log.Println("error reading request: ", err)
175                 http.Error(w, err.Error(), 500)
176         }
177 }
178
179 func GetBlock(hash string) ([]byte, error) {
180         var buf = make([]byte, BLOCKSIZE)
181
182         // Attempt to read the requested hash from a keep volume.
183         for _, vol := range KeepVolumes {
184                 var f *os.File
185                 var err error
186                 var nread int
187
188                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
189
190                 f, err = os.Open(blockFilename)
191                 if err != nil {
192                         if !os.IsNotExist(err) {
193                                 // A block is stored on only one Keep disk,
194                                 // so os.IsNotExist is expected.  Report any other errors.
195                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
196                         }
197                         continue
198                 }
199
200                 nread, err = f.Read(buf)
201                 if err != nil {
202                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
203                         continue
204                 }
205
206                 // Double check the file checksum.
207                 //
208                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
209                 if filehash != hash {
210                         // TODO(twp): this condition probably represents a bad disk and
211                         // should raise major alarm bells for an administrator: e.g.
212                         // they should be sent directly to an event manager at high
213                         // priority or logged as urgent problems.
214                         //
215                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
216                                 vol, blockFilename, filehash)
217                         return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
218                 }
219
220                 // Success!
221                 return buf[:nread], nil
222         }
223
224         log.Printf("%s: not found on any volumes, giving up\n", hash)
225         return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
226 }
227
228 /* PutBlock(block, hash)
229    Stores the BLOCK (identified by the content id HASH) in Keep.
230
231    The MD5 checksum of the block must be identical to the content id HASH.
232    If not, an error is returned.
233
234    PutBlock stores the BLOCK on the first Keep volume with free space.
235    A failure code is returned to the user only if all volumes fail.
236
237    On success, PutBlock returns nil.
238    On failure, it returns a KeepError with one of the following codes:
239
240    400 Collision
241           A different block with the same hash already exists on this
242           Keep server.
243    401 MD5Fail
244           The MD5 hash of the BLOCK does not match the argument HASH.
245    503 Full
246           There was not enough space left in any Keep volume to store
247           the object.
248    500 Fail
249           The object could not be stored for some other reason (e.g.
250           all writes failed). The text of the error message should
251           provide as much detail as possible.
252 */
253
254 func PutBlock(block []byte, hash string) error {
255         // Check that BLOCK's checksum matches HASH.
256         blockhash := fmt.Sprintf("%x", md5.Sum(block))
257         if blockhash != hash {
258                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
259                 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
260         }
261
262         // If we already have a block on disk under this identifier, return
263         // success (but check for MD5 collisions).
264         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
265         // In either case, we want to write our new (good) block to disk, so there is
266         // nothing special to do if err != nil.
267         if oldblock, err := GetBlock(hash); err == nil {
268                 if bytes.Compare(block, oldblock) == 0 {
269                         return nil
270                 } else {
271                         return &KeepError{ErrCollision, errors.New("Collision")}
272                 }
273         }
274
275         // Store the block on the first available Keep volume.
276         allFull := true
277         for _, vol := range KeepVolumes {
278                 if IsFull(vol) {
279                         continue
280                 }
281                 allFull = false
282                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
283                 if err := os.MkdirAll(blockDir, 0755); err != nil {
284                         log.Printf("%s: could not create directory %s: %s",
285                                 hash, blockDir, err)
286                         continue
287                 }
288
289                 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
290                 if tmperr != nil {
291                         log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
292                         continue
293                 }
294                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
295
296                 if _, err := tmpfile.Write(block); err != nil {
297                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
298                         continue
299                 }
300                 if err := tmpfile.Close(); err != nil {
301                         log.Printf("closing %s: %s\n", tmpfile.Name(), err)
302                         os.Remove(tmpfile.Name())
303                         continue
304                 }
305                 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
306                         log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
307                         os.Remove(tmpfile.Name())
308                         continue
309                 }
310                 return nil
311         }
312
313         if allFull {
314                 log.Printf("all Keep volumes full")
315                 return &KeepError{ErrFull, errors.New("Full")}
316         } else {
317                 log.Printf("all Keep volumes failed")
318                 return &KeepError{ErrOther, errors.New("Fail")}
319         }
320 }
321
322 func IsFull(volume string) (isFull bool) {
323         fullSymlink := volume + "/full"
324
325         // Check if the volume has been marked as full in the last hour.
326         if link, err := os.Readlink(fullSymlink); err == nil {
327                 if ts, err := strconv.Atoi(link); err == nil {
328                         fulltime := time.Unix(int64(ts), 0)
329                         if time.Since(fulltime).Hours() < 1.0 {
330                                 return true
331                         }
332                 }
333         }
334
335         if avail, err := FreeDiskSpace(volume); err == nil {
336                 isFull = avail < MIN_FREE_KILOBYTES
337         } else {
338                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
339                 isFull = false
340         }
341
342         // If the volume is full, timestamp it.
343         if isFull {
344                 now := fmt.Sprintf("%d", time.Now().Unix())
345                 os.Symlink(now, fullSymlink)
346         }
347         return
348 }
349
350 // FreeDiskSpace(volume)
351 //     Returns the amount of available disk space on VOLUME,
352 //     as a number of 1k blocks.
353 //
354 func FreeDiskSpace(volume string) (free uint64, err error) {
355         var fs syscall.Statfs_t
356         err = syscall.Statfs(volume, &fs)
357         if err == nil {
358                 // Statfs output is not guaranteed to measure free
359                 // space in terms of 1K blocks.
360                 free = fs.Bavail * uint64(fs.Bsize) / 1024
361         }
362
363         return
364 }