1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, or an Amazon S3 volume.
5 // A UnixVolume is a Volume that is backed by a locally mounted disk.
21 type Volume interface {
22 Get(loc string) ([]byte, error) // fetch the block identified by locator loc
23 Put(loc string, block []byte) error // store block under locator loc
24 Index(prefix string) string // list stored blocks whose locators begin with prefix
25 Status() *VolumeStatus // report a VolumeStatus describing the volume
29 // IORequests are encapsulated requests to perform I/O on a Keep volume.
30 // When running in serialized mode, the Keep front end sends IORequests
31 // on a channel to an IORunner (in this file, UnixVolume.IOHandler), which
32 // handles them one at a time and returns an IOResponse.
// KeepGet is the IOMethod value that UnixVolume.Get places in its requests.
37 KeepGet IOMethod = iota
// An IORequest describes one queued I/O operation on a volume.
// Positional literals elsewhere in this file supply four values in order:
// the IOMethod, the locator, the block data (nil for reads), and the
// reply channel. Only the reply field is visible in this excerpt.
41 type IORequest struct {
45 reply chan *IOResponse // channel on which the response is delivered to the requester
// An IOResponse carries the outcome of an IORequest back to the requester.
// Code in this file reads and writes its data and err fields; the field
// declarations themselves are not visible in this excerpt.
48 type IOResponse struct {
53 // A UnixVolume is configured with:
55 // * root: the path to the volume's root directory
56 // * queue: if non-nil, all I/O requests for this volume should be queued
57 // on this channel. The response will be delivered on the IOResponse
58 // channel included in the request.
// Block files are stored at root/<first three characters of locator>/<locator>.
60 type UnixVolume struct {
61 root string // path to this volume's root directory
// IOHandler receives IORequests from v.queue one at a time and performs
// the corresponding Read or Write on this volume. The loop ends when
// v.queue is closed.
// NOTE(review): the dispatch on the request method and the send of the
// result on the request's reply channel are not visible in this excerpt.
65 func (v *UnixVolume) IOHandler() {
66 for req := range v.queue {
// Read request: capture both the block contents and any error.
70 result.data, result.err = v.Read(req.loc)
// Write request: only an error is produced.
72 result.err = v.Write(req.loc, req.data)
// MakeUnixVolume constructs a UnixVolume rooted at root that uses queue
// for its I/O requests. The volume is returned by value.
// NOTE(review): the rest of the body (e.g. whether a handler goroutine is
// started when queue is non-nil) is not visible in this excerpt.
78 func MakeUnixVolume(root string, queue chan *IORequest) UnixVolume {
79 v := UnixVolume{root, queue}
// Get retrieves the block identified by locator loc.
// On the path visible here, it sends a KeepGet request on v.queue with a
// fresh unbuffered reply channel and returns the data and error taken from
// the response.
// NOTE(review): the branch taken when v.queue is nil is not visible here.
86 func (v *UnixVolume) Get(loc string) ([]byte, error) {
90 reply := make(chan *IOResponse)
91 v.queue <- &IORequest{KeepGet, loc, nil, reply}
93 return response.data, response.err
// Put stores block on this volume under locator loc.
// One visible path calls v.Write directly; the other sends a KeepPut
// request on v.queue with a fresh unbuffered reply channel.
// NOTE(review): the condition selecting between the two paths (presumably
// whether v.queue is nil) and the handling of the reply are not visible here.
96 func (v *UnixVolume) Put(loc string, block []byte) error {
98 return v.Write(loc, block)
100 reply := make(chan *IOResponse)
101 v.queue <- &IORequest{KeepPut, loc, block, reply}
106 // Read retrieves a block identified by the locator string "loc", and
107 // returns its contents as a byte slice.
109 // If the block could not be opened or read, Read returns a nil slice
110 // and the error that was generated.
112 // If the block is present but its content hash does not match loc,
113 // Read returns the block and a CorruptError. It is the caller's
114 // responsibility to decide what (if anything) to do with the
115 // corrupted data block.
// NOTE(review): no hash comparison is visible in the lines shown here;
// confirm that the CorruptError behavior described above is implemented.
117 func (v *UnixVolume) Read(loc string) ([]byte, error) {
// The block lives at root/<first three characters of loc>/<loc>.
// loc[0:3] panics if loc is shorter than three bytes.
122 blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
// NOTE(review): no f.Close() is visible in this excerpt — confirm the
// file handle is released.
124 f, err = os.Open(blockFilename)
// The buffer is sized to BLOCKSIZE, presumably the maximum block size.
129 var buf = make([]byte, BLOCKSIZE)
// NOTE(review): a single Read call may return fewer bytes than the file
// holds — confirm short reads are acceptable or handled elsewhere.
130 nread, err = f.Read(buf)
132 log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
// Return only the bytes that were actually read.
137 return buf[:nread], nil
140 // Write stores a block of data identified by the locator string
141 // "loc". It returns nil on success. If the volume is full, it
142 // returns a FullError. If the write fails due to some other error,
143 // that error is returned.
// NOTE(review): the full-volume check is not visible in this excerpt.
145 func (v *UnixVolume) Write(loc string, block []byte) error {
// Blocks are grouped into subdirectories named after the first three
// characters of the locator; loc[0:3] panics if loc is shorter than that.
149 blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
150 if err := os.MkdirAll(blockDir, 0755); err != nil {
151 log.Printf("%s: could not create directory %s: %s",
// The data is first written to a temporary file in the same directory and
// only renamed to its final name once written and closed, presumably so
// that a partially written block is never visible under its locator.
156 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
158 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
161 blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
// NOTE(review): on a write failure no Close/Remove of the temp file is
// visible here, unlike the close and rename failure paths below.
163 if _, err := tmpfile.Write(block); err != nil {
164 log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
167 if err := tmpfile.Close(); err != nil {
168 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
// Best-effort cleanup; the error from Remove is ignored.
169 os.Remove(tmpfile.Name())
172 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
173 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
// Best-effort cleanup; the error from Remove is ignored.
174 os.Remove(tmpfile.Name())
180 // Status returns a VolumeStatus struct describing the volume's root path, device number, and free and used space.
183 func (v *UnixVolume) Status() *VolumeStatus {
184 var fs syscall.Statfs_t
// The device number is taken from the stat result for the root directory.
// NOTE(review): the type assertion below is unchecked and panics if
// fi.Sys() is not a *syscall.Stat_t.
187 if fi, err := os.Stat(v.root); err == nil {
188 devnum = fi.Sys().(*syscall.Stat_t).Dev
190 log.Printf("%s: os.Stat: %s\n", v, err)
194 err := syscall.Statfs(v.root, &fs)
196 log.Printf("%s: statfs: %s\n", v, err)
199 // These calculations match the way df calculates disk usage:
200 // "free" space is measured by fs.Bavail, but "used" space
201 // uses fs.Blocks - fs.Bfree.
// Both figures are block counts multiplied by the filesystem block size.
202 free := fs.Bavail * uint64(fs.Bsize)
203 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
204 return &VolumeStatus{v.root, devnum, free, used}
207 // Index returns a list of blocks found on this volume which begin with
208 // the specified prefix. If the prefix is an empty string, Index returns
209 // a complete list of blocks.
211 // The return value is a multiline string (separated by
212 // newlines). Each line is in the format
214 // locator+size modification-time
// For example:
218 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
219 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
220 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
// The size is the file size in bytes and the modification time is in
// seconds since the Unix epoch.
222 func (v *UnixVolume) Index(prefix string) (output string) {
// NOTE(review): the error returned by filepath.Walk is discarded.
223 filepath.Walk(v.root,
224 func(path string, info os.FileInfo, err error) error {
225 // This WalkFunc inspects each path in the volume
226 // and appends an index line for all files that begin with prefix.
229 log.Printf("IndexHandler: %s: walking to %s: %s",
// Only the final path element is compared against prefix.
233 locator := filepath.Base(path)
234 // Skip directories that do not match prefix.
235 // We know there is nothing interesting inside.
// A directory whose name is itself a leading part of prefix is still
// descended into, since it may contain matching files.
237 !strings.HasPrefix(locator, prefix) &&
238 !strings.HasPrefix(prefix, locator) {
239 return filepath.SkipDir
241 // Skip any file that is not apparently a locator, e.g. .meta files
242 if is_valid, err := IsValidLocator(locator); err != nil {
244 } else if !is_valid {
247 // Emit an index line for filenames beginning with prefix
// NOTE(review): output grows by repeated string concatenation, which
// copies the accumulated string on every matching file.
248 if !info.IsDir() && strings.HasPrefix(locator, prefix) {
249 output = output + fmt.Sprintf(
250 "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
258 // IsFull returns true if the free space on the volume is less than
259 // MIN_FREE_KILOBYTES.
// A symlink named "full" in the volume root, whose target is a Unix
// timestamp, is used to remember when the volume was last found full.
261 func (v *UnixVolume) IsFull() (isFull bool) {
262 fullSymlink := v.root + "/full"
264 // Check if the volume has been marked as full in the last hour.
// NOTE(review): the action taken when the mark is recent (presumably an
// early return of true) is not visible in this excerpt.
265 if link, err := os.Readlink(fullSymlink); err == nil {
266 if ts, err := strconv.Atoi(link); err == nil {
267 fulltime := time.Unix(int64(ts), 0)
268 if time.Since(fulltime).Hours() < 1.0 {
// Otherwise measure the free space directly.
274 if avail, err := v.FreeDiskSpace(); err == nil {
275 isFull = avail < MIN_FREE_KILOBYTES
277 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
281 // If the volume is full, timestamp it.
// NOTE(review): the error from os.Symlink is discarded; confirm that a
// stale "full" link is removed elsewhere so this call can succeed.
283 now := fmt.Sprintf("%d", time.Now().Unix())
284 os.Symlink(now, fullSymlink)
289 // FreeDiskSpace returns the number of unused 1k blocks available on the volume, as reported by statfs for the volume root.
292 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
293 var fs syscall.Statfs_t
294 err = syscall.Statfs(v.root, &fs)
296 // Statfs output is not guaranteed to measure free
297 // space in terms of 1K blocks.
// Convert available blocks to bytes, then to whole kilobytes.
298 free = fs.Bavail * uint64(fs.Bsize) / 1024
// String returns a printable description of the volume in the form
// "[UnixVolume <root>]". Log messages in this file format v with %s.
303 func (v *UnixVolume) String() string {
304 return fmt.Sprintf("[UnixVolume %s]", v.root)