X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f608029e1aec903bc35a4748ef51e6f076dae0aa..39f6e9f70f683237d9488faac1c549ca19ac9dae:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index 46f4db4095..f01ad97553 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -28,20 +28,26 @@ import ( ) func init() { - driver["Directory"] = newDirectoryVolume + driver["Directory"] = newUnixVolume } -func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) { - v := &UnixVolume{cluster: cluster, volume: volume, logger: logger, metrics: metrics} - err := json.Unmarshal(volume.DriverParameters, &v) +func newUnixVolume(params newVolumeParams) (volume, error) { + v := &unixVolume{ + uuid: params.UUID, + cluster: params.Cluster, + volume: params.ConfigVolume, + logger: params.Logger, + metrics: params.MetricsVecs, + } + err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v) if err != nil { return nil, err } - v.logger = v.logger.WithField("Volume", v.String()) + v.logger = v.logger.WithField("Volume", v.DeviceID()) return v, v.check() } -func (v *UnixVolume) check() error { +func (v *unixVolume) check() error { if v.Root == "" { return errors.New("DriverParameters.Root was not provided") } @@ -53,18 +59,19 @@ func (v *UnixVolume) check() error { } // Set up prometheus metrics - lbls := prometheus.Labels{"device_id": v.GetDeviceID()} + lbls := prometheus.Labels{"device_id": v.DeviceID()} v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls) _, err := v.os.Stat(v.Root) return err } -// A UnixVolume stores and retrieves blocks in a local directory. -type UnixVolume struct { +// A unixVolume stores and retrieves blocks in a local directory. +type unixVolume struct { Root string // path to the volume's root directory Serialize bool + uuid string cluster *arvados.Cluster volume arvados.Volume logger logrus.FieldLogger @@ -77,15 +84,16 @@ type UnixVolume struct { os osWithStats } -// GetDeviceID returns a globally unique ID for the volume's root +// DeviceID returns a globally unique ID for the volume's root // directory, consisting of the filesystem's UUID and the path from // filesystem root to storage directory, joined by "/". For example, // the device ID for a local directory "/mnt/xvda1/keep" might be // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep". -func (v *UnixVolume) GetDeviceID() string { +func (v *unixVolume) DeviceID() string { giveup := func(f string, args ...interface{}) string { - v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...) - return "" + v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...) + host, _ := os.Hostname() + return host + ":" + v.Root } buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput() if err != nil { @@ -154,12 +162,9 @@ func (v *UnixVolume) GetDeviceID() string { return giveup("could not find entry in %q matching %q", udir, dev) } -// Touch sets the timestamp for the given locator to the current time -func (v *UnixVolume) Touch(loc string) error { - if v.volume.ReadOnly { - return MethodDisabledError - } - p := v.blockPath(loc) +// BlockTouch sets the timestamp for the given locator to the current time +func (v *unixVolume) BlockTouch(hash string) error { + p := v.blockPath(hash) f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { return err @@ -182,7 +187,7 @@ func (v *UnixVolume) Touch(loc string) error { } // Mtime returns the stored timestamp for the given locator. -func (v *UnixVolume) Mtime(loc string) (time.Time, error) { +func (v *unixVolume) Mtime(loc string) (time.Time, error) { p := v.blockPath(loc) fi, err := v.os.Stat(p) if err != nil { @@ -193,7 +198,7 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) { // Lock the locker (if one is in use), open the file for reading, and // call the given function if and when the file is ready to read. -func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { +func (v *unixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { if err := v.lock(ctx); err != nil { return err } @@ -203,82 +208,55 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader return err } defer f.Close() - return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) + return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) } // stat is os.Stat() with some extra sanity checks. -func (v *UnixVolume) stat(path string) (os.FileInfo, error) { +func (v *unixVolume) stat(path string) (os.FileInfo, error) { stat, err := v.os.Stat(path) if err == nil { if stat.Size() < 0 { err = os.ErrInvalid } else if stat.Size() > BlockSize { - err = TooLongError + err = errTooLarge } } return stat, err } -// Get retrieves a block, copies it to the given slice, and returns -// the number of bytes copied. -func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) { - return getWithPipe(ctx, loc, buf, v) -} - -// ReadBlock implements BlockReader. -func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error { - path := v.blockPath(loc) +// BlockRead reads a block from the volume. +func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { + path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { - return v.translateError(err) + return 0, v.translateError(err) } - return v.getFunc(ctx, path, func(rdr io.Reader) error { - n, err := io.Copy(w, rdr) + var n int64 + err = v.getFunc(ctx, path, func(rdr io.Reader) error { + n, err = io.Copy(w, rdr) if err == nil && n != stat.Size() { err = io.ErrUnexpectedEOF } return err }) + return int(n), err } -// Compare returns nil if Get(loc) would return the same content as -// expect. It is functionally equivalent to Get() followed by -// bytes.Compare(), but uses less memory. -func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error { - path := v.blockPath(loc) - if _, err := v.stat(path); err != nil { - return v.translateError(err) +// BlockWrite stores a block on the volume. If it already exists, its +// timestamp is updated. +func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { + if v.isFull() { + return errFull } - return v.getFunc(ctx, path, func(rdr io.Reader) error { - return compareReaderWithBuf(ctx, rdr, expect, loc[:32]) - }) -} - -// Put stores a block of data identified by the locator string -// "loc". It returns nil on success. If the volume is full, it -// returns a FullError. If the write fails due to some other error, -// that error is returned. -func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error { - return putWithPipe(ctx, loc, block, v) -} - -// WriteBlock implements BlockWriter. -func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error { - if v.volume.ReadOnly { - return MethodDisabledError - } - if v.IsFull() { - return FullError - } - bdir := v.blockDir(loc) + bdir := v.blockDir(hash) if err := os.MkdirAll(bdir, 0755); err != nil { return fmt.Errorf("error creating directory %s: %s", bdir, err) } - bpath := v.blockPath(loc) - tmpfile, err := v.os.TempFile(bdir, "tmp"+loc) + bpath := v.blockPath(hash) + tmpfile, err := v.os.TempFile(bdir, "tmp"+hash) if err != nil { - return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err) + return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, hash, err) } defer v.os.Remove(tmpfile.Name()) defer tmpfile.Close() @@ -287,7 +265,7 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return err } defer v.unlock() - n, err := io.Copy(tmpfile, rdr) + n, err := tmpfile.Write(data) v.os.stats.TickOutBytes(uint64(n)) if err != nil { return fmt.Errorf("error writing %s: %s", bpath, err) @@ -312,53 +290,10 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return nil } -// Status returns a VolumeStatus struct describing the volume's -// current state, or nil if an error occurs. -// -func (v *UnixVolume) Status() *VolumeStatus { - fi, err := v.os.Stat(v.Root) - if err != nil { - v.logger.WithError(err).Error("stat failed") - return nil - } - devnum := fi.Sys().(*syscall.Stat_t).Dev - - var fs syscall.Statfs_t - if err := syscall.Statfs(v.Root, &fs); err != nil { - v.logger.WithError(err).Error("statfs failed") - return nil - } - // These calculations match the way df calculates disk usage: - // "free" space is measured by fs.Bavail, but "used" space - // uses fs.Blocks - fs.Bfree. - free := fs.Bavail * uint64(fs.Bsize) - used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize) - return &VolumeStatus{ - MountPoint: v.Root, - DeviceNum: devnum, - BytesFree: free, - BytesUsed: used, - } -} - var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`) var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) -// IndexTo writes (to the given Writer) a list of blocks found on this -// volume which begin with the specified prefix. If the prefix is an -// empty string, IndexTo writes a complete list of blocks. -// -// Each block is given in the format -// -// locator+size modification-time {newline} -// -// e.g.: -// -// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 -// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 -// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 -// -func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { +func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error { rootdir, err := v.os.Open(v.Root) if err != nil { return err @@ -371,6 +306,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { return err } for _, subdir := range subdirs { + if ctx.Err() != nil { + return ctx.Err() + } if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) { // prefix excludes all blocks stored in this dir continue @@ -379,24 +317,31 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { continue } blockdirpath := filepath.Join(v.Root, subdir) - blockdir, err := v.os.Open(blockdirpath) - if err != nil { - v.logger.WithError(err).Errorf("error reading %q", blockdirpath) - return fmt.Errorf("error reading %q: %s", blockdirpath, err) - } - v.os.stats.TickOps("readdir") - v.os.stats.Tick(&v.os.stats.ReaddirOps) - // ReadDir() (compared to Readdir(), which returns - // FileInfo structs) helps complete the sequence of - // readdirent calls as quickly as possible, reducing - // the likelihood of NFS EBADCOOKIE (523) errors. - dirents, err := blockdir.ReadDir(-1) - blockdir.Close() - if err != nil { - v.logger.WithError(err).Errorf("error reading %q", blockdirpath) - return fmt.Errorf("error reading %q: %s", blockdirpath, err) + + var dirents []os.DirEntry + for attempt := 0; ; attempt++ { + v.os.stats.TickOps("readdir") + v.os.stats.Tick(&v.os.stats.ReaddirOps) + dirents, err = os.ReadDir(blockdirpath) + if ctx.Err() != nil { + return ctx.Err() + } else if err == nil { + break + } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") { + // EBADCOOKIE (NFS stopped accepting + // our readdirent cookie) -- retry a + // few times before giving up + v.logger.WithError(err).Printf("retry after error reading %s", blockdirpath) + continue + } else { + return err + } } + for _, dirent := range dirents { + if ctx.Err() != nil { + return ctx.Err() + } fileInfo, err := dirent.Info() if os.IsNotExist(err) { // File disappeared between ReadDir() and now @@ -425,11 +370,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { return nil } -// Trash trashes the block data from the unix storage -// If BlobTrashLifetime == 0, the block is deleted -// Else, the block is renamed as path/{loc}.trash.{deadline}, -// where deadline = now + BlobTrashLifetime -func (v *UnixVolume) Trash(loc string) error { +// BlockTrash trashes the block data from the unix storage. If +// BlobTrashLifetime == 0, the block is deleted; otherwise, the block +// is renamed as path/{loc}.trash.{deadline}, where deadline = now + +// BlobTrashLifetime. +func (v *unixVolume) BlockTrash(loc string) error { // Touch() must be called before calling Write() on a block. Touch() // also uses lockfile(). This avoids a race condition between Write() // and Trash() because either (a) the file will be trashed and Touch() @@ -437,10 +382,6 @@ func (v *UnixVolume) Trash(loc string) error { // be re-written), or (b) Touch() will update the file's timestamp and // Trash() will read the correct up-to-date timestamp and choose not to // trash the file. - - if v.volume.ReadOnly || !v.cluster.Collections.BlobTrash { - return MethodDisabledError - } if err := v.lock(context.TODO()); err != nil { return err } @@ -473,17 +414,13 @@ func (v *UnixVolume) Trash(loc string) error { return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix())) } -// Untrash moves block from trash back into store +// BlockUntrash moves block from trash back into store // Look for path/{loc}.trash.{deadline} in storage, // and rename the first such file as path/{loc} -func (v *UnixVolume) Untrash(loc string) (err error) { - if v.volume.ReadOnly { - return MethodDisabledError - } - +func (v *unixVolume) BlockUntrash(hash string) error { v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) - files, err := ioutil.ReadDir(v.blockDir(loc)) + files, err := ioutil.ReadDir(v.blockDir(hash)) if err != nil { return err } @@ -493,11 +430,11 @@ func (v *UnixVolume) Untrash(loc string) (err error) { } foundTrash := false - prefix := fmt.Sprintf("%v.trash.", loc) + prefix := fmt.Sprintf("%v.trash.", hash) for _, f := range files { if strings.HasPrefix(f.Name(), prefix) { foundTrash = true - err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc)) + err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(hash)) if err == nil { break } @@ -508,25 +445,24 @@ func (v *UnixVolume) Untrash(loc string) (err error) { return os.ErrNotExist } - return + return nil } // blockDir returns the fully qualified directory name for the directory // where loc is (or would be) stored on this volume. -func (v *UnixVolume) blockDir(loc string) string { +func (v *unixVolume) blockDir(loc string) string { return filepath.Join(v.Root, loc[0:3]) } // blockPath returns the fully qualified pathname for the path to loc // on this volume. -func (v *UnixVolume) blockPath(loc string) string { +func (v *unixVolume) blockPath(loc string) string { return filepath.Join(v.blockDir(loc), loc) } -// IsFull returns true if the free space on the volume is less than +// isFull returns true if the free space on the volume is less than // MinFreeKilobytes. -// -func (v *UnixVolume) IsFull() (isFull bool) { +func (v *unixVolume) isFull() (isFull bool) { fullSymlink := v.Root + "/full" // Check if the volume has been marked as full in the last hour. @@ -540,9 +476,9 @@ func (v *UnixVolume) IsFull() (isFull bool) { } if avail, err := v.FreeDiskSpace(); err == nil { - isFull = avail < MinFreeKilobytes + isFull = avail < BlockSize } else { - v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v) + v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v.DeviceID()) isFull = false } @@ -556,31 +492,26 @@ func (v *UnixVolume) IsFull() (isFull bool) { // FreeDiskSpace returns the number of unused 1k blocks available on // the volume. -// -func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { +func (v *unixVolume) FreeDiskSpace() (free uint64, err error) { var fs syscall.Statfs_t err = syscall.Statfs(v.Root, &fs) if err == nil { // Statfs output is not guaranteed to measure free // space in terms of 1K blocks. - free = fs.Bavail * uint64(fs.Bsize) / 1024 + free = fs.Bavail * uint64(fs.Bsize) } return } -func (v *UnixVolume) String() string { - return fmt.Sprintf("[UnixVolume %s]", v.Root) -} - // InternalStats returns I/O and filesystem ops counters. -func (v *UnixVolume) InternalStats() interface{} { +func (v *unixVolume) InternalStats() interface{} { return &v.os.stats } // lock acquires the serialize lock, if one is in use. If ctx is done // before the lock is acquired, lock returns ctx.Err() instead of // acquiring the lock. -func (v *UnixVolume) lock(ctx context.Context) error { +func (v *unixVolume) lock(ctx context.Context) error { if v.locker == nil { return nil } @@ -604,7 +535,7 @@ func (v *UnixVolume) lock(ctx context.Context) error { } // unlock releases the serialize lock, if one is in use. -func (v *UnixVolume) unlock() { +func (v *unixVolume) unlock() { if v.locker == nil { return } @@ -612,7 +543,7 @@ func (v *UnixVolume) unlock() { } // lockfile and unlockfile use flock(2) to manage kernel file locks. -func (v *UnixVolume) lockfile(f *os.File) error { +func (v *unixVolume) lockfile(f *os.File) error { v.os.stats.TickOps("flock") v.os.stats.Tick(&v.os.stats.FlockOps) err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX) @@ -620,7 +551,7 @@ func (v *UnixVolume) lockfile(f *os.File) error { return err } -func (v *UnixVolume) unlockfile(f *os.File) error { +func (v *unixVolume) unlockfile(f *os.File) error { err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN) v.os.stats.TickErr(err) return err @@ -628,7 +559,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error { // Where appropriate, translate a more specific filesystem error to an // error recognized by handlers, like os.ErrNotExist. -func (v *UnixVolume) translateError(err error) error { +func (v *unixVolume) translateError(err error) error { switch err.(type) { case *os.PathError: // stat() returns a PathError if the parent directory @@ -643,11 +574,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // EmptyTrash walks hierarchy looking for {hash}.trash.* // and deletes those with deadline < now. -func (v *UnixVolume) EmptyTrash() { - if v.cluster.Collections.BlobDeleteConcurrency < 1 { - return - } - +func (v *unixVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64