2762: Fix wrong class used in test case.
[arvados.git] / services / keep / src / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "encoding/json"
8         "errors"
9         "flag"
10         "fmt"
11         "github.com/gorilla/mux"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "os"
17         "regexp"
18         "strings"
19         "syscall"
20 )
21
22 // ======================
23 // Configuration settings
24 //
25 // TODO(twp): make all of these configurable via command line flags
26 // and/or configuration file settings.
27
28 // Default TCP address on which to listen for requests.
29 const DEFAULT_ADDR = ":25107"
30
31 // A Keep "block" is 64MB.
32 const BLOCKSIZE = 64 * 1024 * 1024
33
34 // A Keep volume must have at least MIN_FREE_KILOBYTES available
35 // in order to permit writes.
36 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
37
38 var PROC_MOUNTS = "/proc/mounts"
39
40 // The Keep VolumeManager maintains a list of available volumes.
41 var KeepVM VolumeManager
42
43 // ==========
44 // Error types.
45 //
46 type KeepError struct {
47         HTTPCode int
48         ErrMsg   string
49 }
50
51 var (
52         CollisionError = &KeepError{400, "Collision"}
53         MD5Error       = &KeepError{401, "MD5 Failure"}
54         CorruptError   = &KeepError{402, "Corruption"}
55         NotFoundError  = &KeepError{404, "Not Found"}
56         GenericError   = &KeepError{500, "Fail"}
57         FullError      = &KeepError{503, "Full"}
58         TooLongError   = &KeepError{504, "Too Long"}
59 )
60
61 func (e *KeepError) Error() string {
62         return e.ErrMsg
63 }
64
65 // This error is returned by ReadAtMost if the available
66 // data exceeds BLOCKSIZE bytes.
67 var ReadErrorTooLong = errors.New("Too long")
68
69 func main() {
70         // Parse command-line flags:
71         //
72         // -listen=ipaddr:port
73         //    Interface on which to listen for requests. Use :port without
74         //    an ipaddr to listen on all network interfaces.
75         //    Examples:
76         //      -listen=127.0.0.1:4949
77         //      -listen=10.0.1.24:8000
78         //      -listen=:25107 (to listen to port 25107 on all interfaces)
79         //
80         // -volumes
81         //    A comma-separated list of directories to use as Keep volumes.
82         //    Example:
83         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
84         //
85         //    If -volumes is empty or is not present, Keep will select volumes
86         //    by looking at currently mounted filesystems for /keep top-level
87         //    directories.
88
89         var listen, volumearg string
90         var serialize_io bool
91         flag.StringVar(&listen, "listen", DEFAULT_ADDR,
92                 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
93         flag.StringVar(&volumearg, "volumes", "",
94                 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
95         flag.BoolVar(&serialize_io, "serialize", false,
96                 "If set, all read and write operations on local Keep volumes will be serialized.")
97         flag.Parse()
98
99         // Look for local keep volumes.
100         var keepvols []string
101         if volumearg == "" {
102                 // TODO(twp): decide whether this is desirable default behavior.
103                 // In production we may want to require the admin to specify
104                 // Keep volumes explicitly.
105                 keepvols = FindKeepVolumes()
106         } else {
107                 keepvols = strings.Split(volumearg, ",")
108         }
109
110         // Check that the specified volumes actually exist.
111         var goodvols []Volume = nil
112         for _, v := range keepvols {
113                 if _, err := os.Stat(v); err == nil {
114                         log.Println("adding Keep volume:", v)
115                         newvol := MakeUnixVolume(v, serialize_io)
116                         goodvols = append(goodvols, &newvol)
117                 } else {
118                         log.Printf("bad Keep volume: %s\n", err)
119                 }
120         }
121
122         if len(goodvols) == 0 {
123                 log.Fatal("could not find any keep volumes")
124         }
125
126         // Start a round-robin VolumeManager with the volumes we have found.
127         KeepVM = MakeRRVolumeManager(goodvols)
128
129         // Set up REST handlers.
130         //
131         // Start with a router that will route each URL path to an
132         // appropriate handler.
133         //
134         rest := mux.NewRouter()
135         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
136         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
137         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
138         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
139         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
140
141         // Tell the built-in HTTP server to direct all requests to the REST
142         // router.
143         http.Handle("/", rest)
144
145         // Start listening for requests.
146         http.ListenAndServe(listen, nil)
147 }
148
149 // FindKeepVolumes
150 //     Returns a list of Keep volumes mounted on this system.
151 //
152 //     A Keep volume is a normal or tmpfs volume with a /keep
153 //     directory at the top level of the mount point.
154 //
155 func FindKeepVolumes() []string {
156         vols := make([]string, 0)
157
158         if f, err := os.Open(PROC_MOUNTS); err != nil {
159                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
160         } else {
161                 scanner := bufio.NewScanner(f)
162                 for scanner.Scan() {
163                         args := strings.Fields(scanner.Text())
164                         dev, mount := args[0], args[1]
165                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
166                                 keep := mount + "/keep"
167                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
168                                         vols = append(vols, keep)
169                                 }
170                         }
171                 }
172                 if err := scanner.Err(); err != nil {
173                         log.Fatal(err)
174                 }
175         }
176         return vols
177 }
178
179 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
180         hash := mux.Vars(req)["hash"]
181
182         block, err := GetBlock(hash)
183         if err != nil {
184                 http.Error(w, err.Error(), 404)
185                 return
186         }
187
188         _, err = w.Write(block)
189         if err != nil {
190                 log.Printf("GetBlockHandler: writing response: %s", err)
191         }
192
193         return
194 }
195
196 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
197         hash := mux.Vars(req)["hash"]
198
199         // Read the block data to be stored.
200         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
201         //
202         // Note: because req.Body is a buffered Reader, each Read() call will
203         // collect only the data in the network buffer (typically 16384 bytes),
204         // even if it is passed a much larger slice.
205         //
206         // Instead, call ReadAtMost to read data from the socket
207         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
208         //
209         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
210                 if err := PutBlock(buf, hash); err == nil {
211                         w.WriteHeader(http.StatusOK)
212                 } else {
213                         ke := err.(*KeepError)
214                         http.Error(w, ke.Error(), ke.HTTPCode)
215                 }
216         } else {
217                 log.Println("error reading request: ", err)
218                 errmsg := err.Error()
219                 if err == ReadErrorTooLong {
220                         // Use a more descriptive error message that includes
221                         // the maximum request size.
222                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
223                 }
224                 http.Error(w, errmsg, 500)
225         }
226 }
227
228 // IndexHandler
229 //     A HandleFunc to address /index and /index/{prefix} requests.
230 //
231 func IndexHandler(w http.ResponseWriter, req *http.Request) {
232         prefix := mux.Vars(req)["prefix"]
233
234         var index string
235         for _, vol := range KeepVM.Volumes() {
236                 index = index + vol.Index(prefix)
237         }
238         w.Write([]byte(index))
239 }
240
241 // StatusHandler
242 //     Responds to /status.json requests with the current node status,
243 //     described in a JSON structure.
244 //
245 //     The data given in a status.json response includes:
246 //        volumes - a list of Keep volumes currently in use by this server
247 //          each volume is an object with the following fields:
248 //            * mount_point
249 //            * device_num (an integer identifying the underlying filesystem)
250 //            * bytes_free
251 //            * bytes_used
252 //
253 type VolumeStatus struct {
254         MountPoint string `json:"mount_point"`
255         DeviceNum  uint64 `json:"device_num"`
256         BytesFree  uint64 `json:"bytes_free"`
257         BytesUsed  uint64 `json:"bytes_used"`
258 }
259
260 type NodeStatus struct {
261         Volumes []*VolumeStatus `json:"volumes"`
262 }
263
264 func StatusHandler(w http.ResponseWriter, req *http.Request) {
265         st := GetNodeStatus()
266         if jstat, err := json.Marshal(st); err == nil {
267                 w.Write(jstat)
268         } else {
269                 log.Printf("json.Marshal: %s\n", err)
270                 log.Printf("NodeStatus = %v\n", st)
271                 http.Error(w, err.Error(), 500)
272         }
273 }
274
275 // GetNodeStatus
276 //     Returns a NodeStatus struct describing this Keep
277 //     node's current status.
278 //
279 func GetNodeStatus() *NodeStatus {
280         st := new(NodeStatus)
281
282         st.Volumes = make([]*VolumeStatus, len(KeepVM.Volumes()))
283         for i, vol := range KeepVM.Volumes() {
284                 st.Volumes[i] = vol.Status()
285         }
286         return st
287 }
288
289 // GetVolumeStatus
290 //     Returns a VolumeStatus describing the requested volume.
291 //
292 func GetVolumeStatus(volume string) *VolumeStatus {
293         var fs syscall.Statfs_t
294         var devnum uint64
295
296         if fi, err := os.Stat(volume); err == nil {
297                 devnum = fi.Sys().(*syscall.Stat_t).Dev
298         } else {
299                 log.Printf("GetVolumeStatus: os.Stat: %s\n", err)
300                 return nil
301         }
302
303         err := syscall.Statfs(volume, &fs)
304         if err != nil {
305                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
306                 return nil
307         }
308         // These calculations match the way df calculates disk usage:
309         // "free" space is measured by fs.Bavail, but "used" space
310         // uses fs.Blocks - fs.Bfree.
311         free := fs.Bavail * uint64(fs.Bsize)
312         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
313         return &VolumeStatus{volume, devnum, free, used}
314 }
315
316 func GetBlock(hash string) ([]byte, error) {
317         // Attempt to read the requested hash from a keep volume.
318         for _, vol := range KeepVM.Volumes() {
319                 if buf, err := vol.Get(hash); err != nil {
320                         // IsNotExist is an expected error and may be ignored.
321                         // (If all volumes report IsNotExist, we return a NotFoundError)
322                         // A CorruptError should be returned immediately.
323                         // Any other errors should be logged but we continue trying to
324                         // read.
325                         switch {
326                         case os.IsNotExist(err):
327                                 continue
328                         default:
329                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
330                         }
331                 } else {
332                         // Double check the file checksum.
333                         //
334                         filehash := fmt.Sprintf("%x", md5.Sum(buf))
335                         if filehash != hash {
336                                 // TODO(twp): this condition probably represents a bad disk and
337                                 // should raise major alarm bells for an administrator: e.g.
338                                 // they should be sent directly to an event manager at high
339                                 // priority or logged as urgent problems.
340                                 //
341                                 log.Printf("%s: checksum mismatch for request %s (actual hash %s)\n",
342                                         vol, hash, filehash)
343                                 return buf, CorruptError
344                         }
345                         // Success!
346                         return buf, nil
347                 }
348         }
349
350         log.Printf("%s: not found on any volumes, giving up\n", hash)
351         return nil, NotFoundError
352 }
353
354 /* PutBlock(block, hash)
355    Stores the BLOCK (identified by the content id HASH) in Keep.
356
357    The MD5 checksum of the block must be identical to the content id HASH.
358    If not, an error is returned.
359
360    PutBlock stores the BLOCK on the first Keep volume with free space.
361    A failure code is returned to the user only if all volumes fail.
362
363    On success, PutBlock returns nil.
364    On failure, it returns a KeepError with one of the following codes:
365
366    400 Collision
367           A different block with the same hash already exists on this
368           Keep server.
369    401 MD5Fail
370           The MD5 hash of the BLOCK does not match the argument HASH.
371    503 Full
372           There was not enough space left in any Keep volume to store
373           the object.
374    500 Fail
375           The object could not be stored for some other reason (e.g.
376           all writes failed). The text of the error message should
377           provide as much detail as possible.
378 */
379
380 func PutBlock(block []byte, hash string) error {
381         // Check that BLOCK's checksum matches HASH.
382         blockhash := fmt.Sprintf("%x", md5.Sum(block))
383         if blockhash != hash {
384                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
385                 return MD5Error
386         }
387
388         // If we already have a block on disk under this identifier, return
389         // success (but check for MD5 collisions).
390         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
391         // In either case, we want to write our new (good) block to disk, so there is
392         // nothing special to do if err != nil.
393         if oldblock, err := GetBlock(hash); err == nil {
394                 if bytes.Compare(block, oldblock) == 0 {
395                         return nil
396                 } else {
397                         return CollisionError
398                 }
399         }
400
401         // Choose a Keep volume to write to.
402         // If this volume fails, try all of the volumes in order.
403         vol := KeepVM.Choose()
404         if err := vol.Put(hash, block); err == nil {
405                 return nil // success!
406         } else {
407                 allFull := true
408                 for _, vol := range KeepVM.Volumes() {
409                         err := vol.Put(hash, block)
410                         if err == nil {
411                                 return nil // success!
412                         }
413                         if err != FullError {
414                                 // The volume is not full but the write did not succeed.
415                                 // Report the error and continue trying.
416                                 allFull = false
417                                 log.Printf("%s: Write(%s): %s\n", vol, hash, err)
418                         }
419                 }
420
421                 if allFull {
422                         log.Printf("all Keep volumes full")
423                         return FullError
424                 } else {
425                         log.Printf("all Keep volumes failed")
426                         return GenericError
427                 }
428         }
429 }
430
431 // ReadAtMost
432 //     Reads bytes repeatedly from an io.Reader until either
433 //     encountering EOF, or the maxbytes byte limit has been reached.
434 //     Returns a byte slice of the bytes that were read.
435 //
436 //     If the reader contains more than maxbytes, returns a nil slice
437 //     and an error.
438 //
439 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
440         // Attempt to read one more byte than maxbytes.
441         lr := io.LimitReader(r, int64(maxbytes+1))
442         buf, err := ioutil.ReadAll(lr)
443         if len(buf) > maxbytes {
444                 return nil, ReadErrorTooLong
445         }
446         return buf, err
447 }
448
449 // IsValidLocator
450 //     Return true if the specified string is a valid Keep locator.
451 //     When Keep is extended to support hash types other than MD5,
452 //     this should be updated to cover those as well.
453 //
454 func IsValidLocator(loc string) bool {
455         match, err := regexp.MatchString(`^[0-9a-f]{32}$`, loc)
456         if err == nil {
457                 return match
458         }
459         log.Printf("IsValidLocator: %s\n", err)
460         return false
461 }