X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/e58f325cacbdbbe44ba870193fc3453a53145137..395c842444856a96a4216353a201ff5f42e3ef64:/import.go diff --git a/import.go b/import.go index 254029e477..aa83a70798 100644 --- a/import.go +++ b/import.go @@ -4,6 +4,7 @@ import ( "bufio" "compress/gzip" "encoding/gob" + "encoding/json" "errors" "flag" "fmt" @@ -34,6 +35,7 @@ type importer struct { skipOOO bool outputTiles bool includeNoCalls bool + outputStats string encoder *gob.Encoder } @@ -54,6 +56,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags") flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file") flags.BoolVar(&cmd.includeNoCalls, "include-no-calls", false, "treat tiles with no-calls as regular tiles") + flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)") priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") @@ -118,6 +121,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), fmt.Sprintf("-include-no-calls=%v", cmd.includeNoCalls), + "-output-stats", "/mnt/output/stats.json", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile, @@ -180,17 +184,17 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 0 } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { var input io.ReadCloser input, err := os.Open(infile) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() if strings.HasSuffix(infile, ".gz") { input, err = gzip.NewReader(input) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() } @@ -282,9 +286,10 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { starttime := time.Now() errs := make(chan error, 1) todo := make(chan func() error, len(infiles)*2) + allstats := make([][]importStats, len(infiles)*2) var encodeJobs sync.WaitGroup - for _, infile := range infiles { - infile := infile + for idx, infile := range infiles { + idx, infile := idx, infile var phases sync.WaitGroup phases.Add(2) variants := make([][]tileVariantID, 2) @@ -293,7 +298,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile) + allstats[idx*2] = stats var kept, dropped int variants[0], kept, dropped = tseqs.Variants() log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) @@ -304,10 +310,12 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) - tseqs, err := cmd.tileFasta(tilelib, infile2) + tseqs, stats, err := cmd.tileFasta(tilelib, infile2) + allstats[idx*2+1] = stats var kept, dropped int variants[1], kept, dropped = tseqs.Variants() log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) + return err } } else if fastaFilenameRe.MatchString(infile) { @@ -316,7 +324,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile) + allstats[idx*2] = stats if err != nil { return err } @@ -338,7 +347,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) - tseqs, err := cmd.tileGVCF(tilelib, infile, phase) + tseqs, stats, err := cmd.tileGVCF(tilelib, infile, phase) + allstats[idx*2] = stats var kept, dropped int variants[phase], kept, dropped = tseqs.Variants() log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped) @@ -397,11 +407,32 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } tileJobs.Wait() encodeJobs.Wait() + go close(errs) - return <-errs + err := <-errs + if err != nil { + return err + } + + if cmd.outputStats != "" { + f, err := os.OpenFile(cmd.outputStats, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + var flatstats []importStats + for _, stats := range allstats { + flatstats = append(flatstats, stats...) + } + err = json.NewEncoder(f).Encode(flatstats) + if err != nil { + return err + } + } + + return nil } -func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { +func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, stats []importStats, err error) { if cmd.refFile == "" { err = errors.New("cannot import vcf: reference data (-ref) not specified") return @@ -433,7 +464,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) if err != nil { return }