1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume, etc.
5 // A UnixVolume is a Volume that is backed by a locally mounted disk.
21 type Volume interface {
22 Get(loc string) ([]byte, error)
23 Put(loc string, block []byte) error
24 Index(prefix string) string
25 Status() *VolumeStatus
29 // IORequests are encapsulated requests to perform I/O on a Keep volume.
30 // When running in serialized mode, the Keep front end sends IORequests
31 // on a channel to an IORunner, which handles them one at a time and
32 // returns an IOResponse.
37 KeepGet IOMethod = iota
41 type IORequest struct {
45 reply chan *IOResponse
48 type IOResponse struct {
53 // A UnixVolume is configured with:
55 // * root: the path to the volume's root directory
56 // * queue: if non-nil, all I/O requests for this volume should be queued
57 // on this channel. The response will be delivered on the IOResponse
58 // channel included in the request.
60 type UnixVolume struct {
61 root string // path to this volume
65 func (v *UnixVolume) IOHandler() {
66 for req := range v.queue {
70 result.data, result.err = v.Read(req.loc)
72 result.err = v.Write(req.loc, req.data)
78 func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
80 v = UnixVolume{root, make(chan *IORequest)}
83 v = UnixVolume{root, nil}
88 func (v *UnixVolume) Get(loc string) ([]byte, error) {
92 reply := make(chan *IOResponse)
93 v.queue <- &IORequest{KeepGet, loc, nil, reply}
95 return response.data, response.err
98 func (v *UnixVolume) Put(loc string, block []byte) error {
100 return v.Write(loc, block)
102 reply := make(chan *IOResponse)
103 v.queue <- &IORequest{KeepPut, loc, block, reply}
108 // Read retrieves a block identified by the locator string "loc", and
109 // returns its contents as a byte slice.
111 // If the block could not be opened or read, Read returns a nil slice
112 // and the error that was generated.
114 // If the block is present but its content hash does not match loc,
115 // Read returns the block and a CorruptError. It is the caller's
116 // responsibility to decide what (if anything) to do with the
117 // corrupted data block.
119 func (v *UnixVolume) Read(loc string) ([]byte, error) {
124 blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
126 f, err = os.Open(blockFilename)
131 var buf = make([]byte, BLOCKSIZE)
132 nread, err = f.Read(buf)
134 log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
139 return buf[:nread], nil
142 // Write stores a block of data identified by the locator string
143 // "loc". It returns nil on success. If the volume is full, it
144 // returns a FullError. If the write fails due to some other error,
145 // that error is returned.
147 func (v *UnixVolume) Write(loc string, block []byte) error {
151 blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
152 if err := os.MkdirAll(blockDir, 0755); err != nil {
153 log.Printf("%s: could not create directory %s: %s",
158 tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
160 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
163 blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
165 if _, err := tmpfile.Write(block); err != nil {
166 log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
169 if err := tmpfile.Close(); err != nil {
170 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
171 os.Remove(tmpfile.Name())
174 if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
175 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
176 os.Remove(tmpfile.Name())
182 // Status returns a VolumeStatus holding the volume's root, device number, and free/used bytes.
185 func (v *UnixVolume) Status() *VolumeStatus {
186 var fs syscall.Statfs_t
189 if fi, err := os.Stat(v.root); err == nil {
190 devnum = fi.Sys().(*syscall.Stat_t).Dev
192 log.Printf("%s: os.Stat: %s\n", v, err)
196 err := syscall.Statfs(v.root, &fs)
198 log.Printf("%s: statfs: %s\n", v, err)
201 // These calculations match the way df calculates disk usage:
202 // "free" space is measured by fs.Bavail, but "used" space
203 // uses fs.Blocks - fs.Bfree.
204 free := fs.Bavail * uint64(fs.Bsize)
205 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
206 return &VolumeStatus{v.root, devnum, free, used}
209 // Index returns a list of blocks found on this volume which begin with
210 // the specified prefix. If the prefix is an empty string, Index returns
211 // a complete list of blocks.
213 // The return value is a multiline string (separated by
214 // newlines). Each line is in the format
216 // locator+size modification-time
220 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
221 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
222 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
224 func (v *UnixVolume) Index(prefix string) (output string) {
225 filepath.Walk(v.root,
226 func(path string, info os.FileInfo, err error) error {
227 // This WalkFunc inspects each path in the volume and appends an
228 // index line to output for every file whose name begins with prefix.
231 log.Printf("IndexHandler: %s: walking to %s: %s",
235 locator := filepath.Base(path)
236 // Skip directories that do not match prefix.
237 // We know there is nothing interesting inside.
239 !strings.HasPrefix(locator, prefix) &&
240 !strings.HasPrefix(prefix, locator) {
241 return filepath.SkipDir
243 // Skip any file that is not apparently a locator, e.g. .meta files
244 if !IsValidLocator(locator) {
247 // Print filenames beginning with prefix
248 if !info.IsDir() && strings.HasPrefix(locator, prefix) {
249 output = output + fmt.Sprintf(
250 "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
258 // IsFull returns true if the volume was marked full within the last hour
259 // or if its free space is less than MIN_FREE_KILOBYTES.
261 func (v *UnixVolume) IsFull() (isFull bool) {
262 fullSymlink := v.root + "/full"
264 // Check if the volume has been marked as full in the last hour.
265 if link, err := os.Readlink(fullSymlink); err == nil {
266 if ts, err := strconv.Atoi(link); err == nil {
267 fulltime := time.Unix(int64(ts), 0)
268 if time.Since(fulltime).Hours() < 1.0 {
274 if avail, err := v.FreeDiskSpace(); err == nil {
275 isFull = avail < MIN_FREE_KILOBYTES
277 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
281 // If the volume is full, timestamp it.
283 now := fmt.Sprintf("%d", time.Now().Unix())
284 os.Symlink(now, fullSymlink)
289 // FreeDiskSpace returns the number of unused 1k blocks available on the volume.
292 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
293 var fs syscall.Statfs_t
294 err = syscall.Statfs(v.root, &fs)
296 // Statfs output is not guaranteed to measure free
297 // space in terms of 1K blocks.
298 free = fs.Bavail * uint64(fs.Bsize) / 1024
303 func (v *UnixVolume) String() string {
304 return fmt.Sprintf("[UnixVolume %s]", v.root)