Added rudimentary GetNodeStatus test. (refs #2561)
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "encoding/json"
8         "errors"
9         "flag"
10         "fmt"
11         "github.com/gorilla/mux"
12         "io"
13         "io/ioutil"
14         "log"
15         "net/http"
16         "os"
17         "path/filepath"
18         "strconv"
19         "strings"
20         "syscall"
21         "time"
22 )
23
24 // ======================
25 // Configuration settings
26 //
27 // TODO(twp): make all of these configurable via command line flags
28 // and/or configuration file settings.
29
30 // Default TCP address on which to listen for requests.
31 const DEFAULT_ADDR = ":25107"
32
33 // A Keep "block" is 64MB.
34 const BLOCKSIZE = 64 * 1024 * 1024
35
36 // A Keep volume must have at least MIN_FREE_KILOBYTES available
37 // in order to permit writes.
38 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
39
40 var PROC_MOUNTS = "/proc/mounts"
41
42 var KeepVolumes []string
43
44 // ==========
45 // Error types.
46 //
47 type KeepError struct {
48         HTTPCode int
49         ErrMsg   string
50 }
51
52 var (
53         CollisionError = &KeepError{400, "Collision"}
54         MD5Error       = &KeepError{401, "MD5 Failure"}
55         CorruptError   = &KeepError{402, "Corruption"}
56         NotFoundError  = &KeepError{404, "Not Found"}
57         GenericError   = &KeepError{500, "Fail"}
58         FullError      = &KeepError{503, "Full"}
59         TooLongError   = &KeepError{504, "Too Long"}
60 )
61
62 func (e *KeepError) Error() string {
63         return e.ErrMsg
64 }
65
66 // This error is returned by ReadAtMost if the available
67 // data exceeds BLOCKSIZE bytes.
68 var ReadErrorTooLong = errors.New("Too long")
69
70 func main() {
71         // Parse command-line flags:
72         //
73         // -listen=ipaddr:port
74         //    Interface on which to listen for requests. Use :port without
75         //    an ipaddr to listen on all network interfaces.
76         //    Examples:
77         //      -listen=127.0.0.1:4949
78         //      -listen=10.0.1.24:8000
79         //      -listen=:25107 (to listen to port 25107 on all interfaces)
80         //
81         // -volumes
82         //    A comma-separated list of directories to use as Keep volumes.
83         //    Example:
84         //      -volumes=/var/keep01,/var/keep02,/var/keep03/subdir
85         //
86         //    If -volumes is empty or is not present, Keep will select volumes
87         //    by looking at currently mounted filesystems for /keep top-level
88         //    directories.
89
90         var listen, keepvols string
91         flag.StringVar(&listen, "listen", DEFAULT_ADDR,
92                 "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
93         flag.StringVar(&keepvols, "volumes", "",
94                 "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
95         flag.Parse()
96
97         // Look for local keep volumes.
98         if keepvols == "" {
99                 // TODO(twp): decide whether this is desirable default behavior.
100                 // In production we may want to require the admin to specify
101                 // Keep volumes explicitly.
102                 KeepVolumes = FindKeepVolumes()
103         } else {
104                 KeepVolumes = strings.Split(keepvols, ",")
105         }
106
107         if len(KeepVolumes) == 0 {
108                 log.Fatal("could not find any keep volumes")
109         }
110         for _, v := range KeepVolumes {
111                 log.Println("keep volume:", v)
112         }
113
114         // Set up REST handlers.
115         //
116         // Start with a router that will route each URL path to an
117         // appropriate handler.
118         //
119         rest := mux.NewRouter()
120         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
121         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
122         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
123         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
124         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
125
126         // Tell the built-in HTTP server to direct all requests to the REST
127         // router.
128         http.Handle("/", rest)
129
130         // Start listening for requests.
131         http.ListenAndServe(listen, nil)
132 }
133
134 // FindKeepVolumes
135 //     Returns a list of Keep volumes mounted on this system.
136 //
137 //     A Keep volume is a normal or tmpfs volume with a /keep
138 //     directory at the top level of the mount point.
139 //
140 func FindKeepVolumes() []string {
141         vols := make([]string, 0)
142
143         if f, err := os.Open(PROC_MOUNTS); err != nil {
144                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
145         } else {
146                 scanner := bufio.NewScanner(f)
147                 for scanner.Scan() {
148                         args := strings.Fields(scanner.Text())
149                         dev, mount := args[0], args[1]
150                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
151                                 keep := mount + "/keep"
152                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
153                                         vols = append(vols, keep)
154                                 }
155                         }
156                 }
157                 if err := scanner.Err(); err != nil {
158                         log.Fatal(err)
159                 }
160         }
161         return vols
162 }
163
164 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
165         hash := mux.Vars(req)["hash"]
166
167         block, err := GetBlock(hash)
168         if err != nil {
169                 http.Error(w, err.Error(), 404)
170                 return
171         }
172
173         _, err = w.Write(block)
174         if err != nil {
175                 log.Printf("GetBlockHandler: writing response: %s", err)
176         }
177
178         return
179 }
180
181 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
182         hash := mux.Vars(req)["hash"]
183
184         // Read the block data to be stored.
185         // If the request exceeds BLOCKSIZE bytes, issue a HTTP 500 error.
186         //
187         // Note: because req.Body is a buffered Reader, each Read() call will
188         // collect only the data in the network buffer (typically 16384 bytes),
189         // even if it is passed a much larger slice.
190         //
191         // Instead, call ReadAtMost to read data from the socket
192         // repeatedly until either EOF or BLOCKSIZE bytes have been read.
193         //
194         if buf, err := ReadAtMost(req.Body, BLOCKSIZE); err == nil {
195                 if err := PutBlock(buf, hash); err == nil {
196                         w.WriteHeader(http.StatusOK)
197                 } else {
198                         ke := err.(*KeepError)
199                         http.Error(w, ke.Error(), ke.HTTPCode)
200                 }
201         } else {
202                 log.Println("error reading request: ", err)
203                 errmsg := err.Error()
204                 if err == ReadErrorTooLong {
205                         // Use a more descriptive error message that includes
206                         // the maximum request size.
207                         errmsg = fmt.Sprintf("Max request size %d bytes", BLOCKSIZE)
208                 }
209                 http.Error(w, errmsg, 500)
210         }
211 }
212
213 // IndexHandler
214 //     A HandleFunc to address /index and /index/{prefix} requests.
215 //
216 func IndexHandler(w http.ResponseWriter, req *http.Request) {
217         prefix := mux.Vars(req)["prefix"]
218
219         index := IndexLocators(prefix)
220         w.Write([]byte(index))
221 }
222
223 // StatusHandler
224 //     Responds to /status.json requests with the current node status,
225 //     described in a JSON structure.
226 //
227 //     The data given in a status.json response includes:
228 //        time - the time the status was last updated
229 //        df   - the output of the most recent `df --block-size=1k`
230 //        disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
231 //        dirs - an object describing Keep volumes, keyed on Keep volume dirs,
232 //          each value is an object describing the status of that volume
233 //            * status ("full [timestamp]" or "ok [timestamp]")
234 //            * last_error
235 //            * last_error_time
236 //
237 type VolumeStatus struct {
238         MountPoint string `json:"mount_point"`
239         BytesFree  uint64 `json:"bytes_free"`
240         BytesUsed  uint64 `json:"bytes_used"`
241 }
242
243 type NodeStatus struct {
244         Volumes []*VolumeStatus `json:"volumes"`
245 }
246
247 func StatusHandler(w http.ResponseWriter, req *http.Request) {
248         st := GetNodeStatus()
249         if jstat, err := json.Marshal(st); err == nil {
250                 w.Write(jstat)
251         } else {
252                 log.Printf("json.Marshal: %s\n", err)
253                 log.Printf("NodeStatus = %v\n", st)
254                 http.Error(w, err.Error(), 500)
255         }
256 }
257
258 // GetNodeStatus
259 //     Returns a NodeStatus struct describing this Keep
260 //     node's current status.
261 //
262 func GetNodeStatus() *NodeStatus {
263         st := new(NodeStatus)
264
265         st.Volumes = make([]*VolumeStatus, len(KeepVolumes))
266         for i, vol := range KeepVolumes {
267                 st.Volumes[i] = GetVolumeStatus(vol)
268         }
269         return st
270 }
271
272 // GetVolumeStatus
273 //     Returns a VolumeStatus describing the requested volume.
274 //
275 func GetVolumeStatus(volume string) *VolumeStatus {
276         var fs syscall.Statfs_t
277
278         err := syscall.Statfs(volume, &fs)
279         if err != nil {
280                 log.Printf("GetVolumeStatus: statfs: %s\n", err)
281                 return nil
282         }
283         // These calculations match the way df calculates disk usage:
284         // "free" space is measured by fs.Bavail, but "used" space
285         // uses fs.Blocks - fs.Bfree.
286         free := fs.Bavail * uint64(fs.Bsize)
287         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
288         return &VolumeStatus{volume, free, used}
289 }
290
291 // IndexLocators
292 //     Returns a string containing a list of locator ids found on this
293 //     Keep server.  If {prefix} is given, return only those locator
294 //     ids that begin with the given prefix string.
295 //
296 //     The return string consists of a sequence of newline-separated
297 //     strings in the format
298 //
299 //         locator+size modification-time
300 //
301 //     e.g.:
302 //
303 //         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
304 //         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
305 //         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
306 //
307 func IndexLocators(prefix string) string {
308         var output string
309         for _, vol := range KeepVolumes {
310                 filepath.Walk(vol,
311                         func(path string, info os.FileInfo, err error) error {
312                                 // This WalkFunc inspects each path in the volume
313                                 // and prints an index line for all files that begin
314                                 // with prefix.
315                                 if err != nil {
316                                         log.Printf("IndexHandler: %s: walking to %s: %s",
317                                                 vol, path, err)
318                                         return nil
319                                 }
320                                 locator := filepath.Base(path)
321                                 // Skip directories that do not match prefix.
322                                 // We know there is nothing interesting inside.
323                                 if info.IsDir() &&
324                                         !strings.HasPrefix(locator, prefix) &&
325                                         !strings.HasPrefix(prefix, locator) {
326                                         return filepath.SkipDir
327                                 }
328                                 // Print filenames beginning with prefix
329                                 if !info.IsDir() && strings.HasPrefix(locator, prefix) {
330                                         output = output + fmt.Sprintf(
331                                                 "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
332                                 }
333                                 return nil
334                         })
335         }
336
337         return output
338 }
339
340 func GetBlock(hash string) ([]byte, error) {
341         var buf = make([]byte, BLOCKSIZE)
342
343         // Attempt to read the requested hash from a keep volume.
344         for _, vol := range KeepVolumes {
345                 var f *os.File
346                 var err error
347                 var nread int
348
349                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
350
351                 f, err = os.Open(blockFilename)
352                 if err != nil {
353                         if !os.IsNotExist(err) {
354                                 // A block is stored on only one Keep disk,
355                                 // so os.IsNotExist is expected.  Report any other errors.
356                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
357                         }
358                         continue
359                 }
360
361                 nread, err = f.Read(buf)
362                 if err != nil {
363                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
364                         continue
365                 }
366
367                 // Double check the file checksum.
368                 //
369                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
370                 if filehash != hash {
371                         // TODO(twp): this condition probably represents a bad disk and
372                         // should raise major alarm bells for an administrator: e.g.
373                         // they should be sent directly to an event manager at high
374                         // priority or logged as urgent problems.
375                         //
376                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
377                                 vol, blockFilename, filehash)
378                         return buf, CorruptError
379                 }
380
381                 // Success!
382                 return buf[:nread], nil
383         }
384
385         log.Printf("%s: not found on any volumes, giving up\n", hash)
386         return buf, NotFoundError
387 }
388
389 /* PutBlock(block, hash)
390    Stores the BLOCK (identified by the content id HASH) in Keep.
391
392    The MD5 checksum of the block must be identical to the content id HASH.
393    If not, an error is returned.
394
395    PutBlock stores the BLOCK on the first Keep volume with free space.
396    A failure code is returned to the user only if all volumes fail.
397
398    On success, PutBlock returns nil.
399    On failure, it returns a KeepError with one of the following codes:
400
401    400 Collision
402           A different block with the same hash already exists on this
403           Keep server.
404    401 MD5Fail
405           The MD5 hash of the BLOCK does not match the argument HASH.
406    503 Full
407           There was not enough space left in any Keep volume to store
408           the object.
409    500 Fail
410           The object could not be stored for some other reason (e.g.
411           all writes failed). The text of the error message should
412           provide as much detail as possible.
413 */
414
415 func PutBlock(block []byte, hash string) error {
416         // Check that BLOCK's checksum matches HASH.
417         blockhash := fmt.Sprintf("%x", md5.Sum(block))
418         if blockhash != hash {
419                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
420                 return MD5Error
421         }
422
423         // If we already have a block on disk under this identifier, return
424         // success (but check for MD5 collisions).
425         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
426         // In either case, we want to write our new (good) block to disk, so there is
427         // nothing special to do if err != nil.
428         if oldblock, err := GetBlock(hash); err == nil {
429                 if bytes.Compare(block, oldblock) == 0 {
430                         return nil
431                 } else {
432                         return CollisionError
433                 }
434         }
435
436         // Store the block on the first available Keep volume.
437         allFull := true
438         for _, vol := range KeepVolumes {
439                 if IsFull(vol) {
440                         continue
441                 }
442                 allFull = false
443                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
444                 if err := os.MkdirAll(blockDir, 0755); err != nil {
445                         log.Printf("%s: could not create directory %s: %s",
446                                 hash, blockDir, err)
447                         continue
448                 }
449
450                 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
451                 if tmperr != nil {
452                         log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
453                         continue
454                 }
455                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
456
457                 if _, err := tmpfile.Write(block); err != nil {
458                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
459                         continue
460                 }
461                 if err := tmpfile.Close(); err != nil {
462                         log.Printf("closing %s: %s\n", tmpfile.Name(), err)
463                         os.Remove(tmpfile.Name())
464                         continue
465                 }
466                 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
467                         log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
468                         os.Remove(tmpfile.Name())
469                         continue
470                 }
471                 return nil
472         }
473
474         if allFull {
475                 log.Printf("all Keep volumes full")
476                 return FullError
477         } else {
478                 log.Printf("all Keep volumes failed")
479                 return GenericError
480         }
481 }
482
483 func IsFull(volume string) (isFull bool) {
484         fullSymlink := volume + "/full"
485
486         // Check if the volume has been marked as full in the last hour.
487         if link, err := os.Readlink(fullSymlink); err == nil {
488                 if ts, err := strconv.Atoi(link); err == nil {
489                         fulltime := time.Unix(int64(ts), 0)
490                         if time.Since(fulltime).Hours() < 1.0 {
491                                 return true
492                         }
493                 }
494         }
495
496         if avail, err := FreeDiskSpace(volume); err == nil {
497                 isFull = avail < MIN_FREE_KILOBYTES
498         } else {
499                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
500                 isFull = false
501         }
502
503         // If the volume is full, timestamp it.
504         if isFull {
505                 now := fmt.Sprintf("%d", time.Now().Unix())
506                 os.Symlink(now, fullSymlink)
507         }
508         return
509 }
510
511 // FreeDiskSpace(volume)
512 //     Returns the amount of available disk space on VOLUME,
513 //     as a number of 1k blocks.
514 //
515 //     TODO(twp): consider integrating this better with
516 //     VolumeStatus (e.g. keep a NodeStatus object up-to-date
517 //     periodically and use it as the source of info)
518 //
519 func FreeDiskSpace(volume string) (free uint64, err error) {
520         var fs syscall.Statfs_t
521         err = syscall.Statfs(volume, &fs)
522         if err == nil {
523                 // Statfs output is not guaranteed to measure free
524                 // space in terms of 1K blocks.
525                 free = fs.Bavail * uint64(fs.Bsize) / 1024
526         }
527         return
528 }
529
530 // ReadAtMost
531 //     Reads bytes repeatedly from an io.Reader until either
532 //     encountering EOF, or the maxbytes byte limit has been reached.
533 //     Returns a byte slice of the bytes that were read.
534 //
535 //     If the reader contains more than maxbytes, returns a nil slice
536 //     and an error.
537 //
538 func ReadAtMost(r io.Reader, maxbytes int) ([]byte, error) {
539         // Attempt to read one more byte than maxbytes.
540         lr := io.LimitReader(r, int64(maxbytes+1))
541         buf, err := ioutil.ReadAll(lr)
542         if len(buf) > maxbytes {
543                 return nil, ReadErrorTooLong
544         }
545         return buf, err
546 }