Fix some tests.
[lightning.git] / import.go
index 7a8820620b050417ddfdccee2270175239c77d43..20058dc8a9f184dbc5c88c9d32cd32399dd3da34 100644 (file)
--- a/import.go
+++ b/import.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Lightning Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package lightning
 
 import (
@@ -42,6 +46,7 @@ type importer struct {
        outputStats         string
        matchChromosome     *regexp.Regexp
        encoder             *gob.Encoder
+       retainAfterEncoding bool // keep imported genomes/refseqs in memory after writing to disk
        batchArgs
 }
 
@@ -180,7 +185,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
                Client:      arvadosClientFromEnv,
                ProjectUUID: cmd.projectUUID,
                APIAccess:   true,
-               RAM:         700000000000,
+               RAM:         350000000000,
                VCPUs:       96,
                Priority:    cmd.priority,
                KeepCache:   1,
@@ -229,7 +234,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error {
        return nil
 }
 
-func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) {
+func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) {
        var input io.ReadCloser
        input, err := open(infile)
        if err != nil {
@@ -244,7 +249,7 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []
                }
                defer input.Close()
        }
-       return tilelib.TileFasta(infile, input, cmd.matchChromosome)
+       return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef)
 }
 
 func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
@@ -276,8 +281,8 @@ func (cmd *importer) loadTagLibrary() (*tagLibrary, error) {
 
 var (
        vcfFilenameRe    = regexp.MustCompile(`\.vcf(\.gz)?$`)
-       fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`)
-       fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`)
+       fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`)
+       fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`)
        fastaFilenameRe  = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`)
 )
 
@@ -342,35 +347,34 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                if fasta1FilenameRe.MatchString(infile) {
                        todo <- func() error {
                                defer phases.Done()
-                               log.Printf("%s starting", infile)
+                               log.Printf("%s (sample.1) starting tiling", infile)
                                defer log.Printf("%s done", infile)
-                               tseqs, stats, err := cmd.tileFasta(tilelib, infile)
+                               tseqs, stats, err := cmd.tileFasta(tilelib, infile, false)
                                allstats[idx*2] = stats
                                var kept, dropped int
                                variants[0], kept, dropped = tseqs.Variants()
-                               log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped)
+                               log.Printf("%s (sample.1) found %d unique tags plus %d repeats", infile, kept, dropped)
                                return err
                        }
-                       infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`)
+                       infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2$4`)
                        todo <- func() error {
                                defer phases.Done()
-                               log.Printf("%s starting", infile2)
+                               log.Printf("%s (sample.2) starting tiling", infile2)
                                defer log.Printf("%s done", infile2)
-                               tseqs, stats, err := cmd.tileFasta(tilelib, infile2)
+                               tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false)
                                allstats[idx*2+1] = stats
                                var kept, dropped int
                                variants[1], kept, dropped = tseqs.Variants()
-                               log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped)
-
+                               log.Printf("%s (sample.2) found %d unique tags plus %d repeats", infile2, kept, dropped)
                                return err
                        }
                } else if fastaFilenameRe.MatchString(infile) {
                        todo <- func() error {
                                defer phases.Done()
                                defer phases.Done()
-                               log.Printf("%s starting", infile)
+                               log.Printf("%s (reference) starting tiling", infile)
                                defer log.Printf("%s done", infile)
-                               tseqs, stats, err := cmd.tileFasta(tilelib, infile)
+                               tseqs, stats, err := cmd.tileFasta(tilelib, infile, true)
                                allstats[idx*2] = stats
                                if err != nil {
                                        return err
@@ -379,7 +383,17 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                                for _, tseq := range tseqs {
                                        totlen += len(tseq)
                                }
-                               log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen)
+                               log.Printf("%s (reference) tiled %d seqs, total len %d", infile, len(tseqs), totlen)
+
+                               if cmd.retainAfterEncoding {
+                                       tilelib.mtx.Lock()
+                                       if tilelib.refseqs == nil {
+                                               tilelib.refseqs = map[string]map[string][]tileLibRef{}
+                                       }
+                                       tilelib.refseqs[infile] = tseqs
+                                       tilelib.mtx.Unlock()
+                               }
+
                                return cmd.encoder.Encode(LibraryEntry{
                                        CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}},
                                })
@@ -411,8 +425,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                        if len(errs) > 0 {
                                return
                        }
+                       variants := flatten(variants)
                        err := cmd.encoder.Encode(LibraryEntry{
-                               CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}},
+                               CompactGenomes: []CompactGenome{{Name: infile, Variants: variants}},
                        })
                        if err != nil {
                                select {
@@ -420,12 +435,20 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
                                default:
                                }
                        }
+                       if cmd.retainAfterEncoding {
+                               tilelib.mtx.Lock()
+                               if tilelib.compactGenomes == nil {
+                                       tilelib.compactGenomes = make(map[string][]tileVariantID)
+                               }
+                               tilelib.compactGenomes[infile] = variants
+                               tilelib.mtx.Unlock()
+                       }
                }()
        }
        go close(todo)
        var tileJobs sync.WaitGroup
        var running int64
-       for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ {
+       for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
                tileJobs.Add(1)
                atomic.AddInt64(&running, 1)
                go func() {
@@ -517,7 +540,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t
                return
        }
        defer consensus.Wait()
-       tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome)
+       tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false)
        if err != nil {
                return
        }