Merge branch '21640-max-nofile'
[arvados.git] / tools / keep-exercise / keep-exercise.go
index 7641465aa329056db3a559db3f032181246e4a32..6d06a18322e53e77509e3cfe7629143f39831170 100644 (file)
 // fill your storage volumes with random data if you leave it running,
 // which can cost you money or leave you with too little room for
 // useful data.
 // fill your storage volumes with random data if you leave it running,
 // which can cost you money or leave you with too little room for
 // useful data.
-//
 package main
 
 import (
 package main
 
 import (
+       "bufio"
+       "context"
        "crypto/rand"
        "encoding/binary"
        "flag"
        "crypto/rand"
        "encoding/binary"
        "flag"
@@ -26,14 +27,18 @@ import (
        "io"
        "io/ioutil"
        "log"
        "io"
        "io/ioutil"
        "log"
+       mathRand "math/rand"
        "net/http"
        "os"
        "os/signal"
        "net/http"
        "os"
        "os/signal"
-       "sync"
+       "strings"
        "sync/atomic"
        "syscall"
        "time"
 
        "sync/atomic"
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
 )
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
 )
@@ -54,37 +59,17 @@ var (
        getVersion    = flag.Bool("version", false, "Print version information and exit.")
        RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
        Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
        getVersion    = flag.Bool("version", false, "Print version information and exit.")
        RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
        Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
+       UseIndex      = flag.Bool("use-index", false, "use the GetIndex call to get a list of blocks to read. Requires the SystemRoot token. Use this to rule out caching effects when reading.")
 )
 
 )
 
-var summary string
-var csvHeader string
-
-func main() {
-       flag.Parse()
-
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("keep-exercise %s\n", version)
-               os.Exit(0)
-       }
-
-       stderr := log.New(os.Stderr, "", log.LstdFlags)
-
-       if *ReadThreads > 0 && *WriteThreads == 0 {
-               stderr.Fatal("At least one write thread is required if rthreads is non-zero")
-       }
-
-       if *ReadThreads == 0 && *WriteThreads == 0 {
-               stderr.Fatal("Nothing to do!")
-       }
-
+func createKeepClient(lgr *log.Logger) (kc *keepclient.KeepClient) {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               stderr.Fatal(err)
+               lgr.Fatal(err)
        }
        }
-       kc, err := keepclient.MakeKeepClient(arv)
+       kc, err = keepclient.MakeKeepClient(arv)
        if err != nil {
        if err != nil {
-               stderr.Fatal(err)
+               lgr.Fatal(err)
        }
        kc.Want_replicas = *Replicas
 
        }
        kc.Want_replicas = *Replicas
 
@@ -99,92 +84,165 @@ func main() {
                        TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
                },
        }
                        TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
                },
        }
+       overrideServices(kc, lgr)
+       return kc
+}
+
+func main() {
+       if ok, code := cmd.ParseFlags(flag.CommandLine, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+               os.Exit(code)
+       } else if *getVersion {
+               fmt.Printf("%s %s\n", os.Args[0], version)
+               return
+       }
+
+       lgr := log.New(os.Stderr, "", log.LstdFlags)
+
+       if *ReadThreads > 0 && *WriteThreads == 0 && !*UseIndex {
+               lgr.Fatal("At least one write thread is required if rthreads is non-zero and -use-index is not enabled")
+       }
+
+       if *ReadThreads == 0 && *WriteThreads == 0 {
+               lgr.Fatal("Nothing to do!")
+       }
+
+       kc := createKeepClient(lgr)
+
+       // When UseIndex is set, we need a KeepClient with SystemRoot powers to get
+       // the block index from the Keepstore. We use the SystemRootToken from
+       // the Arvados config.yml for that.
+       var cluster *arvados.Cluster
+       if *ReadThreads > 0 && *UseIndex {
+               cluster = loadConfig(lgr)
+               kc.Arvados.ApiToken = cluster.SystemRootToken
+       }
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
+       go func() {
+               <-sigChan
+               // FIXME
+               //fmt.Print("\r") // Suppress the ^C print
+               cancel()
+       }()
 
 
-       overrideServices(kc, stderr)
-       csvHeader = "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+       csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,UseIndex,RunTime,Repeat"
+       var summary string
 
 
-       for i := 0; i < *Repeat; i++ {
-               runExperiment(kc, stderr)
-               stderr.Printf("*************************** experiment %d complete ******************************\n", i)
+       var nextBufs []chan []byte
+       for i := 0; i < *WriteThreads; i++ {
+               nextBuf := make(chan []byte, 1)
+               nextBufs = append(nextBufs, nextBuf)
+               go makeBufs(nextBuf, i, lgr)
+       }
+
+       for i := 0; i < *Repeat && ctx.Err() == nil; i++ {
+               summary = runExperiment(ctx, cluster, kc, nextBufs, summary, csvHeader, lgr)
+               lgr.Printf("*************************** experiment %d complete ******************************\n", i)
                summary += fmt.Sprintf(",%d\n", i)
        }
                summary += fmt.Sprintf(",%d\n", i)
        }
-       stderr.Println("Summary:")
-       stderr.Println()
+
+       lgr.Println("Summary:")
+       lgr.Println()
+       fmt.Println()
        fmt.Println(csvHeader + ",Experiment")
        fmt.Println(summary)
 }
 
        fmt.Println(csvHeader + ",Experiment")
        fmt.Println(summary)
 }
 
