22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 log "github.com/sirupsen/logrus"
26 type vcf2fasta struct {
31 gvcfRegionsPyData []byte
40 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
44 fmt.Fprintf(stderr, "%s\n", err)
47 flags := flag.NewFlagSet("", flag.ContinueOnError)
48 flags.SetOutput(stderr)
49 flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
50 flags.StringVar(&cmd.genomeFile, "genome", "", "reference genome `file`")
51 flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
52 flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
53 flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
54 flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
55 flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
56 flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
57 priority := flags.Int("priority", 500, "container request priority")
58 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
59 err = flags.Parse(args)
60 if err == flag.ErrHelp {
63 } else if err != nil {
65 } else if cmd.refFile == "" {
66 err = errors.New("reference data (-ref) not specified")
68 } else if flags.NArg() == 0 {
76 log.Println(http.ListenAndServe(*pprof, nil))
81 err = cmd.loadRegionsPy()
88 if cmd.outputDir != "" {
89 err = errors.New("cannot specify output dir in non-local mode")
94 infiles, err = listInputFiles(flags.Args())
98 if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
102 runner := arvadosContainerRunner{
103 Name: "lightning vcf2fasta",
104 Client: arvados.NewClientFromEnv(),
105 ProjectUUID: cmd.projectUUID,
106 RAM: 2<<30 + int64(cmd.vcpus)<<28,
109 Mounts: map[string]map[string]interface{}{
110 "/gvcf_regions.py": map[string]interface{}{
112 "content": string(cmd.gvcfRegionsPyData),
116 err = runner.TranslatePaths(&cmd.refFile, &cmd.genomeFile)
120 inputs := flags.Args()
121 for i := range inputs {
122 err = runner.TranslatePaths(&inputs[i])
127 runner.Args = append([]string{"vcf2fasta",
129 "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask),
130 "-genome", cmd.genomeFile,
131 "-gvcf-regions.py", "/gvcf_regions.py",
132 "-output-dir", "/mnt/output"}, inputs...)
134 output, err = runner.Run()
138 fmt.Fprintln(stdout, output)
142 infiles, err := listInputFiles(flags.Args())
151 todo := make(chan job)
153 for _, infile := range infiles {
154 for phase := 1; phase <= 2; phase++ {
155 todo <- job{vcffile: infile, phase: phase}
161 done := make(chan error, runtime.NumCPU()*2)
162 var wg sync.WaitGroup
163 for i := 0; i < runtime.NumCPU(); i++ {
167 for job := range todo {
169 // a different worker encountered an error
172 err := cmd.vcf2fasta(job.vcffile, job.phase)
174 done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
192 func maybeInDocker(args, mountfiles []string) []string {
193 if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
196 dockerrun := []string{
197 "docker", "run", "--rm", "-i",
200 for _, f := range mountfiles {
201 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
203 dockerrun = append(dockerrun, "lightning-runtime")
204 dockerrun = append(dockerrun, args...)
208 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
209 _, basename := filepath.Split(infile)
210 outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
211 outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY, 0777)
213 return fmt.Errorf("error opening output file: %s", err)
216 gzipw := gzip.NewWriter(outf)
219 var maskfifo string // filename of mask fifo if we're running bedtools, otherwise ""
221 var wg sync.WaitGroup
222 errs := make(chan error, 2)
224 if cmd.genomeFile == "" {
225 // TODO: get chromosome sizes from VCF header, ##contig=<ID=chr10,length=133797422>
226 return errors.New("cannot apply mask without -genome argument")
229 var regions bytes.Buffer
230 bedargs := []string{"python2", "-", "--gvcf_type", "gatk", infile}
231 bed := exec.Command(bedargs[0], bedargs[1:]...)
232 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
233 bed.Stdout = ®ions
234 bed.Stderr = cmd.stderr
235 log.Printf("running %v", bed.Args)
237 log.Printf("exited %v", bed.Args)
239 return fmt.Errorf("gvcf_regions: %s", err)
242 // "bedtools complement" expects the chromosome sizes
243 // ("genome file") to appear in the same order as the
244 // chromosomes in the input vcf, so we need to sort
246 genomeFile, err := os.Open(cmd.genomeFile)
248 return fmt.Errorf("error opening genome file: %s", err)
250 chrSize := map[string]int{}
251 scanner := bufio.NewScanner(genomeFile)
255 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
257 return fmt.Errorf("error parsing genome file: %s", err)
261 if err = scanner.Err(); err != nil {
262 return fmt.Errorf("error scanning genome file: %s", err)
264 scanner = bufio.NewScanner(bytes.NewBuffer(append([]byte(nil), regions.Bytes()...)))
265 var sortedGenomeFile bytes.Buffer
269 _, err := fmt.Sscanf(scanner.Text(), "%s\t%d", &chr, &size)
271 return fmt.Errorf("error parsing gvcf_regions output: %s", err)
273 if size, ok := chrSize[chr]; ok {
274 fmt.Fprintf(&sortedGenomeFile, "%s\t%d\n", chr, size)
279 // The bcftools --mask argument needs to end in ".bed"
280 // in order to be parsed as a BED file, so we need to
281 // use a named pipe instead of stdin.
282 tempdir, err := ioutil.TempDir("", "")
284 return fmt.Errorf("TempDir: %s", err)
286 defer os.RemoveAll(tempdir)
287 maskfifo = filepath.Join(tempdir, "fifo.bed")
288 err = syscall.Mkfifo(maskfifo, 0600)
290 return fmt.Errorf("mkfifo: %s", err)
293 // bedtools complement can't seem to read from a pipe
294 // reliably -- "Error: line number 1 of file
295 // /dev/stdin has 1 fields, but 3 were expected." --
296 // so we stage to a temp file.
297 regionsFile := filepath.Join(tempdir, "gvcf_regions.bed")
298 err = ioutil.WriteFile(regionsFile, regions.Bytes(), 0644)
307 maskfifow, err := os.OpenFile(maskfifo, os.O_WRONLY, 0)
312 defer maskfifow.Close()
314 bedcompargs := []string{"bedtools", "complement", "-i", regionsFile, "-g", "/dev/stdin"}
315 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.genomeFile})
316 bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
317 bedcomp.Stdin = &sortedGenomeFile
318 bedcomp.Stdout = maskfifow
319 bedcomp.Stderr = cmd.stderr
320 log.Printf("running %v", bedcomp.Args)
322 log.Printf("exited %v", bedcomp.Args)
324 errs <- fmt.Errorf("bedtools complement: %s", err)
327 err = maskfifow.Close()
338 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
340 consargs = append(consargs, "--mask", maskfifo)
342 consargs = append(consargs, infile)
343 indexsuffix := ".tbi"
344 if _, err := os.Stat(infile + ".csi"); err == nil {
347 mounts := []string{infile, infile + indexsuffix, cmd.refFile}
349 mounts = append(mounts, maskfifo)
351 consargs = maybeInDocker(consargs, mounts)
353 consensus := exec.Command(consargs[0], consargs[1:]...)
354 consensus.Stderr = os.Stderr
355 consensus.Stdout = gzipw
356 consensus.Stderr = cmd.stderr
357 log.Printf("running %v", consensus.Args)
358 err = consensus.Run()
360 errs <- fmt.Errorf("bcftools consensus: %s", err)
376 for err := range errs {
385 func (cmd *vcf2fasta) loadRegionsPy() error {
386 if strings.HasPrefix(cmd.gvcfRegionsPy, "http") {
387 resp, err := http.Get(cmd.gvcfRegionsPy)
391 if resp.StatusCode != http.StatusOK {
392 return fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
394 buf, err := ioutil.ReadAll(resp.Body)
396 return fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
398 cmd.gvcfRegionsPyData = buf
401 buf, err := ioutil.ReadFile(cmd.gvcfRegionsPy)
405 cmd.gvcfRegionsPyData = buf