From: Ward Vandewege
Date: Thu, 9 Jul 2020 21:31:46 +0000 (-0400)
Subject: 16585: add -useIndex flag. Remove global variables.
X-Git-Tag: 2.1.0~158^2~3
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/34cb31d5191ce17c37ddd9d344f4d77125af64a8

16585: add -useIndex flag. Remove global variables.

Arvados-DCO-1.1-Signed-off-by: Ward Vandewege
---

diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go
index 19d46efbd8..d9652fc208 100644
--- a/tools/keep-exercise/keep-exercise.go
+++ b/tools/keep-exercise/keep-exercise.go
@@ -19,6 +19,7 @@ package main
 import (
+	"bufio"
 	"context"
 	"crypto/rand"
 	"encoding/binary"
@@ -27,13 +28,17 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	mathRand "math/rand"
 	"net/http"
 	"os"
 	"os/signal"
+	"strings"
 	"sync/atomic"
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/config"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 )
@@ -54,14 +59,34 @@ var (
 	getVersion    = flag.Bool("version", false, "Print version information and exit.")
 	RunTime       = flag.Duration("run-time", 0, "time to run (e.g. 60s), or 0 to run indefinitely (default)")
 	Repeat        = flag.Int("repeat", 1, "number of times to repeat the experiment (default 1)")
+	UseIndex      = flag.Bool("useIndex", false, "use the GetIndex call to get a list of blocks to read. Requires the SystemRoot token. Use this to rule out caching effects when reading.")
 )
 
-// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
-var bytesInChan = make(chan uint64)
-var bytesOutChan = make(chan uint64)
+func createKeepClient(stderr *log.Logger) (kc *keepclient.KeepClient) {
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		stderr.Fatal(err)
+	}
+	kc, err = keepclient.MakeKeepClient(arv)
+	if err != nil {
+		stderr.Fatal(err)
+	}
+	kc.Want_replicas = *Replicas
 
-// Send struct{}{} to errorsChan when an error happens.
-var errorsChan = make(chan struct{})
+	kc.HTTPClient = &http.Client{
+		Timeout: 10 * time.Minute,
+		// It's not safe to copy *http.DefaultTransport
+		// because it has a mutex (which might be locked)
+		// protecting a private map (which might not be nil).
+		// So we build our own, using the Go 1.12 default
+		// values.
+		Transport: &http.Transport{
+			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
+		},
+	}
+	overrideServices(kc, stderr)
+	return kc
+}
 
 func main() {
 	flag.Parse()
@@ -74,34 +99,23 @@ func main() {
 
 	stderr := log.New(os.Stderr, "", log.LstdFlags)
 
-	if *ReadThreads > 0 && *WriteThreads == 0 {
-		stderr.Fatal("At least one write thread is required if rthreads is non-zero")
+	if *ReadThreads > 0 && *WriteThreads == 0 && !*UseIndex {
+		stderr.Fatal("At least one write thread is required if rthreads is non-zero and useIndex is not enabled")
 	}
 
 	if *ReadThreads == 0 && *WriteThreads == 0 {
 		stderr.Fatal("Nothing to do!")
 	}
 
-	arv, err := arvadosclient.MakeArvadosClient()
-	if err != nil {
-		stderr.Fatal(err)
-	}
-	kc, err := keepclient.MakeKeepClient(arv)
-	if err != nil {
-		stderr.Fatal(err)
-	}
-	kc.Want_replicas = *Replicas
+	kc := createKeepClient(stderr)
 
-	kc.HTTPClient = &http.Client{
-		Timeout: 10 * time.Minute,
-		// It's not safe to copy *http.DefaultTransport
-		// because it has a mutex (which might be locked)
-		// protecting a private map (which might not be nil).
-		// So we build our own, using the Go 1.12 default
-		// values.
-		Transport: &http.Transport{
-			TLSClientConfig: arvadosclient.MakeTLSConfig(arv.ApiInsecure),
-		},
+	// When UseIndex is set, we need a KeepClient with SystemRoot powers to get
+	// the block index from the Keepstore. We use the SystemRootToken from
+	// the Arvados config.yml for that.
+	var cluster *arvados.Cluster
+	if *UseIndex {
+		cluster = loadConfig(stderr)
+		kc.Arvados.ApiToken = cluster.SystemRootToken
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -114,38 +128,52 @@ func main() {
 		cancel()
 	}()
 
-	overrideServices(kc, stderr)
-	csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,RunTime,Repeat"
+	csvHeader := "Timestamp,Elapsed,Read (bytes),Avg Read Speed (MiB/s),Peak Read Speed (MiB/s),Written (bytes),Avg Write Speed (MiB/s),Peak Write Speed (MiB/s),Errors,ReadThreads,WriteThreads,VaryRequest,VaryThread,BlockSize,Replicas,StatsInterval,ServiceURL,ServiceUUID,UseIndex,RunTime,Repeat"
 	var summary string
 	for i := 0; i < *Repeat; i++ {
 		if ctx.Err() == nil {
-			summary = runExperiment(ctx, kc, summary, csvHeader, stderr)
+			summary = runExperiment(ctx, cluster, kc, summary, csvHeader, stderr)
 			stderr.Printf("*************************** experiment %d complete ******************************\n", i)
 			summary += fmt.Sprintf(",%d\n", i)
 		}
 	}
-	stderr.Println("Summary:")
-	stderr.Println()
-	fmt.Println()
-	fmt.Println(csvHeader + ",Experiment")
-	fmt.Println(summary)
+	if ctx.Err() == nil {
+		stderr.Println("Summary:")
+		stderr.Println()
+		fmt.Println()
+		fmt.Println(csvHeader + ",Experiment")
+		fmt.Println(summary)
+	}
 }
 
-func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
-	newSummary = summary
+func runExperiment(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, summary string, csvHeader string, stderr *log.Logger) (newSummary string) {
+	// Send 1234 to bytesInChan when we receive 1234 bytes from keepstore.
+	var bytesInChan = make(chan uint64)
+	var bytesOutChan = make(chan uint64)
+	// Send struct{}{} to errorsChan when an error happens.
+	var errorsChan = make(chan struct{})
+	var nextLocator atomic.Value
+	// when UseIndex is set, this channel is used instead of nextLocator
+	var indexLocatorChan = make(chan string, 2)
+
+	newSummary = summary
 
 	// Start warmup
 	ready := make(chan struct{})
 	var warmup bool
 	if *ReadThreads > 0 {
 		warmup = true
-		stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+		if !*UseIndex {
+			stderr.Printf("Start warmup phase, waiting for 1 available block before reading starts\n")
+		} else {
+			stderr.Printf("Start warmup phase, waiting for block index before reading starts\n")
+		}
 	}
 	nextBuf := make(chan []byte, 1)
 	go makeBufs(nextBuf, 0, stderr)
-	if warmup {
+	if warmup && !*UseIndex {
 		go func() {
 			locator, _, err := kc.PutB(<-nextBuf)
 			if err != nil {
@@ -156,6 +184,16 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 			stderr.Println("Warmup complete!")
 			close(ready)
 		}()
+	} else if *UseIndex {
+		// Get list of blocks to read
+		go getIndexLocators(ctx, cluster, kc, indexLocatorChan, stderr)
+		select {
+		case <-ctx.Done():
+			return
+		case <-indexLocatorChan:
+			stderr.Println("Warmup complete!")
+			close(ready)
+		}
 	} else {
 		close(ready)
 	}
@@ -175,10 +213,16 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 			nextBuf := make(chan []byte, 1)
 			go makeBufs(nextBuf, i, stderr)
 		}
-		go doWrites(ctx, kc, nextBuf, &nextLocator, stderr)
+		go doWrites(ctx, kc, nextBuf, &nextLocator, bytesOutChan, errorsChan, stderr)
 	}
-	for i := 0; i < *ReadThreads; i++ {
-		go doReads(ctx, kc, &nextLocator, stderr)
+	if *UseIndex {
+		for i := 0; i < *ReadThreads; i++ {
+			go doIndexReads(ctx, kc, cluster, indexLocatorChan, bytesInChan, errorsChan, stderr)
+		}
+	} else {
+		for i := 0; i < *ReadThreads; i++ {
+			go doReads(ctx, kc, &nextLocator, bytesInChan, errorsChan, stderr)
+		}
 	}
 
 	t0 := time.Now()
@@ -219,7 +263,7 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 		if rateOut > maxRateOut {
 			maxRateOut = rateOut
 		}
-		line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%s,%d",
+		line := fmt.Sprintf("%v,%v,%v,%.1f,%.1f,%v,%.1f,%.1f,%d,%d,%d,%t,%t,%d,%d,%s,%s,%s,%t,%s,%d",
 			time.Now().Format("2006/01/02 15:04:05"),
 			elapsed,
 			bytesIn, rateIn, maxRateIn,
@@ -234,6 +278,7 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 			*StatsInterval,
 			*ServiceURL,
 			*ServiceUUID,
+			*UseIndex,
 			*RunTime,
 			*Repeat,
 		)
@@ -245,7 +290,6 @@ func runExperiment(ctx context.Context, kc *keepclient.KeepClient, summary strin
 			printCsv = false
 		}
 	}
-	return
 }
 
 func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
@@ -269,7 +313,7 @@ func makeBufs(nextBuf chan<- []byte, threadID int, stderr *log.Logger) {
 	}
 }
 
-func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, stderr *log.Logger) {
+func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator *atomic.Value, bytesOutChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
 	for ctx.Err() == nil {
 		buf := <-nextBuf
 		locator, _, err := kc.PutB(buf)
@@ -283,7 +327,83 @@ func doWrites(ctx context.Context, kc *keepclient.KeepClient, nextBuf <-chan []b
 	}
 }
 
-func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, stderr *log.Logger) {
+func getIndexLocators(ctx context.Context, cluster *arvados.Cluster, kc *keepclient.KeepClient, indexLocatorChan chan<- string, stderr *log.Logger) {
+	if ctx.Err() == nil {
+		var locators []string
+		for uuid := range kc.LocalRoots() {
+			reader, err := kc.GetIndex(uuid, "")
+			if err != nil {
+				stderr.Fatalf("Error getting index: %s\n", err)
+			}
+			scanner := bufio.NewScanner(reader)
+			for scanner.Scan() {
+				locators = append(locators, strings.Split(scanner.Text(), " ")[0])
+			}
+		}
+		stderr.Printf("Found %d locators\n", len(locators))
+		if len(locators) < 1 {
+			stderr.Fatal("Error: no locators found. The keepstores do not seem to contain any data. Remove the useIndex cli argument.")
+		}
+
+		mathRand.Seed(time.Now().UnixNano())
+		mathRand.Shuffle(len(locators), func(i, j int) { locators[i], locators[j] = locators[j], locators[i] })
+
+		for _, locator := range locators {
+			// We need the Collections.BlobSigningKey to sign our block requests. This requires access to /etc/arvados/config.yml
+			signedLocator := arvados.SignLocator(locator, kc.Arvados.ApiToken, time.Now().Local().Add(1*time.Hour), cluster.Collections.BlobSigningTTL.Duration(), []byte(cluster.Collections.BlobSigningKey))
+			select {
+			case <-ctx.Done():
+				return
+			case indexLocatorChan <- signedLocator:
+			}
+		}
+		stderr.Fatal("Error: ran out of locators to read!")
+	}
+}
+
+func loadConfig(stderr *log.Logger) (cluster *arvados.Cluster) {
+	loader := config.NewLoader(os.Stdin, nil)
+	loader.SkipLegacy = true
+
+	cfg, err := loader.Load()
+	if err != nil {
+		stderr.Fatal(err)
+	}
+	cluster, err = cfg.GetCluster("")
+	if err != nil {
+		stderr.Fatal(err)
+	}
+	return
+}
+
+func doIndexReads(ctx context.Context, kc *keepclient.KeepClient, cluster *arvados.Cluster, indexLocatorChan <-chan string, bytesInChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
+	for ctx.Err() == nil {
+		select {
+		case <-ctx.Done():
+			return
+		case locator := <-indexLocatorChan:
+			rdr, size, url, err := kc.Get(locator)
+			if err != nil {
+				stderr.Print(err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			n, err := io.Copy(ioutil.Discard, rdr)
+			rdr.Close()
+			if n != size || err != nil {
+				// Note we don't count the bytes received in
+				// partial/corrupt responses: we are measuring
+				// throughput, not resource consumption.
+				stderr.Printf("Got %d bytes (expected %d) from %s: %v", n, size, url, err)
+				errorsChan <- struct{}{}
+				continue
+			}
+			bytesInChan <- uint64(n)
+		}
+	}
+}
+
+func doReads(ctx context.Context, kc *keepclient.KeepClient, nextLocator *atomic.Value, bytesInChan chan<- uint64, errorsChan chan<- struct{}, stderr *log.Logger) {
 	var locator string
 	for ctx.Err() == nil {
 		locator = nextLocator.Load().(string)
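
For reference, here is a minimal standalone sketch (not part of the commit) of the read path this patch adds: load the cluster config the way loadConfig does, adopt the SystemRootToken the way main does when -useIndex is set, then fetch a keepstore index and sign a locator the way getIndexLocators does. It uses only calls that appear in the diff above; the program itself is illustrative.

	package main

	import (
		"bufio"
		"fmt"
		"log"
		"os"
		"strings"
		"time"

		"git.arvados.org/arvados.git/lib/config"
		"git.arvados.org/arvados.git/sdk/go/arvados"
		"git.arvados.org/arvados.git/sdk/go/arvadosclient"
		"git.arvados.org/arvados.git/sdk/go/keepclient"
	)

	func main() {
		stderr := log.New(os.Stderr, "", log.LstdFlags)

		// Load the cluster config (by default /etc/arvados/config.yml) to
		// get the SystemRootToken and the blob signing key/TTL, as
		// loadConfig does in the patch.
		loader := config.NewLoader(os.Stdin, nil)
		loader.SkipLegacy = true
		cfg, err := loader.Load()
		if err != nil {
			stderr.Fatal(err)
		}
		cluster, err := cfg.GetCluster("")
		if err != nil {
			stderr.Fatal(err)
		}

		arv, err := arvadosclient.MakeArvadosClient()
		if err != nil {
			stderr.Fatal(err)
		}
		kc, err := keepclient.MakeKeepClient(arv)
		if err != nil {
			stderr.Fatal(err)
		}
		// GetIndex needs SystemRoot powers, as the flag description says.
		kc.Arvados.ApiToken = cluster.SystemRootToken

		// Fetch the index of each local keepstore and sign the first
		// locator found, the same way getIndexLocators prepares locators
		// for the read threads.
		for uuid := range kc.LocalRoots() {
			reader, err := kc.GetIndex(uuid, "")
			if err != nil {
				stderr.Fatalf("Error getting index: %s\n", err)
			}
			scanner := bufio.NewScanner(reader)
			if scanner.Scan() {
				locator := strings.Split(scanner.Text(), " ")[0]
				signed := arvados.SignLocator(locator, kc.Arvados.ApiToken,
					time.Now().Add(time.Hour),
					cluster.Collections.BlobSigningTTL.Duration(),
					[]byte(cluster.Collections.BlobSigningKey))
				fmt.Println(signed)
			}
		}
	}

With the patch applied, a read-only benchmark against existing data would look something like `keep-exercise -useIndex -rthreads 8 -run-time 60s` (the rthreads flag name is inferred from the error message in the diff); the process must be able to read the cluster config file, since that is where the SystemRootToken and blob signing key come from.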