Save all ref tile data in slice 0.
author Tom Clegg <tom@tomclegg.ca>
Sun, 26 Sep 2021 02:47:00 +0000 (22:47 -0400)
committer Tom Clegg <tom@tomclegg.ca>
Wed, 6 Oct 2021 13:11:42 +0000 (09:11 -0400)
refs #17996

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

gob.go
import.go
slice.go
slicenumpy.go
tilelib.go
tilelib_test.go

diff --git a/gob.go b/gob.go
index 5682db3771ac7002114f1f3a18666d499920a72a..0bd18739612c3b14dc13ff847731479372166100 100644 (file)
--- a/gob.go
+++ b/gob.go
@@ -29,6 +29,7 @@ type CompactSequence struct {
 
 type TileVariant struct {
        Tag      tagID
+       Ref      bool
        Variant  tileVariantID
        Blake2b  [blake2b.Size256]byte
        Sequence []byte
diff --git a/import.go b/import.go
index 79537347ee6f3fa6dfd32f69df00455cb456f477..2f1a268511c0e1cc709a2e43656788e5116c9e1c 100644 (file)
--- a/import.go
+++ b/import.go
@@ -234,7 +234,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
        return nil
 }
 
-func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
+func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) {
        var input io.ReadCloser
        input, err := open(infile)
        if err != nil {
@@ -249,7 +249,7 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []
                }
                defer input.Close()
        }
-       return tilelib.TileFasta(infile, input, cmd.matchChromosome)
+       return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef)
 }
 
 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
@@ -349,7 +349,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                                defer phases.Done()
                                log.Printf("%s starting", infile)
                                defer log.Printf("%s done", infile)
-                               tseqs, stats, err := cmd.tileFasta(tilelib, infile)
+                               tseqs, stats, err := cmd.tileFasta(tilelib, infile, false)
                                allstats[idx*2] = stats
                                var kept, dropped int
                                variants[0], kept, dropped = tseqs.Variants()
@@ -361,7 +361,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                                defer phases.Done()
                                log.Printf("%s starting", infile2)
                                defer log.Printf("%s done", infile2)
-                               tseqs, stats, err := cmd.tileFasta(tilelib, infile2)
+                               tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false)
                                allstats[idx*2+1] = stats
                                var kept, dropped int
                                variants[1], kept, dropped = tseqs.Variants()
@@ -374,7 +374,7 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                                defer phases.Done()
                                log.Printf("%s starting", infile)
                                defer log.Printf("%s done", infile)
-                               tseqs, stats, err := cmd.tileFasta(tilelib, infile)
+                               tseqs, stats, err := cmd.tileFasta(tilelib, infile, true)
                                allstats[idx*2] = stats
                                if err != nil {
                                        return err
@@ -540,7 +540,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t
                return
        }
        defer consensus.Wait()
-       tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome)
+       tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false)
        if err != nil {
                return
        }
