Made KeepVolumes a slice of Volume objects, not strings.
[arvados.git] / services / keep / src / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "encoding/json"
8         "errors"
9         "flag"
10         "fmt"
11         "github.com/gorilla/mux"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "strings"
19         "syscall"
20 )
21
22 // ======================
23 // Configuration settings
24 //
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
27
28 // Default TCP address on which to listen for requests.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 // KeepVolumes is a slice of volumes on which blocks can be stored.
41 var KeepVolumes []Volume
42
43 // ==========
44 // Error types.
45 //
46 type KeepError struct {
47         HTTPCode int
48         ErrMsg   string
49 }
50
51 var (
52         CollisionError = &KeepError{400, "Collision"}
53         MD5Error       = &KeepError{401, "MD5 Failure"}
54         CorruptError   = &KeepError{402, "Corruption"}
55         NotFoundError  = &KeepError{404, "Not Found"}
56         GenericError   = &KeepError{500, "Fail"}
57         FullError      = &KeepError{503, "Full"}
58         TooLongError   = &KeepError{504, "Too Long"}
59 )
60
61 func (e *KeepError) Error() string {
62         return e.ErrMsg
63 }
64
65 // This error is returned by ReadAtMost if the available
66 // data exceeds BLOCKSIZE bytes.
67 var ReadErrorTooLong = errors.New("Too long")
68
69 func main() {
70         // Parse command-line flags:
71         //
72         // -listen=ipaddr:port
73         //    Interface on which to listen for requests. Use :port without
74         //    an ipaddr to listen on all network interfaces.
75         //    Examples:
76         //      -listen=127.0.0.1:4949
77         //      -listen=10.0.1.24:8000
78         //      -listen=:25107 (to listen to port 25107 on all interfaces)
79         //
80         // -volumes
81         //    A comma-separated list of directories to use as Keep volumes.
82         //    Example:
83         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
84         //
85         //    If -volumes is empty or is not present, Keep will select volumes
86         //    by looking at currently mounted filesystems for /keep top-level
87         //    directories.
88
89         var listen, volumearg string
90         flag.StringVar(&listen, "listen", DEFAULT_ADDR,
91                 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
92         flag.StringVar(&volumearg, "volumes", "",
93                 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
94         flag.Parse()
95
96         // Look for local keep volumes.
97         var keepvols []string
98         if volumearg == "" {
99                 // TODO(twp): decide whether this is desirable default behavior.
100                 // In production we may want to require the admin to specify
101                 // Keep volumes explicitly.
102                 keepvols = FindKeepVolumes()
103         } else {
104                 keepvols = strings.Split(volumearg, ",")
105         }
106
107         // Check that the specified volumes actually exist.
108         KeepVolumes = []Volume(nil)
109         for _, v := range keepvols {
110                 if _, err := os.Stat(v); err == nil {
111                         log.Println("adding Keep volume:", v)
112                         KeepVolumes = append(KeepVolumes, &UnixVolume{v})
113                 } else {
114                         log.Printf("bad Keep volume: %s\n", err)
115                 }
116         }
117
118         if len(KeepVolumes) == 0 {
119                 log.Fatal("could not find any keep volumes")
120         }
121
122         // Set up REST handlers.
123         //
124         // Start with a router that will route each URL path to an
125         // appropriate handler.
126         //
127         rest := mux.NewRouter()
128         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
129         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
130         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
131         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
132         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
133
134         // Tell the built-in HTTP server to direct all requests to the REST
135         // router.
136         http.Handle("/", rest)
137
138         // Start listening for requests.
139         http.ListenAndServe(listen, nil)
140 }
141
142 // FindKeepVolumes
143 //     Returns a list of Keep volumes mounted on this system.
144 //
145 //     A Keep volume is a normal or tmpfs volume with a /keep
146 //     directory at the top level of the mount point.
147 //
148 func FindKeepVolumes() []string {
149         vols := make([]string, 0)
150
151         if f, err := os.Open(PROC_MOUNTS); err != nil {
152                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
153         } else {
154                 scanner := bufio.NewScanner(f)
155                 for scanner.Scan() {
156                         args := strings.Fields(scanner.Text())
157                         dev, mount := args[0], args[1]
158                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
159                                 keep := mount + "/keep"
160                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
161                                         vols = append(vols, keep)
162                                 }
163                         }
164                 }
165                 if err := scanner.Err(); err != nil {
166                         log.Fatal(err)
167                 }
168         }
169         return vols
170 }
171
172 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
173         hash := mux.Vars(req)["hash"]
174
175         block, err := GetBlock(hash)
176         if err != nil {
177                 http.Error(w, err.Error(), 404)
178                 return
179         }
180
181         _, err = w.Write(block)
182         if err != nil {
183                 log.Printf("GetBlockHandler: writing response: %s", err)
184         }
185
186         return
187 }
188
189 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
190         hash := mux.Vars(req)["hash"]
191
192         // Read the block data to be stored.
193         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
194         //
195         // Note: because req.Body is a buffered Reader, each Read() call will
196         // collect only the data in the network buffer (typically 16384 bytes),
197         // even if it is passed a much larger slice.
198         //
199         // Instead, call ReadAtMost to read data from the socket
200         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
201         //
202         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
203                 if err := PutBlock(buf, hash); err == nil {
204                         w.WriteHeader(http.StatusOK)
205                 } else {
206                         ke := err.(*KeepError)
207                         http.Error(w, ke.Error(), ke.HTTPCode)
208                 }
209         } else {
210                 log.Println("error reading request: ", err)
211                 errmsg := err.Error()
212                 if err == ReadErrorTooLong {
213                         // Use a more descriptive error message that includes
214                         // the maximum request size.
215                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
216                 }
217                 http.Error(w, errmsg, 500)
218         }
219 }
220
221 // IndexHandler
222 //     A HandleFunc to address /index and /index/{prefix} requests.
223 //
224 func IndexHandler(w http.ResponseWriter, req *http.Request) {
225         prefix := mux.Vars(req)["prefix"]
226
227         var index string
228         for _, vol := range KeepVolumes {
229                 index = index + vol.Index(prefix)
230         }
231         w.Write([]byte(index))
232 }
233
234 // StatusHandler
235 //     Responds to /status.json requests with the current node status,
236 //     described in a JSON structure.
237 //
238 //     The data given in a status.json response includes:
239 //        volumes - a list of Keep volumes currently in use by this server
240 //          each volume is an object with the following fields:
241 //            * mount_point
242 //            * device_num (an integer identifying the underlying filesystem)
243 //            * bytes_free
244 //            * bytes_used
245 //
246 type VolumeStatus struct {
247         MountPoint string `json:"mount_point"`
248         DeviceNum  uint64 `json:"device_num"`
249         BytesFree  uint64 `json:"bytes_free"`
250         BytesUsed  uint64 `json:"bytes_used"`
251 }
252
253 type NodeStatus struct {
254         Volumes []*VolumeStatus `json:"volumes"`
255 }
256
257 func StatusHandler(w http.ResponseWriter, req *http.Request) {
258         st := GetNodeStatus()
259         if jstat, err := json.Marshal(st); err == nil {
260                 w.Write(jstat)
261         } else {
262                 log.Printf("json.Marshal: %s\n", err)
263                 log.Printf("NodeStatus = %v\n", st)
264                 http.Error(w, err.Error(), 500)
265         }
266 }
267
268 // GetNodeStatus
269 //     Returns a NodeStatus struct describing this Keep
270 //     node's current status.
271 //
272 func GetNodeStatus() *NodeStatus {
273         st := new(NodeStatus)
274
275         st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
276         for i, vol := range KeepVolumes {
277                 st.Volumes[i] = vol.Status()
278         }
279         return st
280 }
281
282 // GetVolumeStatus
283 //     Returns a VolumeStatus describing the requested volume.
284 //
285 func GetVolumeStatus(volume string) *VolumeStatus {
286         var fs syscall.Statfs_t
287         var devnum uint64
288
289         if fi, err := os.Stat(volume); err == nil {
290                 devnum = fi.Sys().(*syscall.Stat_t).Dev
291         } else {
292                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
293                 return nil
294         }
295
296         err := syscall.Statfs(volume, &fs)
297         if err != nil {
298                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
299                 return nil
300         }
301         // These calculations match the way df calculates disk usage:
302         // "free" space is measured by fs.Bavail, but "used" space
303         // uses fs.Blocks - fs.Bfree.
304         free := fs.Bavail * uint64(fs.Bsize)
305         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
306         return &VolumeStatus{volume, devnum, free, used}
307 }
308
309 func GetBlock(hash string) ([]byte, error) {
310         // Attempt to read the requested hash from a keep volume.
311         for _, vol := range KeepVolumes {
312                 if buf, err := vol.Read(hash); err != nil {
313                         // IsNotExist is an expected error and may be ignored.
314                         // (If all volumes report IsNotExist, we return a NotFoundError)
315                         // A CorruptError should be returned immediately.
316                         // Any other errors should be logged but we continue trying to
317                         // read.
318                         switch {
319                         case os.IsNotExist(err):
320                                 continue
321                         case err == CorruptError:
322                                 return nil, err
323                         default:
324                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
325                         }
326                 } else {
327                         // Success!
328                         return buf, nil
329                 }
330         }
331
332         log.Printf("%s: not found on any volumes, giving up\n", hash)
333         return nil, NotFoundError
334 }
335
336 /* PutBlock(block, hash)
337    Stores the BLOCK (identified by the content id HASH) in Keep.
338
339    The MD5 checksum of the block must be identical to the content id HASH.
340    If not, an error is returned.
341
342    PutBlock stores the BLOCK on the first Keep volume with free space.
343    A failure code is returned to the user only if all volumes fail.
344
345    On success, PutBlock returns nil.
346    On failure, it returns a KeepError with one of the following codes:
347
348    400 Collision
349           A different block with the same hash already exists on this
350           Keep server.
351    401 MD5Fail
352           The MD5 hash of the BLOCK does not match the argument HASH.
353    503 Full
354           There was not enough space left in any Keep volume to store
355           the object.
356    500 Fail
357           The object could not be stored for some other reason (e.g.
358           all writes failed). The text of the error message should
359           provide as much detail as possible.
360 */
361
362 func PutBlock(block []byte, hash string) error {
363         // Check that BLOCK's checksum matches HASH.
364         blockhash := fmt.Sprintf("%x", md5.Sum(block))
365         if blockhash != hash {
366                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
367                 return MD5Error
368         }
369
370         // If we already have a block on disk under this identifier, return
371         // success (but check for MD5 collisions).
372         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
373         // In either case, we want to write our new (good) block to disk, so there is
374         // nothing special to do if err != nil.
375         if oldblock, err := GetBlock(hash); err == nil {
376                 if bytes.Compare(block, oldblock) == 0 {
377                         return nil
378                 } else {
379                         return CollisionError
380                 }
381         }
382
383         // Store the block on the first available Keep volume.
384         allFull := true
385         for _, vol := range KeepVolumes {
386                 err := vol.Write(hash, block)
387                 if err == nil {
388                         return nil // success!
389                 }
390                 if err != FullError {
391                         // The volume is not full but the write did not succeed.
392                         // Report the error and continue trying.
393                         allFull = false
394                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
395                 }
396         }
397
398         if allFull {
399                 log.Printf("all Keep volumes full")
400                 return FullError
401         } else {
402                 log.Printf("all Keep volumes failed")
403                 return GenericError
404         }
405 }
406
407 // ReadAtMost
408 //     Reads bytes repeatedly from an io.Reader until either
409 //     encountering EOF, or the maxbytes byte limit has been reached.
410 //     Returns a byte slice of the bytes that were read.
411 //
412 //     If the reader contains more than maxbytes, returns a nil slice
413 //     and an error.
414 //
415 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
416         // Attempt to read one more byte than maxbytes.
417         lr := io.LimitReader(r, int64(maxbytes+1))
418         buf, err := ioutil.ReadAll(lr)
419         if len(buf) > maxbytes {
420                 return nil, ReadErrorTooLong
421         }
422         return buf, err
423 }
424
425 // IsValidLocator
426 //     Return true if the specified string is a valid Keep locator.
427 //     When Keep is extended to support hash types other than MD5,
428 //     this should be updated to cover those as well.
429 //
430 func IsValidLocator(loc string) (bool, error) {
431         return regexp.MatchString(`^[0-9a-f]{32}$`, loc)
432 }