X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0f644e242ef37c911ad3dc25aca8135c339de349..dcb4760843cc0ed4647e8eaa43abb5d2f049cd0c:/services/keepstore/volume_unix.go diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index da9b110c56..23d6753592 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -18,10 +18,9 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" - - log "github.com/Sirupsen/logrus" ) type unixVolumeAdder struct { @@ -112,6 +111,7 @@ type UnixVolume struct { ReadOnly bool Serialize bool DirectoryReplication int + StorageClasses []string // something to lock during IO, typically a sync.Mutex (or nil // to skip locking) @@ -483,6 +483,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { "+", fileInfo[0].Size(), " ", fileInfo[0].ModTime().UnixNano(), "\n") + if err != nil { + log.Print("Error writing : ", err) + lastErr = err + break + } } blockdir.Close() } @@ -646,6 +651,11 @@ func (v *UnixVolume) Replication() int { return v.DirectoryReplication } +// GetStorageClasses implements Volume +func (v *UnixVolume) GetStorageClasses() []string { + return v.StorageClasses +} + // InternalStats returns I/O and filesystem ops counters. func (v *UnixVolume) InternalStats() interface{} { return &v.os.stats @@ -716,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // and deletes those with deadline < now. func (v *UnixVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 - var blocksDeleted, blocksInTrash int + var blocksDeleted, blocksInTrash int64 - err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error { - if err != nil { - log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err) - return nil - } + doFile := func(path string, info os.FileInfo) { if info.Mode().IsDir() { - return nil + return } matches := unixTrashLocRegexp.FindStringSubmatch(path) if len(matches) != 3 { - return nil + return } deadline, err := strconv.ParseInt(matches[2], 10, 64) if err != nil { log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err) - return nil + return } - bytesInTrash += info.Size() - blocksInTrash++ + atomic.AddInt64(&bytesInTrash, info.Size()) + atomic.AddInt64(&blocksInTrash, 1) if deadline > time.Now().Unix() { - return nil + return } err = v.os.Remove(path) if err != nil { log.Printf("EmptyTrash: Remove %v: %v", path, err) + return + } + atomic.AddInt64(&bytesDeleted, info.Size()) + atomic.AddInt64(&blocksDeleted, 1) + } + + type dirent struct { + path string + info os.FileInfo + } + var wg sync.WaitGroup + todo := make(chan dirent, theConfig.EmptyTrashWorkers) + for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for e := range todo { + doFile(e.path, e.info) + } + }() + } + + err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error { + if err != nil { + log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err) return nil } - bytesDeleted += info.Size() - blocksDeleted++ + todo <- dirent{path, info} return nil }) + close(todo) + wg.Wait() if err != nil { log.Printf("EmptyTrash error for %v: %v", v.String(), err)