diff --git a/slice.go b/slice.go
index 0ee3feaedf99ea5ebb995649984516b870e1816a..5e4c4fc276d28c5bb44ad34d1e473b87e968b6ff 100644 (file)
--- a/slice.go
+++ b/slice.go
@@ -176,7 +176,11 @@ func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
                                atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
                                for _, tv := range ent.TileVariants {
                                        tv.Variant = tileVariantID(int(tv.Variant)*namespaces + namespace)
-                                       err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
+                                       fileno := 0
+                                       if !tv.Ref {
+                                               fileno = int(tv.Tag) / tagsPerFile
+                                       }
+                                       err := encs[fileno].Encode(LibraryEntry{
                                                TileVariants: []TileVariant{tv},
                                        })
                                        if err != nil {
diff --git a/slicenumpy.go b/slicenumpy.go
index 7b997e7a78b8c217cf4174eee4a3d4cb36b80b9b..c48a5d54a9ce508d7137f24a078168363a2e40dc 100644 (file)
--- a/slicenumpy.go
+++ b/slicenumpy.go
@@ -110,10 +110,12 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
 
        var cgnames []string
        var refseq map[string][]tileLibRef
+       var reftiledata = make(map[tileLibRef][]byte, 11000000)
        in0, err := open(infiles[0])
        if err != nil {
                return 1
        }
+
        taglen := -1
        DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
                if len(ent.TagSet) > 0 {
@@ -127,6 +129,11 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                for _, cg := range ent.CompactGenomes {
                        cgnames = append(cgnames, cg.Name)
                }
+               for _, tv := range ent.TileVariants {
+                       if tv.Ref {
+                               reftiledata[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
+                       }
+               }
                return nil
        })
        if err != nil {
@@ -166,7 +173,7 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                }
        }
 
-       log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
+       log.Info("indexing reference tiles")
        type reftileinfo struct {
                variant  tileVariantID
                seqname  string // chr1
@@ -176,34 +183,15 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
        reftile := map[tagID]*reftileinfo{}
        for seqname, cseq := range refseq {
                for _, libref := range cseq {
-                       reftile[libref.Tag] = &reftileinfo{seqname: seqname, variant: libref.Variant}
-               }
-       }
-       log.Info("loading reference tiles from all slices")
-       throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
-       for _, infile := range infiles {
-               infile := infile
-               throttleCPU.Go(func() error {
-                       defer log.Infof("%s: done", infile)
-                       f, err := open(infile)
-                       if err != nil {
-                               return err
+                       reftile[libref.Tag] = &reftileinfo{
+                               seqname:  seqname,
+                               variant:  libref.Variant,
+                               tiledata: reftiledata[libref],
                        }
-                       defer f.Close()
-                       return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
-                               for _, tv := range ent.TileVariants {
-                                       if dst, ok := reftile[tv.Tag]; ok && dst.variant == tv.Variant {
-                                               dst.tiledata = tv.Sequence
-                                       }
-                               }
-                               return nil
-                       })
-               })
-       }
-       if err = throttleCPU.Wait(); err != nil {
-               return 1
+               }
        }
 
