Moved the MD5 verification check back to GetBlock.
[arvados.git] / services / keep / src / keep / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4 //
5 // A UnixVolume is a Volume that is backed by a locally mounted disk.
6
7 package main
8
9 import (
10         "fmt"
11         "io/ioutil"
12         "log"
13         "os"
14         "path/filepath"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 type Volume interface {
22         Read(loc string) ([]byte, error)
23         Write(loc string, block []byte) error
24         Index(prefix string) string
25         Status() *VolumeStatus
26         String() string
27 }
28
29 type UnixVolume struct {
30         root string // path to this volume
31 }
32
33 func (v *UnixVolume) String() string {
34         return fmt.Sprintf("[UnixVolume %s]", v.root)
35 }
36
37 // Read retrieves a block identified by the locator string "loc", and
38 // returns its contents as a byte slice.
39 //
40 // If the block could not be opened or read, Read returns a nil slice
41 // and the os.Error that was generated.
42 //
43 // If the block is present but its content hash does not match loc,
44 // Read returns the block and a CorruptError.  It is the caller's
45 // responsibility to decide what (if anything) to do with the
46 // corrupted data block.
47 //
48 func (v *UnixVolume) Read(loc string) ([]byte, error) {
49         var f *os.File
50         var err error
51         var nread int
52
53         blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
54
55         f, err = os.Open(blockFilename)
56         if err != nil {
57                 return nil, err
58         }
59
60         var buf = make([]byte, BLOCKSIZE)
61         nread, err = f.Read(buf)
62         if err != nil {
63                 log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
64                 return buf, err
65         }
66
67         // Success!
68         return buf[:nread], nil
69 }
70
71 // Write stores a block of data identified by the locator string
72 // "loc".  It returns nil on success.  If the volume is full, it
73 // returns a FullError.  If the write fails due to some other error,
74 // that error is returned.
75 //
76 func (v *UnixVolume) Write(loc string, block []byte) error {
77         if v.IsFull() {
78                 return FullError
79         }
80         blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
81         if err := os.MkdirAll(blockDir, 0755); err != nil {
82                 log.Printf("%s: could not create directory %s: %s",
83                         loc, blockDir, err)
84                 return err
85         }
86
87         tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
88         if tmperr != nil {
89                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
90                 return tmperr
91         }
92         blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
93
94         if _, err := tmpfile.Write(block); err != nil {
95                 log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
96                 return err
97         }
98         if err := tmpfile.Close(); err != nil {
99                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
100                 os.Remove(tmpfile.Name())
101                 return err
102         }
103         if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
104                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
105                 os.Remove(tmpfile.Name())
106                 return err
107         }
108         return nil
109 }
110
111 // Status returns a VolumeStatus struct describing the volume's
112 // current state.
113 //
114 func (v *UnixVolume) Status() *VolumeStatus {
115         var fs syscall.Statfs_t
116         var devnum uint64
117
118         if fi, err := os.Stat(v.root); err == nil {
119                 devnum = fi.Sys().(*syscall.Stat_t).Dev
120         } else {
121                 log.Printf("%s: os.Stat: %s\n", v, err)
122                 return nil
123         }
124
125         err := syscall.Statfs(v.root, &fs)
126         if err != nil {
127                 log.Printf("%s: statfs: %s\n", v, err)
128                 return nil
129         }
130         // These calculations match the way df calculates disk usage:
131         // "free" space is measured by fs.Bavail, but "used" space
132         // uses fs.Blocks - fs.Bfree.
133         free := fs.Bavail * uint64(fs.Bsize)
134         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
135         return &VolumeStatus{v.root, devnum, free, used}
136 }
137
138 // Index returns a list of blocks found on this volume which begin with
139 // the specified prefix. If the prefix is an empty string, Index returns
140 // a complete list of blocks.
141 //
142 // The return value is a multiline string (separated by
143 // newlines). Each line is in the format
144 //
145 //     locator+size modification-time
146 //
147 // e.g.:
148 //
149 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
150 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
151 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
152 //
153 func (v *UnixVolume) Index(prefix string) (output string) {
154         filepath.Walk(v.root,
155                 func(path string, info os.FileInfo, err error) error {
156                         // This WalkFunc inspects each path in the volume
157                         // and prints an index line for all files that begin
158                         // with prefix.
159                         if err != nil {
160                                 log.Printf("IndexHandler: %s: walking to %s: %s",
161                                         v, path, err)
162                                 return nil
163                         }
164                         locator := filepath.Base(path)
165                         // Skip directories that do not match prefix.
166                         // We know there is nothing interesting inside.
167                         if info.IsDir() &&
168                                 !strings.HasPrefix(locator, prefix) &&
169                                 !strings.HasPrefix(prefix, locator) {
170                                 return filepath.SkipDir
171                         }
172                         // Skip any file that is not apparently a locator, e.g. .meta files
173                         if is_valid, err := IsValidLocator(locator); err != nil {
174                                 return err
175                         } else if !is_valid {
176                                 return nil
177                         }
178                         // Print filenames beginning with prefix
179                         if !info.IsDir() && strings.HasPrefix(locator, prefix) {
180                                 output = output + fmt.Sprintf(
181                                         "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
182                         }
183                         return nil
184                 })
185
186         return
187 }
188
189 // IsFull returns true if the free space on the volume is less than
190 // MIN_FREE_KILOBYTES.
191 //
192 func (v *UnixVolume) IsFull() (isFull bool) {
193         fullSymlink := v.root + "/full"
194
195         // Check if the volume has been marked as full in the last hour.
196         if link, err := os.Readlink(fullSymlink); err == nil {
197                 if ts, err := strconv.Atoi(link); err == nil {
198                         fulltime := time.Unix(int64(ts), 0)
199                         if time.Since(fulltime).Hours() < 1.0 {
200                                 return true
201                         }
202                 }
203         }
204
205         if avail, err := v.FreeDiskSpace(); err == nil {
206                 isFull = avail < MIN_FREE_KILOBYTES
207         } else {
208                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
209                 isFull = false
210         }
211
212         // If the volume is full, timestamp it.
213         if isFull {
214                 now := fmt.Sprintf("%d", time.Now().Unix())
215                 os.Symlink(now, fullSymlink)
216         }
217         return
218 }
219
220 // FreeDiskSpace returns the number of unused 1k blocks available on
221 // the volume.
222 //
223 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
224         var fs syscall.Statfs_t
225         err = syscall.Statfs(v.root, &fs)
226         if err == nil {
227                 // Statfs output is not guaranteed to measure free
228                 // space in terms of 1K blocks.
229                 free = fs.Bavail * uint64(fs.Bsize) / 1024
230         }
231         return
232 }