From 866decb73a9cf07b28e26c5028a1d42a5ef243a7 Mon Sep 17 00:00:00 2001
From: Ward Vandewege
Date: Tue, 21 Jul 2020 17:10:40 -0400
Subject: [PATCH] 16585: implement review feedback.

Arvados-DCO-1.1-Signed-off-by: Ward Vandewege
---
 tools/keep-exercise/keep-exercise.go | 191 +++++++++++++--------------
 1 file changed, 90 insertions(+), 101 deletions(-)

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 2aa0b44fc4..84e1a6ce8e 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -59,17 +59,17 @@ var (
 	getVersion = flag.Bool("version", false, "Print version information and exit.")
 	RunTime    = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
 	Repeat     = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
-	UseIndex   = flag.Bool("useIndex", false, "use the GetIndex call to get a list of blocks to read. Requires the SystemRoot token. Use this to rule out caching effects when reading.")
+	UseIndex   = flag.Bool("use-index", false, "use the GetIndex call to get a list of blocks to read. Requires the SystemRoot token. Use this to rule out caching effects when reading.")
 )
 
-func createKeepClient(stderr *log.Logger) (kc *keepclient.KeepClient) {
+func createKeepClient(lgr *log.Logger) (kc *keepclient.KeepClient) {
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
 	kc, err = keepclient.MakeKeepClient(arv)
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
 	kc.Want_replicas = *Replicas
@@ -84,7 +84,7 @@ func createKeepClient(stderr *log.Logger) (kc *keepclient.KeepClient) {
 			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
 		},
 	}
-	overrideServices(kc, stderr)
+	overrideServices(kc, lgr)
 	return kc
 }
 
@@ -97,24 +97,24 @@ func main() {
 		os.Exit(0)
 	}
 
-	stderr := log.New(os.Stderr, "", log.LstdFlags)
+	lgr := log.New(os.Stderr, "", log.LstdFlags)
 
 	if *ReadThreads > 0 && *WriteThreads == 0 && !*UseIndex {
-		stderr.Fatal("At least one write thread is required if rthreads is non-zero and useIndex is not enabled")
+		lgr.Fatal("At least one write thread is required if -rthreads is non-zero and -use-index is not enabled")
 	}
 
 	if *ReadThreads == 0 && *WriteThreads == 0 {
-		stderr.Fatal("Nothing to do!")
+		lgr.Fatal("Nothing to do!")
 	}
 
-	kc := createKeepClient(stderr)
+	kc := createKeepClient(lgr)
 
-	// When UseIndx is set, we need a KeepClient with SystemRoot powers to get
+	// When UseIndex is set, we need a KeepClient with SystemRoot powers to get
 	// the block index from the Keepstore. We use the SystemRootToken from
 	// the Arvados config.yml for that.
 	var cluster *arvados.Cluster
 	if *ReadThreads > 0 && *UseIndex {
-		cluster = loadConfig(stderr)
+		cluster = loadConfig(lgr)
 		kc.Arvados.ApiToken = cluster.SystemRootToken
 	}
 
@@ -124,7 +124,8 @@ func main() {
 	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
 	go func() {
 		<-sigChan
-		fmt.Print("\r") // Suppress the ^C print
+		// Suppress the ^C echo on the terminal
+		fmt.Print("\r")
 		cancel()
 	}()
 
@@ -135,26 +136,23 @@ func main() {
 	for i := 0; i < *WriteThreads; i++ {
 		nextBuf := make(chan []byte, 1)
 		nextBufs = append(nextBufs, nextBuf)
-		go makeBufs(nextBuf, i, stderr)
+		go makeBufs(nextBuf, i, lgr)
 	}
 
-	for i := 0; i < *Repeat; i++ {
-		if ctx.Err() == nil {
-			summary = runExperiment(ctx, cluster, kc, nextBufs, summary, csvHeader, stderr)
-			stderr.Printf("*************************** experiment %d complete ******************************\n", i)
-			summary += fmt.Sprintf(",%d\n", i)
-		}
-	}
-	if ctx.Err() == nil {
-		stderr.Println("Summary:")
-		stderr.Println()
-		fmt.Println()
-		fmt.Println(csvHeader + ",Experiment")
-		fmt.Println(summary)
+	for i := 0; i < *Repeat && ctx.Err() == nil; i++ {
+		summary = runExperiment(ctx, cluster, kc, nextBufs, summary, csvHeader, lgr)
+		lgr.Printf("*************************** experiment %d complete ******************************\n", i)
+		summary += fmt.Sprintf(",%d\n", i)
 	}
+
+	lgr.Println("Summary:")
+	lgr.Println()
+	fmt.Println()
+	fmt.Println(csvHeader + ",Experiment")
+	fmt.Println(summary)
 }
 
-func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, nextBufs []chan []byte, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
+func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, nextBufs []chan []byte, summary string, csvHeader string, lgr *log.Logger) (newSummary string) {
 	// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
 	var bytesInChan = make(chan uint64)
 	var bytesOutChan = make(chan uint64)
@@ -173,30 +171,30 @@ func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient
 	if *ReadThreads > 0 {
 		warmup = true
 		if !*UseIndex {
-			stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+			lgr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
 		} else {
-			stderr.Printf("Start warmup phase, waiting for block index before reading starts\n")
+			lgr.Printf("Start warmup phase, waiting for block index before reading starts\n")
 		}
 	}
 	if warmup && !*UseIndex {
 		go func() {
 			locator, _, err := kc.PutB(<-nextBufs[0])
 			if err != nil {
-				stderr.Print(err)
+				lgr.Print(err)
 				errorsChan <- struct{}{}
 			}
 			nextLocator.Store(locator)
-			stderr.Println("Warmup complete!")
+			lgr.Println("Warmup complete!")
 			close(ready)
 		}()
 	} else if warmup && *UseIndex {
 		// Get list of blocks to read
-		go getIndexLocators(ctx, cluster, kc, indexLocatorChan, stderr)
+		go getIndexLocators(ctx, cluster, kc, indexLocatorChan, lgr)
 		select {
 		case <-ctx.Done():
 			return
 		case <-indexLocatorChan:
-			stderr.Println("Warmup complete!")
+			lgr.Println("Warmup complete!")
 			close(ready)
 		}
 	} else {
@@ -213,15 +211,15 @@ func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient
 	defer cancel()
 
 	for i := 0; i < *WriteThreads; i++ {
-		go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, stderr)
+		go doWrites(ctx, kc, nextBufs[i], &nextLocator, bytesOutChan, errorsChan, lgr)
 	}
 	if *UseIndex {
 		for i := 0; i < *ReadThreads; i++ {
-			go doIndexReads(ctx, kc, cluster, indexLocatorChan, bytesInChan, errorsChan, stderr)
+			go doReads(ctx, kc, nil, indexLocatorChan, bytesInChan, errorsChan, lgr)
 		}
 	} else {
 		for i := 0; i < *ReadThreads; i++ {
-			go doReads(ctx, kc, &nextLocator, bytesInChan, errorsChan, stderr)
+			go doReads(ctx, kc, &nextLocator, nil, bytesInChan, errorsChan, lgr)
 		}
 	}
 
@@ -292,7 +290,7 @@ func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient
 	}
 }
 
-func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
+func makeBufs(nextBuf chan<- []byte, threadID int, lgr *log.Logger) {
 	buf := make([]byte, *BlockSize)
 	if *VaryThread {
 		binary.PutVarint(buf, int64(threadID))
@@ -305,7 +303,7 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 		if *VaryRequest {
 			rnd := make([]byte, randSize)
 			if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
-				stderr.Fatal(err)
+				lgr.Fatal(err)
 			}
 			buf = append(rnd, buf[randSize:]...)
 		}
@@ -313,12 +311,14 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	}
 }
 
-func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, bytesOutChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, bytesOutChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
 	for ctx.Err() == nil {
+		// Wait for the next pre-generated buffer from makeBufs
 		buf := <-nextBuf
+		// Store the buffer in Keep; its locator feeds the read threads
 		locator, _, err := kc.PutB(buf)
 		if err != nil {
-			stderr.Print(err)
+			lgr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
@@ -327,96 +327,85 @@ func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []b
 	}
 }
 
-func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, stderr *log.Logger) {
-	if ctx.Err() == nil {
-		var locators []string
-		for uuid := range kc.LocalRoots() {
-			reader, err := kc.GetIndex(uuid, "")
-			if err != nil {
-				stderr.Fatalf("Error getting index: %s\n", err)
-			}
-			scanner := bufio.NewScanner(reader)
-			for scanner.Scan() {
-				locators = append(locators, strings.Split(scanner.Text(), " ")[0])
-			}
+func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, lgr *log.Logger) {
+	if ctx.Err() != nil {
+		return
+	}
+	locatorsMap := make(map[string]bool)
+	var locators []string
+	var count int64
+	for uuid := range kc.LocalRoots() {
+		reader, err := kc.GetIndex(uuid, "")
+		if err != nil {
+			lgr.Fatalf("Error getting index: %s\n", err)
 		}
-		stderr.Printf("Found %d locators\n", len(locators))
-		if len(locators) < 1 {
-			stderr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the useIndex cli argument.")
+		scanner := bufio.NewScanner(reader)
+		for scanner.Scan() {
+			locatorsMap[strings.Split(scanner.Text(), " ")[0]] = true
+			count++
 		}
+	}
+	for l := range locatorsMap {
+		locators = append(locators, l)
+	}
+	lgr.Printf("Found %d locators\n", count)
+	lgr.Printf("Found %d locators (deduplicated)\n", len(locators))
+	if len(locators) < 1 {
+		lgr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the -use-index cli argument.")
+	}
 
-		mathRand.Seed(time.Now().UnixNano())
-		mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
+	mathRand.Seed(time.Now().UnixNano())
+	mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
 
-		for _, locator := range locators {
-			// We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
-			signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
-			select {
-			case <-ctx.Done():
-				return
-			case indexLocatorChan <- signedLocator:
-			}
+	for _, locator := range locators {
+		// We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
+		signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
+		select {
+		case <-ctx.Done():
+			return
+		case indexLocatorChan <- signedLocator:
 		}
-		stderr.Fatal("Error: ran out of locators to read!")
 	}
+	lgr.Fatal("Error: ran out of locators to read!")
 }
 
-func loadConfig(stderr *log.Logger) (cluster *arvados.Cluster) {
+func loadConfig(lgr *log.Logger) (cluster *arvados.Cluster) {
 	loader := config.NewLoader(os.Stdin, nil)
 	loader.SkipLegacy = true
 
 	cfg, err := loader.Load()
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
 	cluster, err = cfg.GetCluster("")
 	if err != nil {
-		stderr.Fatal(err)
+		lgr.Fatal(err)
 	}
 	return
 }
 
-func doIndexReads(ctx context.Context, kc *keepclient.KeepClient, cluster *arvados.Cluster, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, lgr *log.Logger) {
 	for ctx.Err() == nil {
-		select {
-		case <-ctx.Done():
-			return
-		case locator := <-indexLocatorChan:
-			rdr, size, url, err := kc.Get(locator)
-			if err != nil {
-				stderr.Print(err)
-				errorsChan <- struct{}{}
-				continue
-			}
-			n, err := io.Copy(ioutil.Discard, rdr)
-			rdr.Close()
-			if n != size || err != nil {
-				stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
-				errorsChan <- struct{}{}
-				continue
-				// Note we don't count the bytes received in
-				// partial/corrupt responses: we are measuring
-				// throughput, not resource consumption.
+		var locator string
+		if indexLocatorChan != nil {
+			select {
+			case <-ctx.Done():
+				return
+			case locator = <-indexLocatorChan:
 			}
-			bytesInChan <- uint64(n)
+		} else {
+			locator = nextLocator.Load().(string)
 		}
-	}
-}
-
-func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, bytesInChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
-	var locator string
-	for ctx.Err() == nil {
-		locator = nextLocator.Load().(string)
 		rdr, size, url, err := kc.Get(locator)
 		if err != nil {
-			stderr.Print(err)
+			lgr.Print(err)
 			errorsChan <- struct{}{}
 			continue
 		}
 		n, err := io.Copy(ioutil.Discard, rdr)
 		rdr.Close()
 		if n != size || err != nil {
-			stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+			lgr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
 			errorsChan <- struct{}{}
 			continue
 			// Note we don't count the bytes received in
@@ -427,7 +416,7 @@ func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic
 	}
 }
 
-func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
+func overrideServices(kc *keepclient.KeepClient, lgr *log.Logger) {
 	roots := make(map[string]string)
 	if *ServiceURL != "" {
 		roots["zzzzz-bi6l4-000000000000000"] = *ServiceURL
@@ -439,7 +428,7 @@ func overrideServices(kc *keepclient.KeepClient, stderr *log.Logger) {
 		}
 	}
 	if len(roots) == 0 {
-		stderr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
+		lgr.Fatalf("Service %q was not in list advertised by API %+q", *ServiceUUID, kc.GatewayRoots())
 	}
 	} else {
 		return
-- 
2.30.2
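
Note on the index handling: getIndexLocators now folds every keepstore's index into a map before building the read list, so a block that is replicated on several keepstores is read once rather than once per replica, and the deduplicated list is shuffled so reads do not replay index order. Below is a self-contained sketch of that map-dedup-then-shuffle step; the sample locators are invented for the demo and are not taken from the patch:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	// A block stored at replication 2 shows up in two keepstores' indexes,
	// so the combined listing contains duplicates.
	index := []string{
		"acbd18db4cc2f85cedef654fccc4a4d8+67108864",
		"37b51d194a7513e45b56f6524f2d51f2+67108864",
		"acbd18db4cc2f85cedef654fccc4a4d8+67108864", // second replica
	}

	// Deduplicate via a map, as getIndexLocators does.
	locatorsMap := make(map[string]bool)
	for _, loc := range index {
		locatorsMap[loc] = true
	}
	var locators []string
	for l := range locatorsMap {
		locators = append(locators, l)
	}
	fmt.Printf("Found %d locators\n", len(index))
	fmt.Printf("Found %d locators (deduplicated)\n", len(locators))

	// Shuffle so the read order is unrelated to index order.
	rand.Seed(time.Now().UnixNano())
	rand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
	fmt.Println(locators)
}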
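Note on the doIndexReads/doReads merge: the patch collapses the two read loops into one doReads by passing exactly one locator source, either the atomic.Value fed by the writers or the channel fed by getIndexLocators, and leaving the other nil; the nil source is never touched because the branch is chosen by the "indexLocatorChan != nil" test. A minimal standalone sketch of that dispatch pattern follows; readLoop, the throttling sleep, and the md5-style sample locators are illustrative only, not code from keep-exercise:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// readLoop mirrors the shape of the merged doReads: exactly one of
// nextLocator / locatorChan is non-nil, and that choice selects where
// each locator comes from.
func readLoop(ctx context.Context, nextLocator *atomic.Value, locatorChan <-chan string) {
	for ctx.Err() == nil {
		var locator string
		if locatorChan != nil {
			select {
			case <-ctx.Done():
				return
			case locator = <-locatorChan:
			}
		} else {
			locator = nextLocator.Load().(string)
		}
		fmt.Println("would read:", locator)
		time.Sleep(time.Millisecond) // throttle the demo loop
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
	defer cancel()

	// Index mode: locators arrive on a channel; nextLocator is nil.
	ch := make(chan string, 2)
	ch <- "acbd18db4cc2f85cedef654fccc4a4d8+3"
	ch <- "37b51d194a7513e45b56f6524f2d51f2+3"
	go readLoop(ctx, nil, ch)

	// Write-feedback mode: readers poll the most recent locator written.
	var next atomic.Value
	next.Store("73feffa4b7f6bb68e44cf984c85f6e88+3")
	go readLoop(ctx, &next, nil)

	time.Sleep(20 * time.Millisecond) // let both goroutines observe cancellation
}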