Merge branch '2449-keep-write-blocks' into 2449-keep-index-status-handlers
[arvados.git] / services / keep / keep.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/md5"
7         "errors"
8         "fmt"
9         "github.com/gorilla/mux"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "os"
14         "path/filepath"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 // Default TCP port on which to listen for requests.
22 const DEFAULT_PORT = 25107
23
24 // A Keep "block" is 64MB.
25 const BLOCKSIZE = 64 * 1024 * 1024
26
27 // A Keep volume must have at least MIN_FREE_KILOBYTES available
28 // in order to permit writes.
29 const MIN_FREE_KILOBYTES = BLOCKSIZE / 1024
30
31 var PROC_MOUNTS = "/proc/mounts"
32
33 var KeepVolumes []string
34
35 type KeepError struct {
36         HTTPCode int
37         Err      error
38 }
39
40 const (
41         ErrCollision = 400
42         ErrMD5Fail   = 401
43         ErrCorrupt   = 402
44         ErrNotFound  = 404
45         ErrOther     = 500
46         ErrFull      = 503
47 )
48
49 func (e *KeepError) Error() string {
50         return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
51 }
52
53 func main() {
54         // Look for local keep volumes.
55         KeepVolumes = FindKeepVolumes()
56         if len(KeepVolumes) == 0 {
57                 log.Fatal("could not find any keep volumes")
58         }
59         for _, v := range KeepVolumes {
60                 log.Println("keep volume:", v)
61         }
62
63         // Set up REST handlers.
64         //
65         // Start with a router that will route each URL path to an
66         // appropriate handler.
67         //
68         rest := mux.NewRouter()
69         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
70         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
71         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
72         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
73         rest.HandleFunc(`/status\.json`, StatusHandler).Methods("GET", "HEAD")
74
75         // Tell the built-in HTTP server to direct all requests to the REST
76         // router.
77         http.Handle("/", rest)
78
79         // Start listening for requests.
80         port := fmt.Sprintf(":%d", DEFAULT_PORT)
81         http.ListenAndServe(port, nil)
82 }
83
84 // FindKeepVolumes
85 //     Returns a list of Keep volumes mounted on this system.
86 //
87 //     A Keep volume is a normal or tmpfs volume with a /keep
88 //     directory at the top level of the mount point.
89 //
90 func FindKeepVolumes() []string {
91         vols := make([]string, 0)
92
93         if f, err := os.Open(PROC_MOUNTS); err != nil {
94                 log.Fatalf("opening %s: %s\n", PROC_MOUNTS, err)
95         } else {
96                 scanner := bufio.NewScanner(f)
97                 for scanner.Scan() {
98                         args := strings.Fields(scanner.Text())
99                         dev, mount := args[0], args[1]
100                         if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
101                                 keep := mount + "/keep"
102                                 if st, err := os.Stat(keep); err == nil && st.IsDir() {
103                                         vols = append(vols, keep)
104                                 }
105                         }
106                 }
107                 if err := scanner.Err(); err != nil {
108                         log.Fatal(err)
109                 }
110         }
111         return vols
112 }
113
114 func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
115         hash := mux.Vars(req)["hash"]
116
117         block, err := GetBlock(hash)
118         if err != nil {
119                 http.Error(w, err.Error(), 404)
120                 return
121         }
122
123         _, err = w.Write(block)
124         if err != nil {
125                 log.Printf("GetBlockHandler: writing response: %s", err)
126         }
127
128         return
129 }
130
131 func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
132         hash := mux.Vars(req)["hash"]
133
134         // Read the block data to be stored.
135         // TODO(twp): decide what to do when the input stream contains
136         // more than BLOCKSIZE bytes.
137         //
138         buf := make([]byte, BLOCKSIZE)
139         if nread, err := req.Body.Read(buf); err == nil {
140                 if err := PutBlock(buf[:nread], hash); err == nil {
141                         w.WriteHeader(http.StatusOK)
142                 } else {
143                         ke := err.(*KeepError)
144                         http.Error(w, ke.Error(), ke.HTTPCode)
145                 }
146         } else {
147                 log.Println("error reading request: ", err)
148                 http.Error(w, err.Error(), 500)
149         }
150 }
151
152 func IndexHandler(w http.ResponseWriter, req *http.Request) {
153         prefix := mux.Vars(req)["prefix"]
154
155         index := IndexLocators(prefix)
156         w.Write([]byte(index))
157 }
158
159 // StatusHandler
160 //     Responds to /status.json requests with the current node status,
161 //     described in a JSON structure.
162 //
163 //     The data given in a status.json response includes:
164 //        time - the time the status was last updated
165 //        df   - the output of the most recent `df --block-size=1k`
166 //        disk_devices - list of disk device files (i.e. /dev/(s|xv|h)d)
167 //        dirs - an object describing Keep volumes, keyed on Keep volume dirs,
168 //          each value is an object describing the status of that volume
169 //            * status ("full [timestamp]" or "ok [timestamp]")
170 //            * last_error
171 //            * last_error_time
172 //
173 type VolumeStatus struct {
174         Space       string
175         LastErr     string
176         LastErrTime time.Time
177 }
178
179 type NodeStatus struct {
180         LastUpdate time.Time
181         DfOutput   string
182         DiskDev    []string
183         Volumes    map[string]VolumeStatus
184 }
185
186 func StatusHandler(w http.ResponseWriter, req *http.Request) {
187         st := new(NodeStatus)
188         st.LastUpdate = time.Now()
189
190         // Get a list of disk devices on this system.
191         st.DiskDev = make([]string, 1)
192         if devdir, err := os.Open("/dev"); err != nil {
193                 log.Printf("StatusHandler: opening /dev: %s\n", err)
194         } else {
195                 devs, err := devdir.Readdirnames(0)
196                 if err == nil {
197                         for _, d := range devs {
198                                 if strings.HasPrefix(d, "sd") ||
199                                         strings.HasPrefix(d, "hd") ||
200                                         strings.HasPrefix(d, "xvd") {
201                                         st.DiskDev = append(st.DiskDev, d)
202                                 }
203                         }
204                 } else {
205                         log.Printf("Readdirnames: %s", err)
206                 }
207         }
208
209         for _, vol := range KeepVolumes {
210                 st.Volumes[vol] = GetVolumeStatus(vol)
211         }
212 }
213
214 // GetVolumeStatus
215 //     Returns a VolumeStatus describing the requested volume.
216 func GetVolumeStatus(volume string) VolumeStatus {
217         var isfull, lasterr string
218         var lasterrtime time.Time
219
220         if IsFull(volume) {
221                 isfull = fmt.Sprintf("full %d", time.Now().Unix())
222         } else {
223                 isfull = fmt.Sprintf("ok %d", time.Now().Unix())
224         }
225
226         // Not implemented yet
227         lasterr = ""
228         lasterrtime = time.Unix(0, 0)
229
230         return VolumeStatus{isfull, lasterr, lasterrtime}
231 }
232
233 // IndexLocators
234 //     Returns a string containing a list of locator ids found on this
235 //     Keep server.  If {prefix} is given, return only those locator
236 //     ids that begin with the given prefix string.
237 //
238 //     The return string consists of a sequence of newline-separated
239 //     strings in the format
240 //
241 //         locator+size modification-time
242 //
243 //     e.g.:
244 //
245 //         e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
246 //         e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
247 //         e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
248 //
249 func IndexLocators(prefix string) string {
250         var output string
251         for _, vol := range KeepVolumes {
252                 filepath.Walk(vol,
253                         func(path string, info os.FileInfo, err error) error {
254                                 // This WalkFunc inspects each path in the volume
255                                 // and prints an index line for all files that begin
256                                 // with prefix.
257                                 if err != nil {
258                                         log.Printf("IndexHandler: %s: walking to %s: %s",
259                                                 vol, path, err)
260                                         return nil
261                                 }
262                                 locator := filepath.Base(path)
263                                 // Skip directories that do not match prefix.
264                                 // We know there is nothing interesting inside.
265                                 if info.IsDir() &&
266                                         !strings.HasPrefix(locator, prefix) &&
267                                         !strings.HasPrefix(prefix, locator) {
268                                         return filepath.SkipDir
269                                 }
270                                 // Print filenames beginning with prefix
271                                 if !info.IsDir() && strings.HasPrefix(locator, prefix) {
272                                         output = output + fmt.Sprintf(
273                                                 "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
274                                 }
275                                 return nil
276                         })
277         }
278
279         return output
280 }
281
282 func GetBlock(hash string) ([]byte, error) {
283         var buf = make([]byte, BLOCKSIZE)
284
285         // Attempt to read the requested hash from a keep volume.
286         for _, vol := range KeepVolumes {
287                 var f *os.File
288                 var err error
289                 var nread int
290
291                 blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
292
293                 f, err = os.Open(blockFilename)
294                 if err != nil {
295                         if !os.IsNotExist(err) {
296                                 // A block is stored on only one Keep disk,
297                                 // so os.IsNotExist is expected.  Report any other errors.
298                                 log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
299                         }
300                         continue
301                 }
302
303                 nread, err = f.Read(buf)
304                 if err != nil {
305                         log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
306                         continue
307                 }
308
309                 // Double check the file checksum.
310                 //
311                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
312                 if filehash != hash {
313                         // TODO(twp): this condition probably represents a bad disk and
314                         // should raise major alarm bells for an administrator: e.g.
315                         // they should be sent directly to an event manager at high
316                         // priority or logged as urgent problems.
317                         //
318                         log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
319                                 vol, blockFilename, filehash)
320                         return buf, &KeepError{ErrCorrupt, errors.New("Corrupt")}
321                 }
322
323                 // Success!
324                 return buf[:nread], nil
325         }
326
327         log.Printf("%s: not found on any volumes, giving up\n", hash)
328         return buf, &KeepError{ErrNotFound, errors.New("not found: " + hash)}
329 }
330
331 /* PutBlock(block, hash)
332    Stores the BLOCK (identified by the content id HASH) in Keep.
333
334    The MD5 checksum of the block must be identical to the content id HASH.
335    If not, an error is returned.
336
337    PutBlock stores the BLOCK on the first Keep volume with free space.
338    A failure code is returned to the user only if all volumes fail.
339
340    On success, PutBlock returns nil.
341    On failure, it returns a KeepError with one of the following codes:
342
343    400 Collision
344           A different block with the same hash already exists on this
345           Keep server.
346    401 MD5Fail
347           The MD5 hash of the BLOCK does not match the argument HASH.
348    503 Full
349           There was not enough space left in any Keep volume to store
350           the object.
351    500 Fail
352           The object could not be stored for some other reason (e.g.
353           all writes failed). The text of the error message should
354           provide as much detail as possible.
355 */
356
357 func PutBlock(block []byte, hash string) error {
358         // Check that BLOCK's checksum matches HASH.
359         blockhash := fmt.Sprintf("%x", md5.Sum(block))
360         if blockhash != hash {
361                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
362                 return &KeepError{ErrMD5Fail, errors.New("MD5Fail")}
363         }
364
365         // If we already have a block on disk under this identifier, return
366         // success (but check for MD5 collisions).
367         // The only errors that GetBlock can return are ErrCorrupt and ErrNotFound.
368         // In either case, we want to write our new (good) block to disk, so there is
369         // nothing special to do if err != nil.
370         if oldblock, err := GetBlock(hash); err == nil {
371                 if bytes.Compare(block, oldblock) == 0 {
372                         return nil
373                 } else {
374                         return &KeepError{ErrCollision, errors.New("Collision")}
375                 }
376         }
377
378         // Store the block on the first available Keep volume.
379         allFull := true
380         for _, vol := range KeepVolumes {
381                 if IsFull(vol) {
382                         continue
383                 }
384                 allFull = false
385                 blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
386                 if err := os.MkdirAll(blockDir, 0755); err != nil {
387                         log.Printf("%s: could not create directory %s: %s",
388                                 hash, blockDir, err)
389                         continue
390                 }
391
392                 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+hash)
393                 if tmperr != nil {
394                         log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, hash, tmperr)
395                         continue
396                 }
397                 blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
398
399                 if _, err := tmpfile.Write(block); err != nil {
400                         log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
401                         continue
402                 }
403                 if err := tmpfile.Close(); err != nil {
404                         log.Printf("closing %s: %s\n", tmpfile.Name(), err)
405                         os.Remove(tmpfile.Name())
406                         continue
407                 }
408                 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
409                         log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
410                         os.Remove(tmpfile.Name())
411                         continue
412                 }
413                 return nil
414         }
415
416         if allFull {
417                 log.Printf("all Keep volumes full")
418                 return &KeepError{ErrFull, errors.New("Full")}
419         } else {
420                 log.Printf("all Keep volumes failed")
421                 return &KeepError{ErrOther, errors.New("Fail")}
422         }
423 }
424
425 func IsFull(volume string) (isFull bool) {
426         fullSymlink := volume + "/full"
427
428         // Check if the volume has been marked as full in the last hour.
429         if link, err := os.Readlink(fullSymlink); err == nil {
430                 if ts, err := strconv.Atoi(link); err == nil {
431                         fulltime := time.Unix(int64(ts), 0)
432                         if time.Since(fulltime).Hours() < 1.0 {
433                                 return true
434                         }
435                 }
436         }
437
438         if avail, err := FreeDiskSpace(volume); err == nil {
439                 isFull = avail < MIN_FREE_KILOBYTES
440         } else {
441                 log.Printf("%s: FreeDiskSpace: %s\n", volume, err)
442                 isFull = false
443         }
444
445         // If the volume is full, timestamp it.
446         if isFull {
447                 now := fmt.Sprintf("%d", time.Now().Unix())
448                 os.Symlink(now, fullSymlink)
449         }
450         return
451 }
452
453 // FreeDiskSpace(volume)
454 //     Returns the amount of available disk space on VOLUME,
455 //     as a number of 1k blocks.
456 //
457 func FreeDiskSpace(volume string) (free uint64, err error) {
458         var fs syscall.Statfs_t
459         err = syscall.Statfs(volume, &fs)
460         if err == nil {
461                 // Statfs output is not guaranteed to measure free
462                 // space in terms of 1K blocks.
463                 free = fs.Bavail * uint64(fs.Bsize) / 1024
464         }
465         return
466 }