X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/d106512fe534891cd17ad08d057cb5e0e1f984f4..HEAD:/import.go diff --git a/import.go b/import.go index b45cf98eac..20058dc8a9 100644 --- a/import.go +++ b/import.go @@ -1,3 +1,7 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package lightning import ( @@ -42,6 +46,7 @@ type importer struct { outputStats string matchChromosome *regexp.Regexp encoder *gob.Encoder + retainAfterEncoding bool // keep imported genomes/refseqs in memory after writing to disk batchArgs } @@ -180,7 +185,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { Client: arvadosClientFromEnv, ProjectUUID: cmd.projectUUID, APIAccess: true, - RAM: 360000000000, + RAM: 350000000000, VCPUs: 96, Priority: cmd.priority, KeepCache: 1, @@ -229,7 +234,7 @@ func (cmd *importer) runBatches(stdout io.Writer, inputs []string) error { return nil } -func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, []importStats, error) { +func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string, isRef bool) (tileSeq, []importStats, error) { var input io.ReadCloser input, err := open(infile) if err != nil { @@ -244,7 +249,7 @@ func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, [] } defer input.Close() } - return tilelib.TileFasta(infile, input, cmd.matchChromosome) + return tilelib.TileFasta(infile, input, cmd.matchChromosome, isRef) } func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { @@ -276,8 +281,8 @@ func (cmd *importer) loadTagLibrary() (*tagLibrary, error) { var ( vcfFilenameRe = regexp.MustCompile(`\.vcf(\.gz)?$`) - fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.gz)?$`) - fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.gz)?$`) + fasta1FilenameRe = regexp.MustCompile(`\.1\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`) + fasta2FilenameRe = regexp.MustCompile(`\.2\.fa(sta)?(\.fa(sta)?)?(\.gz)?$`) fastaFilenameRe = regexp.MustCompile(`\.fa(sta)?(\.gz)?$`) ) @@ -342,35 +347,34 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if fasta1FilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() - log.Printf("%s starting", infile) + log.Printf("%s (sample.1) starting tiling", infile) defer log.Printf("%s done", infile) - tseqs, stats, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, false) allstats[idx*2] = stats var kept, dropped int variants[0], kept, dropped = tseqs.Variants() - log.Printf("%s found %d unique tags plus %d repeats", infile, kept, dropped) + log.Printf("%s (sample.1) found %d unique tags plus %d repeats", infile, kept, dropped) return err } - infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2`) + infile2 := fasta1FilenameRe.ReplaceAllString(infile, `.2.fa$1$2$4`) todo <- func() error { defer phases.Done() - log.Printf("%s starting", infile2) + log.Printf("%s (sample.2) starting tiling", infile2) defer log.Printf("%s done", infile2) - tseqs, stats, err := cmd.tileFasta(tilelib, infile2) + tseqs, stats, err := cmd.tileFasta(tilelib, infile2, false) allstats[idx*2+1] = stats var kept, dropped int variants[1], kept, dropped = tseqs.Variants() - log.Printf("%s found %d unique tags plus %d repeats", infile2, kept, dropped) - + log.Printf("%s (sample.2) found %d unique tags plus %d repeats", infile2, kept, dropped) return err } } else if fastaFilenameRe.MatchString(infile) { todo <- func() error { defer phases.Done() defer phases.Done() - log.Printf("%s starting", infile) + log.Printf("%s (reference) starting tiling", infile) defer log.Printf("%s done", infile) - tseqs, stats, err := cmd.tileFasta(tilelib, infile) + tseqs, stats, err := cmd.tileFasta(tilelib, infile, true) allstats[idx*2] = stats if err != nil { return err @@ -379,7 +383,17 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { for _, tseq := range tseqs { totlen += len(tseq) } - log.Printf("%s tiled %d seqs, total len %d", infile, len(tseqs), totlen) + log.Printf("%s (reference) tiled %d seqs, total len %d", infile, len(tseqs), totlen) + + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.refseqs == nil { + tilelib.refseqs = map[string]map[string][]tileLibRef{} + } + tilelib.refseqs[infile] = tseqs + tilelib.mtx.Unlock() + } + return cmd.encoder.Encode(LibraryEntry{ CompactSequences: []CompactSequence{{Name: infile, TileSequences: tseqs}}, }) @@ -411,8 +425,9 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { if len(errs) > 0 { return } + variants := flatten(variants) err := cmd.encoder.Encode(LibraryEntry{ - CompactGenomes: []CompactGenome{{Name: infile, Variants: flatten(variants)}}, + CompactGenomes: []CompactGenome{{Name: infile, Variants: variants}}, }) if err != nil { select { @@ -420,12 +435,20 @@ func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error { default: } } + if cmd.retainAfterEncoding { + tilelib.mtx.Lock() + if tilelib.compactGenomes == nil { + tilelib.compactGenomes = make(map[string][]tileVariantID) + } + tilelib.compactGenomes[infile] = variants + tilelib.mtx.Unlock() + } }() } go close(todo) var tileJobs sync.WaitGroup var running int64 - for i := 0; i < runtime.GOMAXPROCS(-1)*2; i++ { + for i := 0; i < runtime.GOMAXPROCS(-1); i++ { tileJobs.Add(1) atomic.AddInt64(&running, 1) go func() { @@ -517,7 +540,7 @@ func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (t return } defer consensus.Wait() - tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome) + tileseq, stats, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout, cmd.matchChromosome, false) if err != nil { return }