From 395c842444856a96a4216353a201ff5f42e3ef64 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Sat, 24 Oct 2020 21:18:33 -0400 Subject: [PATCH] Write import stats to stats.json. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- cmd.go | 2 +- import.go | 55 +++++++++++++++++++++++++++++++++++++----------- pipeline_test.go | 4 ++-- stats.go | 6 +++--- tilelib.go | 28 +++++++++++++++++++++--- tilelib_test.go | 16 +++++++------- 6 files changed, 82 insertions(+), 29 deletions(-) diff --git a/cmd.go b/cmd.go index 0fdad66a46..2e59e51b76 100644 --- a/cmd.go +++ b/cmd.go @@ -29,7 +29,7 @@ var ( "pca-py": &pythonPCA{}, "plot": &pythonPlot{}, "diff-fasta": &diffFasta{}, - "stats": &stats{}, + "stats": &statscmd{}, "merge": &merger{}, }) ) diff --git a/import.go b/import.go index 254029e477..aa83a70798 100644 --- a/import.go +++ b/import.go @@ -4,6 +4,7 @@ import ( "bufio" "compress/gzip" "encoding/gob" + "encoding/json" "errors" "flag" "fmt" @@ -34,6 +35,7 @@ type importer struct { skipOOO bool outputTiles bool includeNoCalls bool + outputStats string encoder *gob.Encoder } @@ -54,6 +56,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags") flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file") flags.BoolVar(&cmd.includeNoCalls, "include-no-calls", false, "treat tiles with no-calls as regular tiles") + flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)") priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") @@ -118,6 +121,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), fmt.Sprintf("-include-no-calls=%v", cmd.includeNoCalls), + "-output-stats", "/mnt/output/stats.json", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile, @@ -180,17 +184,17 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { var input io.ReadCloser input, err := os.Open(infile) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() if strings.HasSuffix(infile, ".gz") { input, err = gzip.NewReader(input) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() } @@ -282,9 +286,10 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { starttime := time.Now() errs := make(chan error, 1) todo := make(chan func() error, len(infiles)*2) + allstats := make([][]importStats, len(infiles)*2) var encodeJobs sync.WaitGroup - for _, infile := range infiles { - infile := infile + for idx, infile := range infiles { + idx, infile := idx, infile var phases sync.WaitGroup phases.Add(2) variants := make([][]tileVariantID, 2) @@ -293,7 +298,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile) + allstats[idx*2] = stats var kept, dropped int variants[0], kept, dropped = tseqs.Variants() log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) @@ -304,10 +310,12 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) - tseqs, err := cmd.tileFasta(tilelib, infile2) + tseqs, stats, err := cmd.tileFasta(tilelib, infile2) + allstats[idx*2+1] = stats var kept, dropped int variants[1], kept, dropped = tseqs.Variants() log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) + return err } } else if fastaFilenameRe.MatchString(infile) { @@ -316,7 +324,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile) + allstats[idx*2] = stats if err != nil { return err } @@ -338,7 +347,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) - tseqs, err := cmd.tileGVCF(tilelib, infile, phase) + tseqs, stats, err := cmd.tileGVCF(tilelib, infile, phase) + allstats[idx*2] = stats var kept, dropped int variants[phase], kept, dropped = tseqs.Variants() log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped) @@ -397,11 +407,32 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } tileJobs.Wait() encodeJobs.Wait() + go close(errs) - return <-errs + err := <-errs + if err != nil { + return err + } + + if cmd.outputStats != "" { + f, err := os.OpenFile(cmd.outputStats, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + var flatstats []importStats + for _, stats := range allstats { + flatstats = append(flatstats, stats...) + } + err = json.NewEncoder(f).Encode(flatstats) + if err != nil { + return err + } + } + + return nil } -func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { +func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, stats []importStats, err error) { if cmd.refFile == "" { err = errors.New("cannot import vcf: reference data (-ref) not specified") return @@ -433,7 +464,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) if err != nil { return } diff --git a/pipeline_test.go b/pipeline_test.go index 4dd544d813..bf6cf5597c 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -35,7 +35,7 @@ func (s *pipelineSuite) TestImport(c *check.C) { wg.Add(1) go func() { defer wg.Done() - code := (&stats{}).RunCommand("lightning stats", []string{"-local"}, statsin, statsout, os.Stderr) + code := (&statscmd{}).RunCommand("lightning stats", []string{"-local"}, statsin, statsout, os.Stderr) c.Check(code, check.Equals, 0) }() wg.Wait() @@ -76,7 +76,7 @@ func (s *pipelineSuite) TestImportMerge(c *check.C) { c.Logf("len(merged) %d", merged.Len()) statsout := &bytes.Buffer{} - code = (&stats{}).RunCommand("lightning stats", []string{"-local"}, bytes.NewReader(merged.Bytes()), statsout, os.Stderr) + code = (&statscmd{}).RunCommand("lightning stats", []string{"-local"}, bytes.NewReader(merged.Bytes()), statsout, os.Stderr) c.Check(code, check.Equals, 0) c.Check(statsout.Len() > 0, check.Equals, true) c.Logf("%s", statsout.String()) diff --git a/stats.go b/stats.go index a02fef701d..8da2a69ad7 100644 --- a/stats.go +++ b/stats.go @@ -17,11 +17,11 @@ import ( log "github.com/sirupsen/logrus" ) -type stats struct { +type statscmd struct { debugUnplaced bool } -func (cmd *stats) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { +func (cmd *statscmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { var err error defer func() { if err != nil { @@ -116,7 +116,7 @@ func (cmd *stats) RunCommand(prog string, args []string, stdin io.Reader, stdout return 0 } -func (cmd *stats) doStats(input io.Reader, output io.Writer) error { +func (cmd *statscmd) doStats(input io.Reader, output io.Writer) error { var ret struct { Genomes int CalledBases []int64 diff --git a/tilelib.go b/tilelib.go index 9ce63a8c6c..75e952696a 100644 --- a/tilelib.go +++ b/tilelib.go @@ -235,7 +235,17 @@ func (tilelib *tileLibrary) LoadGob(ctx context.Context, rdr io.Reader, onLoadGe return nil } -func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, error) { +type importStats struct { + InputFile string + InputLabel string + InputLength int + InputCoverage int + TileCoverage int + PathLength int + DroppedOutOfOrderTiles int +} + +func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, []importStats, error) { ret := tileSeq{} type jobT struct { label string @@ -269,6 +279,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, totalFoundTags := 0 totalPathLen := 0 skippedSequences := 0 + stats := make([]importStats, 0, len(todo)) for job := range todo { if len(job.fasta) == 0 { continue @@ -339,11 +350,22 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader) (tileSeq, ret[job.label] = pathcopy log.Debugf("%s %s tiled with path len %d, skipped %d", filelabel, job.label, len(path), skipped) - log.Infof("%s %s fasta in %d coverage in %d coverage out %d", filelabel, job.label, len(job.fasta), countBases(job.fasta), basesOut) + basesIn := countBases(job.fasta) + log.Infof("%s %s fasta in %d coverage in %d coverage out %d", filelabel, job.label, len(job.fasta), basesIn, basesOut) + stats = append(stats, importStats{ + InputFile: filelabel, + InputLabel: job.label, + InputLength: len(job.fasta), + InputCoverage: basesIn, + TileCoverage: basesOut, + PathLength: len(path), + DroppedOutOfOrderTiles: skipped, + }) + totalPathLen += len(path) } log.Printf("%s tiled with total path len %d in %d sequences (skipped %d sequences with '_' in name, skipped %d out-of-order tags)", filelabel, totalPathLen, len(ret), skippedSequences, totalFoundTags-totalPathLen) - return ret, scanner.Err() + return ret, stats, scanner.Err() } func (tilelib *tileLibrary) Len() int { diff --git a/tilelib_test.go b/tilelib_test.go index 0e7edd6759..6d6ff5abb8 100644 --- a/tilelib_test.go +++ b/tilelib_test.go @@ -34,7 +34,7 @@ gttattaataataacttatcatca func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 4, 0, 2 (but skipOOO is false) tilelib := &tileLibrary{taglib: &s.taglib, skipOOO: false} - tseq, err := tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err := tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[4]+ "ggggggggggggggggggggggg\n"+ s.tag[0]+ @@ -46,7 +46,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 0, 1, 2 -> don't skip tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[0]+ "cccccccccccccccccccc\n"+ s.tag[1]+ @@ -58,7 +58,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 2, 3, 4 -> don't skip tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[2]+ "cccccccccccccccccccc\n"+ s.tag[3]+ @@ -70,7 +70,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 4, 0, 2 -> skip 4 tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[4]+ "cccccccccccccccccccc\n"+ s.tag[0]+ @@ -82,7 +82,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 0, 2, 1 -> skip 2 tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[0]+ "cccccccccccccccccccc\n"+ s.tag[2]+ @@ -94,7 +94,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 0, 1, 1, 2 -> skip second tag1 tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[0]+ "cccccccccccccccccccc\n"+ s.tag[1]+ @@ -108,7 +108,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 0, 1, 3, 0, 4 -> skip second tag0 tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[0]+ "cccccccccccccccccccc\n"+ s.tag[1]+ @@ -124,7 +124,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { // tags appear in seq: 0, 1, 3 -> don't skip tilelib = &tileLibrary{taglib: &s.taglib, skipOOO: true} - tseq, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ + tseq, _, err = tilelib.TileFasta("test-label", bytes.NewBufferString(">test-seq\n"+ s.tag[0]+ "cccccccccccccccccccc\n"+ s.tag[1]+ -- 2.30.2