X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/c396a3c5364bbedd75c06b0bc49054236c91b110..bd981a00bfb1d74cec8477d5054ee4194fb9cb7e:/import.go diff --git a/import.go b/import.go index 36558ca8d6..57596fbb07 100644 --- a/import.go +++ b/import.go @@ -32,6 +32,8 @@ type importer struct { projectUUID string runLocal bool skipOOO bool + outputTiles bool + includeNoCalls bool encoder *gob.Encoder } @@ -50,6 +52,8 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags") + flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file") + flags.BoolVar(&cmd.includeNoCalls, "include-no-calls", false, "treat tiles with no-calls as regular tiles") priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") @@ -108,7 +112,16 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std err = errors.New("cannot specify output file in container mode: not implemented") return 1 } - runner.Args = append([]string{"import", "-local=true", "-loglevel=" + *loglevel, fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...) + runner.Args = append([]string{"import", + "-local=true", + "-loglevel=" + *loglevel, + fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), + fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), + fmt.Sprintf("-include-no-calls=%v", cmd.includeNoCalls), + "-tag-library", cmd.tagLibraryFile, + "-ref", cmd.refFile, + "-o", cmd.outputFile, + }, inputs...) var output string output, err = runner.Run() if err != nil { @@ -123,15 +136,10 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } - tilelib, err := cmd.loadTileLibrary() + taglib, err := cmd.loadTagLibrary() if err != nil { return 1 } - go func() { - for range time.Tick(10 * time.Minute) { - log.Printf("tilelib.Len() == %d", tilelib.Len()) - } - }() var output io.WriteCloser if cmd.outputFile == "-" { @@ -146,6 +154,17 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std bufw := bufio.NewWriter(output) cmd.encoder = gob.NewEncoder(bufw) + tilelib := &tileLibrary{taglib: taglib, includeNoCalls: cmd.includeNoCalls, skipOOO: cmd.skipOOO} + if cmd.outputTiles { + cmd.encoder.Encode(LibraryEntry{TagSet: taglib.Tags()}) + tilelib.encoder = cmd.encoder + } + go func() { + for range time.Tick(10 * time.Minute) { + log.Printf("tilelib.Len() == %d", tilelib.Len()) + } + }() + err = cmd.tileInputs(tilelib, infiles) if err != nil { return 1 @@ -178,7 +197,7 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, er return tilelib.TileFasta(infile, input) } -func (cmd *importer) loadTileLibrary() (*tileLibrary, error) { +func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { log.Printf("tag library %s load starting", cmd.tagLibraryFile) f, err := os.Open(cmd.tagLibraryFile) if err != nil { @@ -202,15 +221,22 @@ func (cmd *importer) loadTileLibrary() (*tileLibrary, error) { return nil, fmt.Errorf("cannot tile: tag library is empty") } log.Printf("tag library %s load done", cmd.tagLibraryFile) - return &tileLibrary{taglib: &taglib, skipOOO: cmd.skipOOO}, nil + return &taglib, nil } +var ( + vcfFilenameRe = regexp.MustCompile(`\.vcf(\.gz)?$`) + fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`) + fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`) + fastaFilenameRe = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`) +) + func listInputFiles(paths []string) (files []string, err error) { for _, path := range paths { if fi, err := os.Stat(path); err != nil { return nil, fmt.Errorf("%s: stat failed: %s", path, err) } else if !fi.IsDir() { - if !strings.HasSuffix(path, ".2.fasta") || strings.HasSuffix(path, ".2.fasta.gz") { + if !fasta2FilenameRe.MatchString(path) { files = append(files, path) } continue @@ -226,23 +252,27 @@ func listInputFiles(paths []string) (files []string, err error) { } sort.Strings(names) for _, name := range names { - if strings.HasSuffix(name, ".vcf") || strings.HasSuffix(name, ".vcf.gz") { + if vcfFilenameRe.MatchString(name) { files = append(files, filepath.Join(path, name)) - } else if strings.HasSuffix(name, ".1.fasta") || strings.HasSuffix(name, ".1.fasta.gz") { + } else if fastaFilenameRe.MatchString(name) && !fasta2FilenameRe.MatchString(name) { files = append(files, filepath.Join(path, name)) } } d.Close() } for _, file := range files { - if strings.HasSuffix(file, ".1.fasta") || strings.HasSuffix(file, ".1.fasta.gz") { - continue - } else if _, err := os.Stat(file + ".csi"); err == nil { - continue - } else if _, err = os.Stat(file + ".tbi"); err == nil { + if fastaFilenameRe.MatchString(file) { continue + } else if vcfFilenameRe.MatchString(file) { + if _, err := os.Stat(file + ".csi"); err == nil { + continue + } else if _, err = os.Stat(file + ".tbi"); err == nil { + continue + } else { + return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file) + } } else { - return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file) + return nil, fmt.Errorf("don't know how to handle filename %s", file) } } return @@ -258,25 +288,42 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { var phases sync.WaitGroup phases.Add(2) variants := make([][]tileVariantID, 2) - if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") { + if fasta1FilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) tseqs, err := cmd.tileFasta(tilelib, infile) - variants[0] = tseqs.Variants() + var kept, dropped int + variants[0], kept, dropped = tseqs.Variants() + log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) return err } - infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`) + infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`) todo <- func() error { defer phases.Done() log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) tseqs, err := cmd.tileFasta(tilelib, infile2) - variants[1] = tseqs.Variants() + var kept, dropped int + variants[1], kept, dropped = tseqs.Variants() + log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) return err } - } else { + } else if fastaFilenameRe.MatchString(infile) { + todo <- func() error { + defer phases.Done() + defer phases.Done() + log.Printf("%s starting", infile) + defer log.Printf("%s done", infile) + tseqs, err := cmd.tileFasta(tilelib, infile) + var kept, dropped int + variants[0], kept, dropped = tseqs.Variants() + variants[1] = variants[0] + log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) + return err + } + } else if vcfFilenameRe.MatchString(infile) { for phase := 0; phase < 2; phase++ { phase := phase todo <- func() error { @@ -284,10 +331,14 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) tseqs, err := cmd.tileGVCF(tilelib, infile, phase) - variants[phase] = tseqs.Variants() + var kept, dropped int + variants[phase], kept, dropped = tseqs.Variants() + log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped) return err } } + } else { + panic(fmt.Sprintf("bug: unhandled filename %q", infile)) } encodeJobs.Add(1) go func() {