13383: Implement EmptyTrashWorkers in azure and filesystem backends.
[arvados.git] / services / keepstore / volume_unix.go
index 645862deae71a876dccae4e7d5e3afc0d20fd180..23d675359244942097072d88e1bd98daf9d46c6c 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -14,10 +18,9 @@ import (
        "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "syscall"
        "time"
-
-       log "github.com/Sirupsen/logrus"
 )
 
 type unixVolumeAdder struct {
@@ -108,6 +111,7 @@ type UnixVolume struct {
        ReadOnly             bool
        Serialize            bool
        DirectoryReplication int
+       StorageClasses       []string
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -479,6 +483,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                                "+", fileInfo[0].Size(),
                                " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
+                       if err != nil {
+                               log.Print("Error writing : ", err)
+                               lastErr = err
+                               break
+                       }
                }
                blockdir.Close()
        }
@@ -642,6 +651,11 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
@@ -712,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
-       var blocksDeleted, blocksInTrash int
+       var blocksDeleted, blocksInTrash int64
 
-       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
-               if err != nil {
-                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
-                       return nil
-               }
+       doFile := func(path string, info os.FileInfo) {
                if info.Mode().IsDir() {
-                       return nil
+                       return
                }
                matches := unixTrashLocRegexp.FindStringSubmatch(path)
                if len(matches) != 3 {
-                       return nil
+                       return
                }
                deadline, err := strconv.ParseInt(matches[2], 10, 64)
                if err != nil {
                        log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
-                       return nil
+                       return
                }
-               bytesInTrash += info.Size()
-               blocksInTrash++
+               atomic.AddInt64(&bytesInTrash, info.Size())
+               atomic.AddInt64(&blocksInTrash, 1)
                if deadline > time.Now().Unix() {
-                       return nil
+                       return
                }
                err = v.os.Remove(path)
                if err != nil {
                        log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       return
+               }
+               atomic.AddInt64(&bytesDeleted, info.Size())
+               atomic.AddInt64(&blocksDeleted, 1)
+       }
+
+       type dirent struct {
+               path string
+               info os.FileInfo
+       }
+       var wg sync.WaitGroup
+       todo := make(chan dirent, theConfig.EmptyTrashWorkers)
+       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for e := range todo {
+                               doFile(e.path, e.info)
+                       }
+               }()
+       }
+
+       err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
                        return nil
                }
-               bytesDeleted += info.Size()
-               blocksDeleted++
+               todo <- dirent{path, info}
                return nil
        })
+       close(todo)
+       wg.Wait()
 
        if err != nil {
                log.Printf("EmptyTrash error for %v: %v", v.String(), err)