-func runExperiment(kc *keepclient.KeepClient, stderr *log.Logger) {
-       var wg sync.WaitGroup
+func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, nextBufs []chan []byte, summary string, csvHeader string, lgr *log.Logger) (newSummary string) {
+       // Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+       var bytesInChan = make(chan uint64)
+       var bytesOutChan = make(chan uint64)
+       // Send struct{}{} to errorsChan when an error happens.
+       var errorsChan = make(chan struct{})
+
        var nextLocator atomic.Value
        var nextLocator atomic.Value
+       // when UseIndex is set, this channel is used instead of nextLocator
+       var indexLocatorChan = make(chan string, 2)
+
+       newSummary = summary
 
 
-       wg.Add(1)
-       stopCh := make(chan struct{})
+       // Start warmup
+       ready := make(chan struct{})
+       var warmup bool
        if *ReadThreads > 0 {
        if *ReadThreads > 0 {
-               stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
-       }
-       for i := 0; i < *WriteThreads; i++ {
-               nextBuf := make(chan []byte, 1)
-               wg.Add(1)
-               go makeBufs(&wg, nextBuf, i, stopCh, stderr)
-               wg.Add(1)
-               go doWrites(&wg, kc, nextBuf, &nextLocator, stopCh, stderr)
+               warmup = true
+               if !*UseIndex {
+                       lgr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+               } else {
+                       lgr.Printf("Start warmup phase, waiting for block index before reading starts\n")
+               }
        }
        }
-       if *ReadThreads > 0 {
-               for nextLocator.Load() == nil {
-                       select {
-                       case _ = <-bytesOutChan:
+       if warmup && !*UseIndex {
+               go func() {
+                       locator, _, err := kc.PutB(<-nextBufs[0])
+                       if err != nil {
+                               lgr.Print(err)
+                               errorsChan <- struct{}{}
                        }
                        }
+                       nextLocator.Store(locator)
+                       lgr.Println("Warmup complete!")
+                       close(ready)
+               }()
+       } else if warmup && *UseIndex {
+               // Get list of blocks to read
+               go getIndexLocators(ctx, cluster, kc, indexLocatorChan, lgr)
+               select {
+               case <-ctx.Done():
+                       return
+               case <-indexLocatorChan:
+                       lgr.Println("Warmup complete!")
+                       close(ready)
                }
                }
-               stderr.Printf("Warmup complete")
+       } else {
+               close(ready)
        }
        }
-       go countBeans(&wg, stopCh, stderr)
-       for i := 0; i < *ReadThreads; i++ {
-               wg.Add(1)
-               go doReads(&wg, kc, &nextLocator, stopCh, stderr)
+       select {
+       case <-ctx.Done():
+               return
+       case <-ready:
        }
        }
-       wg.Wait()
-}
 
 
-// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
-var bytesInChan = make(chan uint64)
-var bytesOutChan = make(chan uint64)
+       // Warmup complete
+       ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*RunTime))
+       defer cancel()
 
 
-// Send struct{}{} to errorsChan when an error happens.
-var errorsChan = make(chan struct{})
+       for i := 0; i < *WriteThreads; i++ {
+               go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, lgr)
+       }
+       if *UseIndex {
+               for i := 0; i < *ReadThreads; i++ {
+                       go doReads(ctx, kc, nil, indexLocatorChan, bytesInChan, errorsChan, lgr)
+               }
+       } else {
+               for i := 0; i < *ReadThreads; i++ {
+                       go doReads(ctx, kc, &nextLocator, nil, bytesInChan, errorsChan, lgr)
+               }
+       }
 
 
-func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
-       defer wg.Done()
        t0 := time.Now()
        var tickChan <-chan time.Time
        t0 := time.Now()
        var tickChan <-chan time.Time
