diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 19d46efbd8..1acd8d8b98 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -19,6 +19,7 @@ package main
 
 import (
+	"bufio"
 	"context"
 	"crypto/rand"
 	"encoding/binary"
@@ -27,13 +28,18 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	mathRand "math/rand"
 	"net/http"
 	"os"
 	"os/signal"
+	"strings"
 	"sync/atomic"
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 )
@@ -54,41 +60,17 @@ var (
 	getVersion = flag.Bool("version", false, "Print version information and exit.")
 	RunTime    = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
 	Repeat     = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
+	UseIndex   = flag.Bool("use-index", false, "use the GetIndex call to get a list of blocks to read. Requires the SystemRoot token. Use this to rule out caching effects when reading.")
 )
 
-// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
-var bytesInChan = make(chan uint64)
-var bytesOutChan = make(chan uint64)
-
-// Send struct{}{} to errorsChan when an error happens.
-var errorsChan = make(chan struct{})
-
-func main() {
-	flag.Parse()
-
-	// Print version information if requested
-	if *getVersion {
-		fmt.Printf("keep-exercise %s\n", version)
-		os.Exit(0)
-	}
-
-	stderr := log.New(os.Stderr, "", log.LstdFlags)
-
-	if *ReadThreads > 0 && *WriteThreads == 0 {
-		stderr.Fatal("At least one write thread is required if rthreads is non-zero")
-	}
-
-	if *ReadThreads == 0 && *WriteThreads == 0 {
-		stderr.Fatal("Nothing to do!")
-	}
-
+func createKeepClient(lgr *log.Logger) (kc *keepclient.KeepClient) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
-	kc, err := keepclient.MakeKeepClient(arv)
+	kc, err = keepclient.MakeKeepClient(arv)
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
 	kc.Want_replicas = *Replicas
@@ -103,6 +85,38 @@ func main() {
 			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
 		},
 	}
+	overrideServices(kc, lgr)
+	return kc
+}
+
+func main() {
+	if ok, code := cmd.ParseFlags(flag.CommandLine, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+		os.Exit(code)
+	} else if *getVersion {
+		fmt.Printf("%s %s\n", os.Args[0], version)
+		return
+	}
+
+	lgr := log.New(os.Stderr, "", log.LstdFlags)
+
+	if *ReadThreads > 0 && *WriteThreads == 0 && !*UseIndex {
+		lgr.Fatal("At least one write thread is required if rthreads is non-zero and -use-index is not enabled")
+	}
+
+	if *ReadThreads == 0 && *WriteThreads == 0 {
+		lgr.Fatal("Nothing to do!")
+	}
+
+	kc := createKeepClient(lgr)
+
+	// When UseIndex is set, we need a KeepClient with SystemRoot powers to get
+	// the block index from the Keepstore. We use the SystemRootToken from
+	// the Arvados config.yml for that.
+	var cluster *arvados.Cluster
+	if *ReadThreads > 0 && *UseIndex {
+		cluster = loadConfig(lgr)
+		kc.Arvados.ApiToken = cluster.SystemRootToken
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -110,52 +124,79 @@ func main() {
 	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
 	go func() {
 		<-sigChan
-		fmt.Print("\r") // Suppress the ^C print
+		// FIXME: this print used to suppress the shell's ^C echo; disabled for now.
+		//fmt.Print("\r") // Suppress the ^C print
 		cancel()
 	}()
 
-	overrideServices(kc, stderr)
-	csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+	csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,UseIndex,RunTime,Repeat"
 	var summary string
-	for i := 0; i < *Repeat; i++ {
-		if ctx.Err() == nil {
-			summary = runExperiment(ctx, kc, summary, csvHeader, stderr)
-			stderr.Printf("*************************** experiment %d complete ******************************\n", i)
-			summary += fmt.Sprintf(",%d\n", i)
-		}
+	var nextBufs []chan []byte
+	for i := 0; i < *WriteThreads; i++ {
+		nextBuf := make(chan []byte, 1)
+		nextBufs = append(nextBufs, nextBuf)
+		go makeBufs(nextBuf, i, lgr)
+	}
+
+	for i := 0; i < *Repeat && ctx.Err() == nil; i++ {
+		summary = runExperiment(ctx, cluster, kc, nextBufs, summary, csvHeader, lgr)
+		lgr.Printf("*************************** experiment %d complete ******************************\n", i)
+		summary += fmt.Sprintf(",%d\n", i)
 	}
-	stderr.Println("Summary:")
-	stderr.Println()
+
+	lgr.Println("Summary:")
+	lgr.Println()
 	fmt.Println()
 	fmt.Println(csvHeader + ",Experiment")
 	fmt.Println(summary)
 }
 
-func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
-	newSummary = summary
+func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, nextBufs []chan []byte, summary string, csvHeader string, lgr *log.Logger) (newSummary string) {
+	// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+	var bytesInChan = make(chan uint64)
+	var bytesOutChan = make(chan uint64)
+	// Send struct{}{} to errorsChan when an error happens.
+	var errorsChan = make(chan struct{})
+	var nextLocator atomic.Value
+	// when UseIndex is set, this channel is used instead of nextLocator
+	var indexLocatorChan = make(chan string, 2)
+
+	newSummary = summary
 
 	// Start warmup
 	ready := make(chan struct{})
 	var warmup bool
 	if *ReadThreads > 0 {
 		warmup = true
-		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+		if !*UseIndex {
+			lgr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+		} else {
+			lgr.Printf("Start warmup phase, waiting for block index before reading starts\n")
+		}
 	}
-	nextBuf := make(chan []byte, 1)
-	go makeBufs(nextBuf, 0, stderr)
-	if warmup {
+	if warmup && !*UseIndex {
 		go func() {
-			locator, _, err := kc.PutB(<-nextBuf)
+			locator, _, err := kc.PutB(<-nextBufs[0])
 			if err != nil {
-				stderr.Print(err)
+				lgr.Print(err)
 				errorsChan <- struct{}{}
 			}
 			nextLocator.Store(locator)
-			stderr.Println("Warmup complete!")
+			lgr.Println("Warmup complete!")
 			close(ready)
 		}()
+	} else if warmup && *UseIndex {
+		// Get list of blocks to read
+		go getIndexLocators(ctx, cluster, kc, indexLocatorChan, lgr)
+		select {
+		case <-ctx.Done():
+			return
+		case <-indexLocatorChan:
+			lgr.Println("Warmup complete!")
+			close(ready)
+		}
 	} else {
 		close(ready)
 	}
@@ -170,15 +211,16 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 	defer cancel()
 
 	for i := 0; i < *WriteThreads; i++ {
-		if i > 0 {
-			// the makeBufs goroutine with index 0 was already started for the warmup phase, above
-			nextBuf := make(chan []byte, 1)
-			go makeBufs(nextBuf, i, stderr)
-		}
-		go doWrites(ctx, kc, nextBuf, &nextLocator, stderr)
+		go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, lgr)
 	}
-	for i := 0; i < *ReadThreads; i++ {
-		go doReads(ctx, kc, &nextLocator, stderr)
+	if *UseIndex {
+		for i := 0; i < *ReadThreads; i++ {
+			go doReads(ctx, kc, nil, indexLocatorChan, bytesInChan, errorsChan, lgr)
+		}
+	} else {
+		for i := 0; i < *ReadThreads; i++ {
+			go doReads(ctx, kc, &nextLocator, nil, bytesInChan, errorsChan, lgr)
+		}
 	}
 
 	t0 := time.Now()
@@ -219,7 +261,7 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 		if rateOut > maxRateOut {
 			maxRateOut = rateOut
 		}
-		line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
+		line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%t,%s,%d",
 			time.Now().Format("2006/01/02 15:04:05"),
 			elapsed,
 			bytesIn, rateIn, maxRateIn,
@@ -234,6 +276,7 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 			*StatsInterval,
 			*ServiceURL,
 			*ServiceUUID,
+			*UseIndex,
 			*RunTime,
 			*Repeat,
 		)
@@ -245,10 +288,9 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 			printCsv = false
 		}
 	}
-	return
 }
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(nextBuf chan<- []byte, threadID int, lgr *log.Logger) {
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
 	}
@@ -261,7 +303,7 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	if *VaryRequest {
 		rnd := make([]byte, randSize)
 		if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
-			stderr.Fatal(err)
+			lgr.Fatal(err)
 		}
 		buf = append(rnd, buf[randSize:]...)
 	}
@@ -269,12 +311,12 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	}
 }
 
-func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stderr *log.Logger) {
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, bytesOutChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
 	for ctx.Err() == nil {
 		buf := <-nextBuf
 		locator, _, err := kc.PutB(buf)
 		if err != nil {
-			stderr.Print(err)
+			lgr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
@@ -283,20 +325,85 @@ func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []b
 	}
 }
 
-func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, stderr *log.Logger) {
-	var locator string
+func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, lgr *log.Logger) {
+	if ctx.Err() != nil {
+		return
+	}
+	locatorsMap := make(map[string]bool)
+	var locators []string
+	var count int64
+	for uuid := range kc.LocalRoots() {
+		reader, err := kc.GetIndex(uuid, "")
+		if err != nil {
+			lgr.Fatalf("Error getting index: %s\n", err)
+		}
+		scanner := bufio.NewScanner(reader)
+		for scanner.Scan() {
+			locatorsMap[strings.Split(scanner.Text(), " ")[0]] = true
+			count++
+		}
+	}
+	for l := range locatorsMap {
+		locators = append(locators, l)
+	}
+	lgr.Printf("Found %d locators\n", count)
+	lgr.Printf("Found %d locators (deduplicated)\n", len(locators))
+	if len(locators) < 1 {
+		lgr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the -use-index cli argument.")
+	}
+
+	mathRand.Seed(time.Now().UnixNano())
+	mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
+
+	for _, locator := range locators {
+		// We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
+		signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
+		select {
+		case <-ctx.Done():
+			return
+		case indexLocatorChan <- signedLocator:
+		}
+	}
+	lgr.Fatal("Error: ran out of locators to read!")
+}
+
+func loadConfig(lgr *log.Logger) (cluster *arvados.Cluster) {
+	loader := config.NewLoader(os.Stdin, nil)
+	loader.SkipLegacy = true
+
+	cfg, err := loader.Load()
+	if err != nil {
+		lgr.Fatal(err)
+	}
+	cluster, err = cfg.GetCluster("")
+	if err != nil {
+		lgr.Fatal(err)
+	}
+	return
+}
+
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
 	for ctx.Err() == nil {
-		locator = nextLocator.Load().(string)
+		var locator string
+		if indexLocatorChan != nil {
+			select {
+			case <-ctx.Done():
+				return
+			case locator = <-indexLocatorChan:
+			}
+		} else {
+			locator = nextLocator.Load().(string)
+		}
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
-			stderr.Print(err)
+			lgr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
 		n, err := io.Copy(ioutil.Discard, rdr)
 		rdr.Close()
 		if n != size || err != nil {
-			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			lgr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
 			errorsChan <- struct{}{}
 			continue
 			// Note we don't count the bytes received in
@@ -307,7 +414,7 @@ func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic
 	}
 }
 
-func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
+func overrideServices(kc *keepclient.KeepClient, lgr *log.Logger) {
 	roots := make(map[string]string)
 	if *ServiceURL != "" {
 		roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
@@ -319,7 +426,7 @@ func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
 			}
 		}
 		if len(roots) == 0 {
-			stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
+			lgr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
 		}
 	} else {
 		return
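
A few notes on the patch follow; the code sketches are illustrative and not part of the diff. The heart of the new -use-index read path is GetIndex plus deduplication: block indexes are fetched per keepstore and merged through a map, since a block replicated on several keepstores appears in several indexes. The sketch below condenses what getIndexLocators does, assuming a working *keepclient.KeepClient whose token has SystemRoot powers; the helper name collectLocators and the error handling are illustrative.

// Condensed sketch of the -use-index warmup path.
// (imports: bufio, fmt, strings, and the keepclient SDK package)
func collectLocators(kc *keepclient.KeepClient) ([]string, error) {
	seen := make(map[string]bool)
	for uuid := range kc.LocalRoots() {
		rdr, err := kc.GetIndex(uuid, "") // "" means no locator-prefix filter
		if err != nil {
			return nil, fmt.Errorf("GetIndex(%s): %w", uuid, err)
		}
		scanner := bufio.NewScanner(rdr)
		for scanner.Scan() {
			// Each index line starts with the locator; keep the first field.
			seen[strings.Split(scanner.Text(), " ")[0]] = true
		}
		if err := scanner.Err(); err != nil {
			return nil, err
		}
	}
	locators := make([]string, 0, len(seen))
	for loc := range seen {
		locators = append(locators, loc)
	}
	return locators, nil
}

The patch then shuffles the deduplicated list with mathRand.Shuffle so reads are spread randomly over the stored blocks instead of following index order, e.g. for an invocation along the lines of keep-exercise -use-index -rthreads 8 -run-time 60s (the thread-count flag spelling is inferred from the "rthreads" error message above).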
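
Index entries come back as bare locators with no permission signature, so each one has to be signed before kc.Get will succeed against a keepstore that enforces blob signing. The signing is local: arvados.SignLocator computes the +A permission hint from Collections.BlobSigningKey, with no API round trip, which is why loadConfig needs access to /etc/arvados/config.yml. A sketch of the call the patch makes, where signForRead is an illustrative wrapper name and the one-hour expiry mirrors the patch:

// Sketch of the signing step in getIndexLocators.
// (imports: time, and the arvados SDK package)
func signForRead(locator, token string, cluster *arvados.Cluster) string {
	return arvados.SignLocator(
		locator,
		token,                     // token bound into the +A permission hint
		time.Now().Add(time.Hour), // expiry carried in the hint, as in the patch
		cluster.Collections.BlobSigningTTL.Duration(),
		[]byte(cluster.Collections.BlobSigningKey),
	)
}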
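
In the non-index path, readers take the newest locator from nextLocator, an atomic.Value that every writer stores into after a successful PutB. atomic.Value.Load returns nil until the first Store, so the .(string) assertion would panic without the warmup phase, which waits for one successful write before close(ready). A minimal, runnable demonstration of that handoff (all names here are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var nextLocator atomic.Value
	ready := make(chan struct{})

	// Writer: store the newest "locator" after each pretend write.
	go func() {
		for i := 0; ; i++ {
			nextLocator.Store(fmt.Sprintf("locator-%d", i))
			if i == 0 {
				// Warmup complete: Load() can no longer return nil.
				close(ready)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}()

	<-ready // without this, Load().(string) below could panic on nil
	for i := 0; i < 5; i++ {
		fmt.Println("reading", nextLocator.Load().(string))
		time.Sleep(25 * time.Millisecond)
	}
}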
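
Moving bytesInChan, bytesOutChan and errorsChan from package scope into runExperiment means each repetition started by -repeat gets fresh counters, so per-experiment statistics cannot bleed into each other. The accounting itself is a single-owner fan-in: workers only send counts, and one select loop owns the totals, so no mutexes or atomics are needed. A self-contained sketch of the pattern (all numbers made up):

package main

import (
	"fmt"
	"time"
)

func main() {
	bytesIn := make(chan uint64)

	// Four pretend read workers, each reporting 1 MiB per transfer.
	for i := 0; i < 4; i++ {
		go func() {
			for {
				bytesIn <- 1 << 20
				time.Sleep(25 * time.Millisecond)
			}
		}()
	}

	var total uint64
	t0 := time.Now()
	tick := time.NewTicker(time.Second)
	defer tick.Stop()
	stop := time.After(3 * time.Second)
	for {
		select {
		case n := <-bytesIn:
			total += n // single owner of the counter: no locking needed
		case <-tick.C:
			elapsed := time.Since(t0).Seconds()
			fmt.Printf("%.0fs elapsed, %.1f MiB/s average\n",
				elapsed, float64(total)/(1<<20)/elapsed)
		case <-stop:
			return
		}
	}
}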