From: Tom Clegg Date: Sun, 26 Sep 2021 02:47:00 +0000 (-0400) Subject: Save all ref tile data in slice 0. X-Git-Url: https://git.arvados.org/lightning.git/commitdiff_plain/4c5a84272388853c36b290f687a76df53c6c4b3d Save all ref tile data in slice 0. refs #17996 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/gob.go b/gob.go index 5682db3771..0bd1873961 100644 --- a/gob.go +++ b/gob.go @@ -29,6 +29,7 @@ type CompactSequence struct { type TileVariant struct { Tag tagID + Ref bool Variant tileVariantID Blake2b [blake2b.Size256]byte Sequence []byte diff --git a/import.go b/import.go index 79537347ee..2f1a268511 100644 --- a/import.go +++ b/import.go @@ -234,7 +234,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { return nil } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) { var input io.ReadCloser input, err := open(infile) if err != nil { @@ -249,7 +249,7 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, [] } defer input.Close() } - return tilelib.TileFasta(infile, input, cmd.matchChromosome) + return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef) } func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { @@ -349,7 +349,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, stats, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, false) allstats[idx*2] = stats var kept, dropped int variants[0], kept, dropped = tseqs.Variants() @@ -361,7 +361,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile2) defer log.Printf("%s done", infile2) - tseqs, stats, err := cmd.tileFasta(tilelib, infile2) + tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false) allstats[idx*2+1] = stats var kept, dropped int variants[1], kept, dropped = tseqs.Variants() @@ -374,7 +374,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { defer phases.Done() log.Printf("%s starting", infile) defer log.Printf("%s done", infile) - tseqs, stats, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, true) allstats[idx*2] = stats if err != nil { return err @@ -540,7 +540,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false) if err != nil { return } diff --git a/slice.go b/slice.go index 0ee3feaedf..5e4c4fc276 100644 --- a/slice.go +++ b/slice.go @@ -176,7 +176,11 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error { atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants))) for _, tv := range ent.TileVariants { tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace) - err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{ + fileno := 0 + if !tv.Ref { + fileno = int(tv.Tag) / tagsPerFile + } + err := encs[fileno].Encode(LibraryEntry{ TileVariants: []TileVariant{tv}, }) if err != nil { diff --git a/slicenumpy.go b/slicenumpy.go index 7b997e7a78..c48a5d54a9 100644 --- a/slicenumpy.go +++ b/slicenumpy.go @@ -110,10 +110,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s var cgnames []string var refseq map[string][]tileLibRef + var reftiledata = make(map[tileLibRef][]byte, 11000000) in0, err := open(infiles[0]) if err != nil { return 1 } + taglen := -1 DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error { if len(ent.TagSet) > 0 { @@ -127,6 +129,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s for _, cg := range ent.CompactGenomes { cgnames = append(cgnames, cg.Name) } + for _, tv := range ent.TileVariants { + if tv.Ref { + reftiledata[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence + } + } return nil }) if err != nil { @@ -166,7 +173,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s } } - log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0 + log.Info("indexing reference tiles") type reftileinfo struct { variant tileVariantID seqname string // chr1 @@ -176,34 +183,15 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s reftile := map[tagID]*reftileinfo{} for seqname, cseq := range refseq { for _, libref := range cseq { - reftile[libref.Tag] = &reftileinfo{seqname: seqname, variant: libref.Variant} - } - } - log.Info("loading reference tiles from all slices") - throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)} - for _, infile := range infiles { - infile := infile - throttleCPU.Go(func() error { - defer log.Infof("%s: done", infile) - f, err := open(infile) - if err != nil { - return err + reftile[libref.Tag] = &reftileinfo{ + seqname: seqname, + variant: libref.Variant, + tiledata: reftiledata[libref], } - defer f.Close() - return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { - for _, tv := range ent.TileVariants { - if dst, ok := reftile[tv.Tag]; ok && dst.variant == tv.Variant { - dst.tiledata = tv.Sequence - } - } - return nil - }) - }) - } - if err = throttleCPU.Wait(); err != nil { - return 1 + } } + throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)} log.Info("reconstructing reference sequences") for seqname, cseq := range refseq { seqname, cseq := seqname, cseq @@ -213,6 +201,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s for _, libref := range cseq { rt := reftile[libref.Tag] rt.pos = pos + if len(rt.tiledata) == 0 { + return fmt.Errorf("missing tiledata for tag %d variant %d in %s in ref", libref.Tag, libref.Variant, seqname) + } pos += len(rt.tiledata) - taglen } return nil @@ -239,6 +230,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s log.Infof("%04d: reading %s", infileIdx, infile) err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { + if tv.Ref { + continue + } variants := seq[tv.Tag] if len(variants) == 0 { variants = make([]TileVariant, 100) diff --git a/tilelib.go b/tilelib.go index e396e1c9d0..289f8895e5 100644 --- a/tilelib.go +++ b/tilelib.go @@ -71,6 +71,8 @@ type tileLibrary struct { variants int64 // if non-nil, write out any tile variants added while tiling encoder *gob.Encoder + // set Ref flag when writing new variants to encoder + encodeRef bool onAddTileVariant func(libref tileLibRef, hash [blake2b.Size256]byte, seq []byte) error onAddGenome func(CompactGenome) error @@ -120,7 +122,7 @@ func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[t for _, tv := range tvs { // Assign a new variant ID (unique across all inputs) // for each input variant. - variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence, tv.Ref).Variant } return nil } @@ -307,7 +309,7 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error { mtx.Unlock() } for _, tv := range ent.TileVariants { - variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant + variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence, tv.Ref).Variant } cgs = append(cgs, ent.CompactGenomes...) cseqs = append(cseqs, ent.CompactSequences...) @@ -546,7 +548,7 @@ type importStats struct { DroppedOutOfOrderTiles int } -func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp) (tileSeq, []importStats, error) { +func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp, isRef bool) (tileSeq, []importStats, error) { ret := tileSeq{} type jobT struct { label string @@ -630,7 +632,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro } else { endpos = found[i+1].pos + taglen } - path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos]) + path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos], isRef) if countBases(job.fasta[startpos:endpos]) != endpos-startpos { atomic.AddInt64(&lowquality, 1) } @@ -667,7 +669,7 @@ func (tilelib *tileLibrary) Len() int64 { // Return a tileLibRef for a tile with the given tag and sequence, // adding the sequence to the library if needed. -func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { +func (tilelib *tileLibrary) getRef(tag tagID, seq []byte, usedByRef bool) tileLibRef { dropSeq := false if !tilelib.retainNoCalls { for _, b := range seq { @@ -788,6 +790,7 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef { tilelib.encoder.Encode(LibraryEntry{ TileVariants: []TileVariant{{ Tag: tag, + Ref: usedByRef, Variant: variant, Blake2b: seqhash, Sequence: saveSeq, diff --git a/tilelib_test.go b/tilelib_test.go index 1e5b35c46c..5801b9baf3 100644 --- a/tilelib_test.go +++ b/tilelib_test.go @@ -47,7 +47,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[0]+ "cccccccccccccccccccc\n"+ s.tag[2]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{4, 1}, {0, 1}, {2, 1}}}) @@ -59,7 +59,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[1]+ "ggggggggggggggggggggggg\n"+ s.tag[2]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {2, 1}}}) @@ -71,7 +71,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[3]+ "ggggggggggggggggggggggg\n"+ s.tag[4]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{2, 1}, {3, 1}, {4, 1}}}) @@ -83,7 +83,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[0]+ "ggggggggggggggggggggggg\n"+ s.tag[2]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {2, 1}}}) @@ -95,7 +95,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[2]+ "ggggggggggggggggggggggg\n"+ s.tag[1]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}}}) @@ -109,7 +109,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[1]+ "ggggggggggggggggggggggg\n"+ s.tag[2]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {2, 1}}}) @@ -125,7 +125,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[0]+ "ggggggggggggggggggggggg\n"+ s.tag[4]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {3, 1}, {4, 1}}}) @@ -137,7 +137,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) { s.tag[1]+ "ggggggggggggggggggggggg\n"+ s.tag[3]+ - "\n"), matchAllChromosomes) + "\n"), matchAllChromosomes, false) c.Assert(err, check.IsNil) c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {3, 1}}}) }