19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 log "github.com/sirupsen/logrus"
23 type vcf2fasta struct {
27 gvcfRegionsPyData []byte
34 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
38 fmt.Fprintf(stderr, "%s\n", err)
41 flags := flag.NewFlagSet("", flag.ContinueOnError)
42 flags.SetOutput(stderr)
43 flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
44 flags.BoolVar(&cmd.mask, "mask", false, "mask uncalled regions (default: output hom ref)")
45 flags.StringVar(&cmd.gvcfRegionsPy, "gvcf-regions.py", "https://raw.githubusercontent.com/lijiayong/gvcf_regions/master/gvcf_regions.py", "source of gvcf_regions.py")
46 flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
47 flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
48 flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
49 flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
50 priority := flags.Int("priority", 500, "container request priority")
51 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
52 err = flags.Parse(args)
53 if err == flag.ErrHelp {
56 } else if err != nil {
58 } else if cmd.refFile == "" {
59 err = errors.New("reference data (-ref) not specified")
61 } else if flags.NArg() == 0 {
68 log.Println(http.ListenAndServe(*pprof, nil))
73 if cmd.outputDir != "" {
74 err = errors.New("cannot specify output dir in non-local mode")
79 infiles, err = listInputFiles(flags.Args())
83 if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
87 runner := arvadosContainerRunner{
88 Name: "lightning vcf2fasta",
89 Client: arvados.NewClientFromEnv(),
90 ProjectUUID: cmd.projectUUID,
91 RAM: 2<<30 + int64(cmd.vcpus)<<28,
95 err = runner.TranslatePaths(&cmd.refFile)
99 inputs := flags.Args()
100 for i := range inputs {
101 err = runner.TranslatePaths(&inputs[i])
106 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, fmt.Sprintf("-mask=%v", cmd.mask), "-gvcf-regions.py", cmd.gvcfRegionsPy, "-output-dir", "/mnt/output"}, inputs...)
108 output, err = runner.Run()
112 fmt.Fprintln(stdout, output)
117 var resp *http.Response
118 resp, err = http.Get(cmd.gvcfRegionsPy)
122 if resp.StatusCode != http.StatusOK {
123 err = fmt.Errorf("get %q: http status %d", cmd.gvcfRegionsPy, resp.StatusCode)
126 cmd.gvcfRegionsPyData, err = ioutil.ReadAll(resp.Body)
128 err = fmt.Errorf("get %q: read body: %s", cmd.gvcfRegionsPy, err)
133 infiles, err := listInputFiles(flags.Args())
142 todo := make(chan job)
144 for _, infile := range infiles {
145 for phase := 1; phase <= 2; phase++ {
146 todo <- job{vcffile: infile, phase: phase}
152 done := make(chan error, runtime.NumCPU()*2)
153 var wg sync.WaitGroup
154 for i := 0; i < runtime.NumCPU(); i++ {
158 for job := range todo {
160 // a different worker encountered an error
163 err := cmd.vcf2fasta(job.vcffile, job.phase)
165 done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
183 func maybeInDocker(args, mountfiles []string) []string {
184 if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err != nil || len(out) == 0 {
187 dockerrun := []string{
188 "docker", "run", "--rm",
191 for _, f := range mountfiles {
192 dockerrun = append(dockerrun, "--volume="+f+":"+f+":ro")
194 dockerrun = append(dockerrun, "lightning-runtime")
195 dockerrun = append(dockerrun, args...)
199 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
200 _, basename := filepath.Split(infile)
201 outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
202 outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
204 return fmt.Errorf("error opening output file: %s", err)
207 gzipw := gzip.NewWriter(outf)
210 var maskfile *os.File // reading side of a pipe if we're running bedtools, otherwise nil
212 var wg sync.WaitGroup
213 errs := make(chan error, 3)
215 bedr, bedw, err := os.Pipe()
219 bedargs := []string{"python", "-", "--gvcf_type", "gatk", infile}
220 bed := exec.Command(bedargs[0], bedargs[1:]...)
221 bed.Stdin = bytes.NewBuffer(cmd.gvcfRegionsPyData)
229 bedcompr, bedcompw, err := os.Pipe()
233 bedcompargs := []string{"bedtools", "complement", "-i", "/dev/stdin", "-g", cmd.refFile}
234 bedcompargs = maybeInDocker(bedcompargs, []string{cmd.refFile, infile})
235 bedcomp := exec.Command(bedcompargs[0], bedcompargs[1:]...)
237 bedcomp.Stdout = bedcompw
241 errs <- bedcomp.Run()
246 consargs := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase)}
248 consargs = append(consargs, "--mask", "/dev/fd/3")
250 consargs = append(consargs, infile)
251 indexsuffix := ".tbi"
252 if _, err := os.Stat(infile + ".csi"); err == nil {
255 consargs = maybeInDocker(consargs, []string{infile, infile + indexsuffix, cmd.refFile})
257 consensus := exec.Command(consargs[0], consargs[1:]...)
258 consensus.Stderr = os.Stderr
259 consensus.Stdout = gzipw
261 consensus.ExtraFiles = []*os.File{maskfile}
263 err = consensus.Start()
267 err = consensus.Wait()