X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a17b176ea55dc3820ef1bde4b99cf33c628ffbbe..200f7004f921a68ec40b407dfe31f1db95e98fb9:/services/keepstore/volume_unix.go diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 75a75229a6..8d23d11618 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -4,6 +4,7 @@ package main import ( "fmt" + "io" "io/ioutil" "log" "os" @@ -50,8 +51,9 @@ type IOResponse struct { // request. // type UnixVolume struct { - root string // path to this volume - queue chan *IORequest + root string // path to this volume + queue chan *IORequest + readonly bool } func (v *UnixVolume) IOHandler() { @@ -67,14 +69,17 @@ func (v *UnixVolume) IOHandler() { } } -func MakeUnixVolume(root string, serialize bool) (v UnixVolume) { +func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume { + v := &UnixVolume{ + root: root, + queue: nil, + readonly: readonly, + } if serialize { - v = UnixVolume{root, make(chan *IORequest)} + v.queue = make(chan *IORequest) go v.IOHandler() - } else { - v = UnixVolume{root, nil} } - return + return v } func (v *UnixVolume) Get(loc string) ([]byte, error) { @@ -88,6 +93,9 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) { } func (v *UnixVolume) Put(loc string, block []byte) error { + if v.readonly { + return MethodDisabledError + } if v.queue == nil { return v.Write(loc, block) } @@ -98,11 +106,15 @@ func (v *UnixVolume) Put(loc string, block []byte) error { } func (v *UnixVolume) Touch(loc string) error { + if v.readonly { + return MethodDisabledError + } p := v.blockPath(loc) f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { return err } + defer f.Close() if e := lockfile(f); e != nil { return e } @@ -204,14 +216,13 @@ func (v *UnixVolume) Status() *VolumeStatus { return &VolumeStatus{v.root, devnum, free, used} } -// Index returns a list of blocks found on this volume which begin with -// the specified prefix. If the prefix is an empty string, Index returns -// a complete list of blocks. +// IndexTo writes (to the given Writer) a list of blocks found on this +// volume which begin with the specified prefix. If the prefix is an +// empty string, IndexTo writes a complete list of blocks. // -// The return value is a multiline string (separated by -// newlines). Each line is in the format +// Each block is given in the format // -// locator+size modification-time +// locator+size modification-time {newline} // // e.g.: // @@ -219,60 +230,66 @@ func (v *UnixVolume) Status() *VolumeStatus { // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 // -func (v *UnixVolume) Index(prefix string) (output string) { - filepath.Walk(v.root, +func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { + return filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error { - // This WalkFunc inspects each path in the volume - // and prints an index line for all files that begin - // with prefix. if err != nil { - log.Printf("IndexHandler: %s: walking to %s: %s", + log.Printf("%s: IndexTo Walk error at %s: %s", v, path, err) return nil } - locator := filepath.Base(path) - // Skip directories that do not match prefix. - // We know there is nothing interesting inside. + basename := filepath.Base(path) if info.IsDir() && - !strings.HasPrefix(locator, prefix) && - !strings.HasPrefix(prefix, locator) { + !strings.HasPrefix(basename, prefix) && + !strings.HasPrefix(prefix, basename) { + // Skip directories that do not match + // prefix. We know there is nothing + // interesting inside. return filepath.SkipDir } - // Skip any file that is not apparently a locator, e.g. .meta files - if !IsValidLocator(locator) { + if info.IsDir() || + !IsValidLocator(basename) || + !strings.HasPrefix(basename, prefix) { return nil } - // Print filenames beginning with prefix - if !info.IsDir() && strings.HasPrefix(locator, prefix) { - output = output + fmt.Sprintf( - "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix()) - } - return nil + _, err = fmt.Fprintf(w, "%s+%d %d\n", + basename, info.Size(), info.ModTime().Unix()) + return err }) - - return } func (v *UnixVolume) Delete(loc string) error { + // Touch() must be called before calling Write() on a block. Touch() + // also uses lockfile(). This avoids a race condition between Write() + // and Delete() because either (a) the file will be deleted and Touch() + // will signal to the caller that the file is not present (and needs to + // be re-written), or (b) Touch() will update the file's timestamp and + // Delete() will read the correct up-to-date timestamp and choose not to + // delete the file. + + if v.readonly { + return MethodDisabledError + } p := v.blockPath(loc) f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { return err } + defer f.Close() if e := lockfile(f); e != nil { return e } defer unlockfile(f) - // If the block has been PUT more recently than -permission_ttl, - // return success without removing the block. This guards against - // a race condition where a block is old enough that Data Manager - // has added it to the trash list, but the user submitted a PUT - // for the block since then. + // If the block has been PUT in the last blob_signature_ttl + // seconds, return success without removing the block. This + // protects data from garbage collection until it is no longer + // possible for clients to retrieve the unreferenced blocks + // anyway (because the permission signatures have expired). if fi, err := os.Stat(p); err != nil { return err } else { - if time.Since(fi.ModTime()) < permission_ttl { + if time.Since(fi.ModTime()) < blob_signature_ttl { return nil } } @@ -340,6 +357,10 @@ func (v *UnixVolume) String() string { return fmt.Sprintf("[UnixVolume %s]", v.root) } +func (v *UnixVolume) Writable() bool { + return !v.readonly +} + // lockfile and unlockfile use flock(2) to manage kernel file locks. func lockfile(f *os.File) error { return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)