-       var endChan <-chan time.Time
-       c := make(chan os.Signal, 1)
-       signal.Notify(c, os.Interrupt, syscall.SIGTERM)
        if *StatsInterval > 0 {
                tickChan = time.NewTicker(*StatsInterval).C
        }
        if *StatsInterval > 0 {
                tickChan = time.NewTicker(*StatsInterval).C
        }
-       if *RunTime > 0 {
-               endChan = time.NewTicker(*RunTime).C
-       }
        var bytesIn uint64
        var bytesOut uint64
        var errors uint64
        var rateIn, rateOut float64
        var maxRateIn, maxRateOut float64
        var bytesIn uint64
        var bytesOut uint64
        var errors uint64
        var rateIn, rateOut float64
        var maxRateIn, maxRateOut float64
-       var exit, abort, printCsv bool
+       var exit, printCsv bool
        csv := log.New(os.Stdout, "", 0)
        csv := log.New(os.Stdout, "", 0)
+       csv.Println()
        csv.Println(csvHeader)
        for {
                select {
        csv.Println(csvHeader)
        for {
                select {
-               case <-tickChan:
-                       printCsv = true
-               case <-endChan:
+               case <-ctx.Done():
                        printCsv = true
                        exit = true
                        printCsv = true
                        exit = true
-               case <-c:
+               case <-tickChan:
                        printCsv = true
                        printCsv = true
-                       abort = true
-                       fmt.Print("\r") // Suppress the ^C print
                case i := <-bytesInChan:
                        bytesIn += i
                case o := <-bytesOutChan:
                case i := <-bytesInChan:
                        bytesIn += i
                case o := <-bytesOutChan:
@@ -202,8 +260,8 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
                        if rateOut > maxRateOut {
                                maxRateOut = rateOut
                        }
                        if rateOut > maxRateOut {
                                maxRateOut = rateOut
                        }
-                       line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
-                               time.Now().Format("2006-01-02 15:04:05"),
+                       line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%t,%s,%d",
+                               time.Now().Format("2006/01/02 15:04:05"),
                                elapsed,
                                bytesIn, rateIn, maxRateIn,
                                bytesOut, rateOut, maxRateOut,
                                elapsed,
                                bytesIn, rateIn, maxRateIn,
                                bytesOut, rateOut, maxRateOut,
@@ -217,27 +275,21 @@ func countBeans(wg *sync.WaitGroup, stopCh chan struct{}, stderr *log.Logger) {
                                *StatsInterval,
                                *ServiceURL,
                                *ServiceUUID,
                                *StatsInterval,
                                *ServiceURL,
                                *ServiceUUID,
+                               *UseIndex,
                                *RunTime,
                                *Repeat,
                        )
                        csv.Println(line)
                        if exit {
                                *RunTime,
                                *Repeat,
                        )
                        csv.Println(line)
                        if exit {
-                               summary += line
+                               newSummary += line
+                               return
                        }
                        printCsv = false
                }
                        }
                        printCsv = false
                }
-               if abort {
-                       os.Exit(0)
-               }
-               if exit {
-                       close(stopCh)
-                       break
-               }
        }
 }
 
        }
 }
 
-func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-chan struct{}, stderr *log.Logger) {
-       defer wg.Done()
+func makeBufs(nextBuf chan<- []byte, threadID int, lgr *log.Logger) {
        buf := make([]byte, *BlockSize)
        if *VaryThread {
                binary.PutVarint(buf, int64(threadID))
        buf := make([]byte, *BlockSize)
        if *VaryThread {
                binary.PutVarint(buf, int64(threadID))
@@ -250,74 +302,120 @@ func makeBufs(wg *sync.WaitGroup, nextBuf chan<- []byte, threadID int, stopCh <-
                if *VaryRequest {
                        rnd := make([]byte, randSize)
                        if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
                if *VaryRequest {
                        rnd := make([]byte, randSize)
                        if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
-                               stderr.Fatal(err)
+                               lgr.Fatal(err)
                        }
                        buf = append(rnd, buf[randSize:]...)
                }
                        }
                        buf = append(rnd, buf[randSize:]...)
                }
-               select {
-               case <-stopCh:
-                       close(nextBuf)
-                       return
-               case nextBuf <- buf:
+               nextBuf <- buf
+       }
+}
+
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, bytesOutChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
+       for ctx.Err() == nil {
+               //lgr.Printf("%s nextbuf %s, waiting for nextBuf\n",nextBuf,time.Now())
+               buf := <-nextBuf
+               //lgr.Printf("%s nextbuf %s, done waiting for nextBuf\n",nextBuf,time.Now())
+               locator, _, err := kc.PutB(buf)
+               if err != nil {
+                       lgr.Print(err)
+                       errorsChan <- struct{}{}
+                       continue
                }
                }
+               bytesOutChan <- uint64(len(buf))
+               nextLocator.Store(locator)
        }
 }
 
        }
 }
 
