X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2c6557f613fcf6cdcebb08c321a5d061aeb780c6..5b7b834bcab1a32fe77851d78fd984e1c96465ee:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index 98edfae14d..92cf12ac18 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -32,12 +32,13 @@ func init() { } func newUnixVolume(params newVolumeParams) (volume, error) { - v := &UnixVolume{ - uuid: params.UUID, - cluster: params.Cluster, - volume: params.ConfigVolume, - logger: params.Logger, - metrics: params.MetricsVecs, + v := &unixVolume{ + uuid: params.UUID, + cluster: params.Cluster, + volume: params.ConfigVolume, + logger: params.Logger, + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, } err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v) if err != nil { @@ -47,7 +48,7 @@ func newUnixVolume(params newVolumeParams) (volume, error) { return v, v.check() } -func (v *UnixVolume) check() error { +func (v *unixVolume) check() error { if v.Root == "" { return errors.New("DriverParameters.Root was not provided") } @@ -66,16 +67,17 @@ func (v *UnixVolume) check() error { return err } -// A UnixVolume stores and retrieves blocks in a local directory. -type UnixVolume struct { +// A unixVolume stores and retrieves blocks in a local directory. +type unixVolume struct { Root string // path to the volume's root directory Serialize bool - uuid string - cluster *arvados.Cluster - volume arvados.Volume - logger logrus.FieldLogger - metrics *volumeMetricsVecs + uuid string + cluster *arvados.Cluster + volume arvados.Volume + logger logrus.FieldLogger + metrics *volumeMetricsVecs + bufferPool *bufferPool // something to lock during IO, typically a sync.Mutex (or nil // to skip locking) @@ -89,7 +91,7 @@ type UnixVolume struct { // filesystem root to storage directory, joined by "/". For example, // the device ID for a local directory "/mnt/xvda1/keep" might be // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep". -func (v *UnixVolume) DeviceID() string { +func (v *unixVolume) DeviceID() string { giveup := func(f string, args ...interface{}) string { v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...) host, _ := os.Hostname() @@ -163,7 +165,7 @@ func (v *UnixVolume) DeviceID() string { } // BlockTouch sets the timestamp for the given locator to the current time -func (v *UnixVolume) BlockTouch(hash string) error { +func (v *unixVolume) BlockTouch(hash string) error { p := v.blockPath(hash) f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { @@ -187,7 +189,7 @@ func (v *UnixVolume) BlockTouch(hash string) error { } // Mtime returns the stored timestamp for the given locator. -func (v *UnixVolume) Mtime(loc string) (time.Time, error) { +func (v *unixVolume) Mtime(loc string) (time.Time, error) { p := v.blockPath(loc) fi, err := v.os.Stat(p) if err != nil { @@ -196,23 +198,8 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) { return fi.ModTime(), nil } -// Lock the locker (if one is in use), open the file for reading, and -// call the given function if and when the file is ready to read. -func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { - if err := v.lock(ctx); err != nil { - return err - } - defer v.unlock() - f, err := v.os.Open(path) - if err != nil { - return err - } - defer f.Close() - return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) -} - // stat is os.Stat() with some extra sanity checks. -func (v *UnixVolume) stat(path string) (os.FileInfo, error) { +func (v *unixVolume) stat(path string) (os.FileInfo, error) { stat, err := v.os.Stat(path) if err == nil { if stat.Size() < 0 { @@ -225,26 +212,33 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) { } // BlockRead reads a block from the volume. -func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { +func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error { path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { - return 0, v.translateError(err) + return v.translateError(err) } - var n int64 - err = v.getFunc(ctx, path, func(rdr io.Reader) error { - n, err = io.Copy(w, rdr) - if err == nil && n != stat.Size() { - err = io.ErrUnexpectedEOF - } + if err := v.lock(ctx); err != nil { return err - }) - return int(n), err + } + defer v.unlock() + f, err := v.os.Open(path) + if err != nil { + return err + } + defer f.Close() + src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes) + dst := io.NewOffsetWriter(w, 0) + n, err := io.Copy(dst, src) + if err == nil && n != stat.Size() { + err = io.ErrUnexpectedEOF + } + return err } // BlockWrite stores a block on the volume. If it already exists, its // timestamp is updated. -func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { +func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { if v.isFull() { return errFull } @@ -293,7 +287,7 @@ func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) e var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`) var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) -func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error { +func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error { rootdir, err := v.os.Open(v.Root) if err != nil { return err @@ -374,7 +368,7 @@ func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) erro // BlobTrashLifetime == 0, the block is deleted; otherwise, the block // is renamed as path/{loc}.trash.{deadline}, where deadline = now + // BlobTrashLifetime. -func (v *UnixVolume) BlockTrash(loc string) error { +func (v *unixVolume) BlockTrash(loc string) error { // Touch() must be called before calling Write() on a block. Touch() // also uses lockfile(). This avoids a race condition between Write() // and Trash() because either (a) the file will be trashed and Touch() @@ -417,7 +411,7 @@ func (v *UnixVolume) BlockTrash(loc string) error { // BlockUntrash moves block from trash back into store // Look for path/{loc}.trash.{deadline} in storage, // and rename the first such file as path/{loc} -func (v *UnixVolume) BlockUntrash(hash string) error { +func (v *unixVolume) BlockUntrash(hash string) error { v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) files, err := ioutil.ReadDir(v.blockDir(hash)) @@ -450,19 +444,19 @@ func (v *UnixVolume) BlockUntrash(hash string) error { // blockDir returns the fully qualified directory name for the directory // where loc is (or would be) stored on this volume. -func (v *UnixVolume) blockDir(loc string) string { +func (v *unixVolume) blockDir(loc string) string { return filepath.Join(v.Root, loc[0:3]) } // blockPath returns the fully qualified pathname for the path to loc // on this volume. -func (v *UnixVolume) blockPath(loc string) string { +func (v *unixVolume) blockPath(loc string) string { return filepath.Join(v.blockDir(loc), loc) } // isFull returns true if the free space on the volume is less than // MinFreeKilobytes. -func (v *UnixVolume) isFull() (isFull bool) { +func (v *unixVolume) isFull() (isFull bool) { fullSymlink := v.Root + "/full" // Check if the volume has been marked as full in the last hour. @@ -492,7 +486,7 @@ func (v *UnixVolume) isFull() (isFull bool) { // FreeDiskSpace returns the number of unused 1k blocks available on // the volume. -func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { +func (v *unixVolume) FreeDiskSpace() (free uint64, err error) { var fs syscall.Statfs_t err = syscall.Statfs(v.Root, &fs) if err == nil { @@ -504,14 +498,14 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { } // InternalStats returns I/O and filesystem ops counters. -func (v *UnixVolume) InternalStats() interface{} { +func (v *unixVolume) InternalStats() interface{} { return &v.os.stats } // lock acquires the serialize lock, if one is in use. If ctx is done // before the lock is acquired, lock returns ctx.Err() instead of // acquiring the lock. -func (v *UnixVolume) lock(ctx context.Context) error { +func (v *unixVolume) lock(ctx context.Context) error { if v.locker == nil { return nil } @@ -535,7 +529,7 @@ func (v *UnixVolume) lock(ctx context.Context) error { } // unlock releases the serialize lock, if one is in use. -func (v *UnixVolume) unlock() { +func (v *unixVolume) unlock() { if v.locker == nil { return } @@ -543,7 +537,7 @@ func (v *UnixVolume) unlock() { } // lockfile and unlockfile use flock(2) to manage kernel file locks. -func (v *UnixVolume) lockfile(f *os.File) error { +func (v *unixVolume) lockfile(f *os.File) error { v.os.stats.TickOps("flock") v.os.stats.Tick(&v.os.stats.FlockOps) err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX) @@ -551,7 +545,7 @@ func (v *UnixVolume) lockfile(f *os.File) error { return err } -func (v *UnixVolume) unlockfile(f *os.File) error { +func (v *unixVolume) unlockfile(f *os.File) error { err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN) v.os.stats.TickErr(err) return err @@ -559,7 +553,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error { // Where appropriate, translate a more specific filesystem error to an // error recognized by handlers, like os.ErrNotExist. -func (v *UnixVolume) translateError(err error) error { +func (v *unixVolume) translateError(err error) error { switch err.(type) { case *os.PathError: // stat() returns a PathError if the parent directory @@ -574,7 +568,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // EmptyTrash walks hierarchy looking for {hash}.trash.* // and deletes those with deadline < now. -func (v *UnixVolume) EmptyTrash() { +func (v *unixVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64