Allow importing all-hom (reference) data from single fasta file.
[lightning.git] / vcf2fasta.go
index 6083e4c04130b00a1fe2b3df82f1da4994466d37..2f02e44a28b8b9188ba8700c4ce15c88a08647c5 100644 (file)
@@ -1,8 +1,10 @@
 package main
 
 import (
+       "bufio"
        "bytes"
        "compress/gzip"
+       "context"
        "errors"
        "flag"
        "fmt"
@@ -13,7 +15,9 @@ import (
        "os"
        "os/exec"
        "path/filepath"
+       "regexp"
        "runtime"
+       "strconv"
        "strings"
        "sync"
        "syscall"
@@ -112,7 +116,10 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                },
                        },
                }
-               err = runner.TranslatePaths(&cmd.refFile)
+               if cmd.mask {
+                       runner.RAM += int64(cmd.vcpus) << 31
+               }
+               err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
                if err != nil {
                        return 1
                }
@@ -123,7 +130,12 @@ func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, st
                                return 1
                        }
                }
-               runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", "/gvcf_regions.py", "-output-dir", "/mnt/output"}, inputs...)
+               runner.Args = append([]string{"vcf2fasta",
+                       "-local=true",
+                       "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
+                       "-genome", cmd.genomeFile,
+                       "-gvcf-regions.py", "/gvcf_regions.py",
+                       "-output-dir", "/mnt/output"}, inputs...)
                var output string
                output, err = runner.Run()
                if err != nil {
@@ -200,6 +212,9 @@ func maybeInDocker(args, mountfiles []string) []string {
 }
 
 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
        _, basename := filepath.Split(infile)
        outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
        outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
@@ -215,22 +230,97 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
        var wg sync.WaitGroup
        errs := make(chan error, 2)
        if cmd.mask {
-               if cmd.genomeFile == "" {
-                       return errors.New("cannot apply mask without -genome argument")
+               chrSize := map[string]int{}
+
+               vcffile, err := os.Open(infile)
+               if err != nil {
+                       return err
+               }
+               defer vcffile.Close()
+               var rdr io.Reader = vcffile
+               if strings.HasSuffix(infile, ".gz") {
+                       rdr, err = gzip.NewReader(vcffile)
+                       if err != nil {
+                               return err
+                       }
+               }
+               contigre := regexp.MustCompile(`([^=,]*)=([^>,]*)`)
+               scanner := bufio.NewScanner(rdr)
+               for scanner.Scan() {
+                       if s := scanner.Text(); !strings.HasPrefix(s, "##") {
+                               break
+                       } else if !strings.HasPrefix(s, "##contig=<") {
+                               continue
+                       } else {
+                               kv := map[string]string{}
+                               for _, m := range contigre.FindAllStringSubmatch(s[10:], -1) {
+                                       kv[m[1]] = m[2]
+                               }
+                               if kv["ID"] != "" && kv["length"] != "" {
+                                       chrSize[kv["ID"]], _ = strconv.Atoi(kv["length"])
+                               }
+                       }
+               }
+               if err = scanner.Err(); err != nil {
+                       return fmt.Errorf("error scanning input file %q: %s", infile, err)
                }
                var regions bytes.Buffer
-               bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
-               bed := exec.Command(bedargs[0], bedargs[1:]...)
+               bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile}
+               bed := exec.CommandContext(ctx, bedargs[0], bedargs[1:]...)
                bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
                bed.Stdout = &regions
                bed.Stderr = cmd.stderr
                log.Printf("running %v", bed.Args)
-               err := bed.Run()
+               err = bed.Run()
                log.Printf("exited %v", bed.Args)
                if err != nil {
                        return fmt.Errorf("gvcf_regions: %s", err)
                }
 
+               if cmd.genomeFile != "" {
+                       // Read chromosome sizes from genome file in
+                       // case any weren't specified in the VCF
+                       // header.
+                       genomeFile, err := os.Open(cmd.genomeFile)
+                       if err != nil {
+                               return fmt.Errorf("error opening genome file %q: %s", cmd.genomeFile, err)
+                       }
+                       scanner := bufio.NewScanner(genomeFile)
+                       for scanner.Scan() {
+                               var chr string
+                               var size int
+                               _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
+                               if err != nil {
+                                       return fmt.Errorf("error parsing genome file %q: %s", cmd.genomeFile, err)
+                               }
+                               if chrSize[chr] == 0 {
+                                       chrSize[chr] = size
+                               }
+                       }
+                       if err = scanner.Err(); err != nil {
+                               return fmt.Errorf("error scanning genome file %q: %s", cmd.genomeFile, err)
+                       }
+               }
+
+               // "bedtools complement" expects the chromosome sizes
+               // ("genome file") to appear in the same order as the
+               // chromosomes in the input vcf, so we need to sort
+               // them.
+               scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
+               var sortedGenomeFile bytes.Buffer
+               for scanner.Scan() {
+                       var chr string
+                       var size int
+                       _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
+                       if err != nil {
+                               return fmt.Errorf("error parsing gvcf_regions output: %s", err)
+                       }
+                       if size, ok := chrSize[chr]; ok {
+                               fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
+                               delete(chrSize, chr)
+                       }
+               }
+
                // The bcftools --mask argument needs to end in ".bed"
                // in order to be parsed as a BED file, so we need to
                // use a named pipe instead of stdin.
@@ -244,6 +334,17 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                if err != nil {
                        return fmt.Errorf("mkfifo: %s", err)
                }
+
+               // bedtools complement can't seem to read from a pipe
+               // reliably -- "Error: line number 1 of file
+               // /dev/stdin has 1 fields, but 3 were expected." --
+               // so we stage to a temp file.
+               regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
+               err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
+               if err != nil {
+                       return err
+               }
+
                wg.Add(1)
                go func() {
                        defer wg.Done()
@@ -255,10 +356,10 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                        }
                        defer maskfifow.Close()
 
-                       bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.genomeFile}
+                       bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
                        bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
-                       bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
-                       bedcomp.Stdin = &regions
+                       bedcomp := exec.CommandContext(ctx, bedcompargs[0], bedcompargs[1:]...)
+                       bedcomp.Stdin = &sortedGenomeFile
                        bedcomp.Stdout = maskfifow
                        bedcomp.Stderr = cmd.stderr
                        log.Printf("running %v", bedcomp.Args)
@@ -294,7 +395,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
                }
                consargs = maybeInDocker(consargs, mounts)
 
-               consensus := exec.Command(consargs[0], consargs[1:]...)
+               consensus := exec.CommandContext(ctx, consargs[0], consargs[1:]...)
                consensus.Stderr = os.Stderr
                consensus.Stdout = gzipw
                consensus.Stderr = cmd.stderr
@@ -319,6 +420,7 @@ func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
 
        for err := range errs {
                if err != nil {
+                       cancel()
                        wg.Wait()
                        return err
                }