-func doWrites(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-       defer wg.Done()
+func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, lgr *log.Logger) {
+       if ctx.Err() != nil {
+               return
+       }
+       locatorsMap := make(map[string]bool)
+       var locators []string
+       var count int64
+       for uuid := range kc.LocalRoots() {
+               reader, err := kc.GetIndex(uuid, "")
+               if err != nil {
+                       lgr.Fatalf("Error getting index: %s\n", err)
+               }
+               scanner := bufio.NewScanner(reader)
+               for scanner.Scan() {
+                       locatorsMap[strings.Split(scanner.Text(), " ")[0]] = true
+                       count++
+               }
+       }
+       for l := range locatorsMap {
+               locators = append(locators, l)
+       }
+       lgr.Printf("Found %d locators\n", count)
+       lgr.Printf("Found %d locators (deduplicated)\n", len(locators))
+       if len(locators) < 1 {
+               lgr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the -use-index cli argument.")
+       }
+
+       mathRand.Seed(time.Now().UnixNano())
+       mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
 
 
-       for {
+       for _, locator := range locators {
+               // We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
+               signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
                select {
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                        return
                        return
-               case buf := <-nextBuf:
-                       locator, _, err := kc.PutB(buf)
-                       if err != nil {
-                               stderr.Print(err)
-                               errorsChan <- struct{}{}
-                               continue
-                       }
-                       select {
-                       case <-stopCh:
-                               return
-                       case bytesOutChan <- uint64(len(buf)):
-                       }
-                       nextLocator.Store(locator)
+               case indexLocatorChan <- signedLocator:
                }
        }
                }
        }
+       lgr.Fatal("Error: ran out of locators to read!")
 }
 
 }
 
-func doReads(wg *sync.WaitGroup, kc *keepclient.KeepClient, nextLocator *atomic.Value, stopCh <-chan struct{}, stderr *log.Logger) {
-       defer wg.Done()
+func loadConfig(lgr *log.Logger) (cluster *arvados.Cluster) {
+       loader := config.NewLoader(os.Stdin, nil)
+       loader.SkipLegacy = true
 
 
-       var locator string
-       for {
-               locator = nextLocator.Load().(string)
+       cfg, err := loader.Load()
+       if err != nil {
+               lgr.Fatal(err)
+       }
+       cluster, err = cfg.GetCluster("")
+       if err != nil {
+               lgr.Fatal(err)
+       }
+       return
+}
+
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
+       for ctx.Err() == nil {
+               var locator string
+               if indexLocatorChan != nil {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case locator = <-indexLocatorChan:
+                       }
+               } else {
+                       locator = nextLocator.Load().(string)
+               }
                rdr, size, url, err := kc.Get(locator)
                if err != nil {
                rdr, size, url, err := kc.Get(locator)
                if err != nil {
-                       stderr.Print(err)
+                       lgr.Print(err)
                        errorsChan <- struct{}{}
                        continue
                }
                n, err := io.Copy(ioutil.Discard, rdr)
                rdr.Close()
                if n != size || err != nil {
                        errorsChan <- struct{}{}
                        continue
                }
                n, err := io.Copy(ioutil.Discard, rdr)
                rdr.Close()
                if n != size || err != nil {
-                       stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+                       lgr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
                        errorsChan <- struct{}{}
                        continue
                        // Note we don't count the bytes received in
                        // partial/corrupt responses: we are measuring
                        // throughput, not resource consumption.
                }
                        errorsChan <- struct{}{}
                        continue
                        // Note we don't count the bytes received in
                        // partial/corrupt responses: we are measuring
                        // throughput, not resource consumption.
                }
-               select {
-               case <-stopCh:
-                       return
-               case bytesInChan <- uint64(n):
-               }
+               bytesInChan <- uint64(n)
        }
 }
 
        }
 }
 
-func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
+func overrideServices(kc *keepclient.KeepClient, lgr *log.Logger) {
        roots := make(map[string]string)
        if *ServiceURL != "" {
                roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
        roots := make(map[string]string)
        if *ServiceURL != "" {
                roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
@@ -329,7 +427,7 @@ func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
                        }
                }
                if len(roots) == 0 {
                        }
                }
                if len(roots) == 0 {
-                       stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
+                       lgr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
                }
        } else {
                return
                }
        } else {
                return