+       throttleCPU := throttle{Max: runtime.GOMAXPROCS(0)}
        log.Info("reconstructing reference sequences")
        for seqname, cseq := range refseq {
                seqname, cseq := seqname, cseq
@@ -213,6 +201,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        for _, libref := range cseq {
                                rt := reftile[libref.Tag]
                                rt.pos = pos
+                               if len(rt.tiledata) == 0 {
+                                       return fmt.Errorf("missing tiledata for tag %d variant %d in %s in ref", libref.Tag, libref.Variant, seqname)
+                               }
                                pos += len(rt.tiledata) - taglen
                        }
                        return nil
@@ -239,6 +230,9 @@ func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, s
                        log.Infof("%04d: reading %s", infileIdx, infile)
                        err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
                                for _, tv := range ent.TileVariants {
+                                       if tv.Ref {
+                                               continue
+                                       }
                                        variants := seq[tv.Tag]
                                        if len(variants) == 0 {
                                                variants = make([]TileVariant, 100)
diff --git a/tilelib.go b/tilelib.go
index e396e1c9d05ed08c630b0221522c223ae63ec8fa..289f8895e5ba420b00d0d449f9a916cfcaf5d4f5 100644 (file)
--- a/tilelib.go
+++ b/tilelib.go
@@ -71,6 +71,8 @@ type tileLibrary struct {
        variants       int64
        // if non-nil, write out any tile variants added while tiling
        encoder *gob.Encoder
+       // set Ref flag when writing new variants to encoder
+       encodeRef bool
 
        onAddTileVariant func(libref tileLibRef, hash [blake2b.Size256]byte, seq []byte) error
        onAddGenome      func(CompactGenome) error
@@ -120,7 +122,7 @@ func (tilelib *tileLibrary) loadTileVariants(tvs []TileVariant, variantmap map[t
        for _, tv := range tvs {
                // Assign a new variant ID (unique across all inputs)
                // for each input variant.
-               variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
+               variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence, tv.Ref).Variant
        }
        return nil
 }
@@ -307,7 +309,7 @@ func (tilelib *tileLibrary) LoadDir(ctx context.Context, path string) error {
                                        mtx.Unlock()
                                }
                                for _, tv := range ent.TileVariants {
-                                       variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence).Variant
+                                       variantmap[tileLibRef{Tag: tv.Tag, Variant: tv.Variant}] = tilelib.getRef(tv.Tag, tv.Sequence, tv.Ref).Variant
                                }
                                cgs = append(cgs, ent.CompactGenomes...)
                                cseqs = append(cseqs, ent.CompactSequences...)
@@ -546,7 +548,7 @@ type importStats struct {
        DroppedOutOfOrderTiles int
 }
 
-func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp) (tileSeq, []importStats, error) {
+func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChromosome *regexp.Regexp, isRef bool) (tileSeq, []importStats, error) {
        ret := tileSeq{}
        type jobT struct {
                label string
@@ -630,7 +632,7 @@ func (tilelib *tileLibrary) TileFasta(filelabel string, rdr io.Reader, matchChro
                                } else {
                                        endpos = found[i+1].pos + taglen
                                }
-                               path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos])
+                               path[i] = tilelib.getRef(f.tagid, job.fasta[startpos:endpos], isRef)
                                if countBases(job.fasta[startpos:endpos]) != endpos-startpos {
                                        atomic.AddInt64(&lowquality, 1)
                                }
@@ -667,7 +669,7 @@ func (tilelib *tileLibrary) Len() int64 {
 
 // Return a tileLibRef for a tile with the given tag and sequence,
 // adding the sequence to the library if needed.
-func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
+func (tilelib *tileLibrary) getRef(tag tagID, seq []byte, usedByRef bool) tileLibRef {
        dropSeq := false
        if !tilelib.retainNoCalls {
                for _, b := range seq {
@@ -788,6 +790,7 @@ func (tilelib *tileLibrary) getRef(tag tagID, seq []byte) tileLibRef {
                tilelib.encoder.Encode(LibraryEntry{
                        TileVariants: []TileVariant{{
                                Tag:      tag,
+                               Ref:      usedByRef,
                                Variant:  variant,
                                Blake2b:  seqhash,
                                Sequence: saveSeq,
diff --git a/tilelib_test.go b/tilelib_test.go
index 1e5b35c46c48e1ebf275057c4b4ec2d8cd34ae39..5801b9baf3b6c864b09a3882b7d4b2eabd05e68c 100644 (file)
--- a/tilelib_test.go
+++ b/tilelib_test.go
@@ -47,7 +47,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[0]+
                "cccccccccccccccccccc\n"+
                s.tag[2]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{4, 1}, {0, 1}, {2, 1}}})
 
@@ -59,7 +59,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[1]+
                "ggggggggggggggggggggggg\n"+
                s.tag[2]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {2, 1}}})
 
@@ -71,7 +71,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[3]+
                "ggggggggggggggggggggggg\n"+
                s.tag[4]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{2, 1}, {3, 1}, {4, 1}}})
 
@@ -83,7 +83,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[0]+
                "ggggggggggggggggggggggg\n"+
                s.tag[2]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {2, 1}}})
 
@@ -95,7 +95,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[2]+
                "ggggggggggggggggggggggg\n"+
                s.tag[1]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}}})
 
@@ -109,7 +109,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[1]+
                "ggggggggggggggggggggggg\n"+
                s.tag[2]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {2, 1}}})
 
@@ -125,7 +125,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[0]+
                "ggggggggggggggggggggggg\n"+
                s.tag[4]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {3, 1}, {4, 1}}})
 
@@ -137,7 +137,7 @@ func (s *tilelibSuite) TestSkipOOO(c *check.C) {
                s.tag[1]+
                "ggggggggggggggggggggggg\n"+
                s.tag[3]+
-               "\n"), matchAllChromosomes)
+               "\n"), matchAllChromosomes, false)
        c.Assert(err, check.IsNil)
        c.Check(tseq, check.DeepEquals, tileSeq{"test-seq": []tileLibRef{{0, 1}, {1, 1}, {3, 1}}})
 }