X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/f344dfcf761863e4b8fdbd4dd1fb515903bf01cf..69b71af4136fdaeeb5c2afbc559208dc5f428c48:/import.go diff --git a/import.go b/import.go index 433ed65bcc..629fccac2f 100644 --- a/import.go +++ b/import.go @@ -4,6 +4,7 @@ import ( "bufio" "compress/gzip" "encoding/gob" + "encoding/json" "errors" "flag" "fmt" @@ -22,16 +23,25 @@ import ( "time" "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/lucasb-eyer/go-colorful" log "github.com/sirupsen/logrus" + "gonum.org/v1/plot" + "gonum.org/v1/plot/plotter" + "gonum.org/v1/plot/vg" + "gonum.org/v1/plot/vg/draw" ) type importer struct { - tagLibraryFile string - refFile string - outputFile string - projectUUID string - runLocal bool - encoder *gob.Encoder + tagLibraryFile string + refFile string + outputFile string + projectUUID string + runLocal bool + skipOOO bool + outputTiles bool + saveIncompleteTiles bool + outputStats string + encoder *gob.Encoder } func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -48,7 +58,13 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std flags.StringVar(&cmd.outputFile, "o", "-", "output `file`") flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data") flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)") + flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags") + flags.BoolVar(&cmd.outputTiles, "output-tiles", false, "include tile variant sequences in output file") + flags.BoolVar(&cmd.saveIncompleteTiles, "save-incomplete-tiles", false, "treat tiles with no-calls as regular tiles") + flags.StringVar(&cmd.outputStats, "output-stats", "", "output stats to `file` (json)") + priority := flags.Int("priority", 500, "container request priority") pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") + loglevel := flags.String("loglevel", "info", "logging threshold (trace, debug, info, warn, error, fatal, or panic)") err = flags.Parse(args) if err == flag.ErrHelp { err = nil @@ -69,13 +85,20 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std }() } + lvl, err := log.ParseLevel(*loglevel) + if err != nil { + return 2 + } + log.SetLevel(lvl) + if !cmd.runLocal { runner := arvadosContainerRunner{ Name: "lightning import", Client: arvados.NewClientFromEnv(), ProjectUUID: cmd.projectUUID, - RAM: 30000000000, - VCPUs: 16, + RAM: 80000000000, + VCPUs: 32, + Priority: *priority, } err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile) if err != nil { @@ -89,7 +112,7 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std } } if cmd.outputFile == "-" { - cmd.outputFile = "/mnt/output/library.gob" + cmd.outputFile = "/mnt/output/library.gob.gz" } else { // Not yet implemented, but this should write // the collection to an existing collection, @@ -97,13 +120,23 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std err = errors.New("cannot specify output file in container mode: not implemented") return 1 } - runner.Args = append([]string{"import", "-local=true", "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...) + runner.Args = append([]string{"import", + "-local=true", + "-loglevel=" + *loglevel, + fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), + fmt.Sprintf("-output-tiles=%v", cmd.outputTiles), + fmt.Sprintf("-save-incomplete-tiles=%v", cmd.saveIncompleteTiles), + "-output-stats", "/mnt/output/stats.json", + "-tag-library", cmd.tagLibraryFile, + "-ref", cmd.refFile, + "-o", cmd.outputFile, + }, inputs...) var output string output, err = runner.Run() if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/library.gob") + fmt.Fprintln(stdout, output+"/library.gob.gz") return 0 } @@ -112,29 +145,40 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std return 1 } - tilelib, err := cmd.loadTileLibrary() + taglib, err := cmd.loadTagLibrary() if err != nil { return 1 } - go func() { - for range time.Tick(10 * time.Minute) { - log.Printf("tilelib.Len() == %d", tilelib.Len()) - } - }() - var output io.WriteCloser + var outw, outf io.WriteCloser if cmd.outputFile == "-" { - output = nopCloser{stdout} + outw = nopCloser{stdout} } else { - output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) + outf, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } - defer output.Close() + defer outf.Close() + if strings.HasSuffix(cmd.outputFile, ".gz") { + outw = gzip.NewWriter(outf) + } else { + outw = outf + } } - bufw := bufio.NewWriter(output) + bufw := bufio.NewWriter(outw) cmd.encoder = gob.NewEncoder(bufw) + tilelib := &tileLibrary{taglib: taglib, retainNoCalls: cmd.saveIncompleteTiles, skipOOO: cmd.skipOOO} + if cmd.outputTiles { + cmd.encoder.Encode(LibraryEntry{TagSet: taglib.Tags()}) + tilelib.encoder = cmd.encoder + } + go func() { + for range time.Tick(10 * time.Minute) { + log.Printf("tilelib.Len() == %d", tilelib.Len()) + } + }() + err = cmd.tileInputs(tilelib, infiles) if err != nil { return 1 @@ -143,31 +187,37 @@ func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, std if err != nil { return 1 } - err = output.Close() + err = outw.Close() if err != nil { return 1 } + if outf != nil && outf != outw { + err = outf.Close() + if err != nil { + return 1 + } + } return 0 } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { var input io.ReadCloser input, err := os.Open(infile) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() if strings.HasSuffix(infile, ".gz") { input, err = gzip.NewReader(input) if err != nil { - return nil, err + return nil, nil, err } defer input.Close() } return tilelib.TileFasta(infile, input) } -func (cmd *importer) loadTileLibrary() (*tileLibrary, error) { +func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { log.Printf("tag library %s load starting", cmd.tagLibraryFile) f, err := os.Open(cmd.tagLibraryFile) if err != nil { @@ -191,15 +241,22 @@ func (cmd *importer) loadTileLibrary() (*tileLibrary, error) { return nil, fmt.Errorf("cannot tile: tag library is empty") } log.Printf("tag library %s load done", cmd.tagLibraryFile) - return &tileLibrary{taglib: &taglib}, nil + return &taglib, nil } +var ( + vcfFilenameRe = regexp.MustCompile(`\.vcf(\.gz)?$`) + fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`) + fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`) + fastaFilenameRe = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`) +) + func listInputFiles(paths []string) (files []string, err error) { for _, path := range paths { if fi, err := os.Stat(path); err != nil { return nil, fmt.Errorf("%s: stat failed: %s", path, err) } else if !fi.IsDir() { - if !strings.HasSuffix(path, ".2.fasta") || strings.HasSuffix(path, ".2.fasta.gz") { + if !fasta2FilenameRe.MatchString(path) { files = append(files, path) } continue @@ -215,23 +272,27 @@ func listInputFiles(paths []string) (files []string, err error) { } sort.Strings(names) for _, name := range names { - if strings.HasSuffix(name, ".vcf") || strings.HasSuffix(name, ".vcf.gz") { + if vcfFilenameRe.MatchString(name) { files = append(files, filepath.Join(path, name)) - } else if strings.HasSuffix(name, ".1.fasta") || strings.HasSuffix(name, ".1.fasta.gz") { + } else if fastaFilenameRe.MatchString(name) && !fasta2FilenameRe.MatchString(name) { files = append(files, filepath.Join(path, name)) } } d.Close() } for _, file := range files { - if strings.HasSuffix(file, ".1.fasta") || strings.HasSuffix(file, ".1.fasta.gz") { - continue - } else if _, err := os.Stat(file + ".csi"); err == nil { - continue - } else if _, err = os.Stat(file + ".tbi"); err == nil { + if fastaFilenameRe.MatchString(file) { continue + } else if vcfFilenameRe.MatchString(file) { + if _, err := os.Stat(file + ".csi"); err == nil { + continue + } else if _, err = os.Stat(file + ".tbi"); err == nil { + continue + } else { + return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file) + } } else { - return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file) + return nil, fmt.Errorf("don't know how to handle filename %s", file) } } return @@ -241,42 +302,77 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { starttime := time.Now() errs := make(chan error, 1) todo := make(chan func() error, len(infiles)*2) + allstats := make([][]importStats, len(infiles)*2) var encodeJobs sync.WaitGroup - for _, infile := range infiles { - infile := infile + for idx, infile := range infiles { + idx, infile := idx, infile var phases sync.WaitGroup phases.Add(2) variants := make([][]tileVariantID, 2) - if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") { + if fasta1FilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, err := cmd.tileFasta(tilelib, infile) - variants[0] = tseqs.Variants() + tseqs, stats, err := cmd.tileFasta(tilelib, infile) + allstats[idx*2] = stats + var kept, dropped int + variants[0], kept, dropped = tseqs.Variants() + log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) return err } - infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`) + infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`) todo <- func() error { defer phases.Done() log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) - tseqs, err := cmd.tileFasta(tilelib, infile2) - variants[1] = tseqs.Variants() + tseqs, stats, err := cmd.tileFasta(tilelib, infile2) + allstats[idx*2+1] = stats + var kept, dropped int + variants[1], kept, dropped = tseqs.Variants() + log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) + return err } - } else { + } else if fastaFilenameRe.MatchString(infile) { + todo <- func() error { + defer phases.Done() + defer phases.Done() + log.Printf("%s starting", infile) + defer log.Printf("%s done", infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile) + allstats[idx*2] = stats + if err != nil { + return err + } + totlen := 0 + for _, tseq := range tseqs { + totlen += len(tseq) + } + log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen) + return cmd.encoder.Encode(LibraryEntry{ + CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}}, + }) + } + // Don't write out a CompactGenomes entry + continue + } else if vcfFilenameRe.MatchString(infile) { for phase := 0; phase < 2; phase++ { phase := phase todo <- func() error { defer phases.Done() log.Printf("%s phase %d starting", infile, phase+1) defer log.Printf("%s phase %d done", infile, phase+1) - tseqs, err := cmd.tileGVCF(tilelib, infile, phase) - variants[phase] = tseqs.Variants() + tseqs, stats, err := cmd.tileGVCF(tilelib, infile, phase) + allstats[idx*2] = stats + var kept, dropped int + variants[phase], kept, dropped = tseqs.Variants() + log.Printf("%s phase %d found %d unique tags plus %d repeats", infile, phase+1, kept, dropped) return err } } + } else { + panic(fmt.Sprintf("bug: unhandled filename %q", infile)) } encodeJobs.Add(1) go func() { @@ -285,17 +381,8 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if len(errs) > 0 { return } - ntags := len(variants[0]) - if ntags < len(variants[1]) { - ntags = len(variants[1]) - } - flat := make([]tileVariantID, ntags*2) - for i := 0; i < ntags; i++ { - flat[i*2] = variants[0][i] - flat[i*2+1] = variants[1][i] - } err := cmd.encoder.Encode(LibraryEntry{ - CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}}, + CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}}, }) if err != nil { select { @@ -326,19 +413,42 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { } } remain := len(todo) + int(atomic.LoadInt64(&running)) - 1 - ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain) - eta := time.Now().Add(ttl) - log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl) + if remain < cap(todo) { + ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain) + eta := time.Now().Add(ttl) + log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl) + } } }() } tileJobs.Wait() encodeJobs.Wait() + go close(errs) - return <-errs + err := <-errs + if err != nil { + return err + } + + if cmd.outputStats != "" { + f, err := os.OpenFile(cmd.outputStats, os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + var flatstats []importStats + for _, stats := range allstats { + flatstats = append(flatstats, stats...) + } + err = json.NewEncoder(f).Encode(flatstats) + if err != nil { + return err + } + } + + return nil } -func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) { +func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, stats []importStats, err error) { if cmd.refFile == "" { err = errors.New("cannot import vcf: reference data (-ref) not specified") return @@ -370,7 +480,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout) if err != nil { return } @@ -385,3 +495,91 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t } return } + +func flatten(variants [][]tileVariantID) []tileVariantID { + ntags := 0 + for _, v := range variants { + if ntags < len(v) { + ntags = len(v) + } + } + flat := make([]tileVariantID, ntags*2) + for i := 0; i < ntags; i++ { + for hap := 0; hap < 2; hap++ { + if i < len(variants[hap]) { + flat[i*2+hap] = variants[hap][i] + } + } + } + return flat +} + +type importstatsplot struct{} + +func (cmd *importstatsplot) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + err := cmd.Plot(stdin, stdout) + if err != nil { + log.Errorf("%s", err) + return 1 + } + return 0 +} + +func (cmd *importstatsplot) Plot(stdin io.Reader, stdout io.Writer) error { + var stats []importStats + err := json.NewDecoder(stdin).Decode(&stats) + if err != nil { + return err + } + + p, err := plot.New() + if err != nil { + return err + } + p.Title.Text = "coverage preserved by import (excl X<0.65)" + p.X.Label.Text = "input base calls ÷ sequence length" + p.Y.Label.Text = "output base calls ÷ input base calls" + p.Add(plotter.NewGrid()) + + data := map[string]plotter.XYs{} + for _, stat := range stats { + data[stat.InputLabel] = append(data[stat.InputLabel], plotter.XY{ + X: float64(stat.InputCoverage) / float64(stat.InputLength), + Y: float64(stat.TileCoverage) / float64(stat.InputCoverage), + }) + } + + labels := []string{} + for label := range data { + labels = append(labels, label) + } + sort.Strings(labels) + palette, err := colorful.SoftPalette(len(labels)) + if err != nil { + return err + } + nextInPalette := 0 + for idx, label := range labels { + s, err := plotter.NewScatter(data[label]) + if err != nil { + return err + } + s.GlyphStyle.Color = palette[idx] + s.GlyphStyle.Radius = vg.Millimeter / 2 + s.GlyphStyle.Shape = draw.CrossGlyph{} + nextInPalette += 7 + p.Add(s) + if false { + p.Legend.Add(label, s) + } + } + p.X.Min = 0.65 + p.X.Max = 1 + + w, err := p.WriterTo(8*vg.Inch, 6*vg.Inch, "svg") + if err != nil { + return err + } + _, err = w.WriteTo(stdout) + return err +}