17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 log "github.com/sirupsen/logrus"
21 type vcf2fasta struct {
29 func (cmd *vcf2fasta) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
33 fmt.Fprintf(stderr, "%s\n", err)
36 flags := flag.NewFlagSet("", flag.ContinueOnError)
37 flags.SetOutput(stderr)
38 flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
39 flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for containers and output data")
40 flags.StringVar(&cmd.outputDir, "output-dir", "", "output directory")
41 flags.IntVar(&cmd.vcpus, "vcpus", 0, "number of VCPUs to request for arvados container (default: 2*number of input files, max 32)")
42 flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
43 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
44 err = flags.Parse(args)
45 if err == flag.ErrHelp {
48 } else if err != nil {
50 } else if cmd.refFile == "" {
51 err = errors.New("reference data (-ref) not specified")
53 } else if flags.NArg() == 0 {
60 log.Println(http.ListenAndServe(*pprof, nil))
65 if cmd.outputDir != "" {
66 err = errors.New("cannot specify output dir in non-local mode")
71 infiles, err = listInputFiles(flags.Args())
75 if cmd.vcpus = len(infiles) * 2; cmd.vcpus > 32 {
79 runner := arvadosContainerRunner{
80 Name: "lightning vcf2fasta",
81 Client: arvados.NewClientFromEnv(),
82 ProjectUUID: cmd.projectUUID,
83 RAM: 2<<30 + int64(cmd.vcpus)<<28,
86 err = runner.TranslatePaths(&cmd.refFile)
90 inputs := flags.Args()
91 for i := range inputs {
92 err = runner.TranslatePaths(&inputs[i])
97 runner.Args = append([]string{"vcf2fasta", "-local=true", "-ref", cmd.refFile, "-output-dir", "/mnt/output"}, inputs...)
99 output, err = runner.Run()
103 fmt.Fprintln(stdout, output)
107 infiles, err := listInputFiles(flags.Args())
116 todo := make(chan job)
118 for _, infile := range infiles {
119 for phase := 1; phase <= 2; phase++ {
120 todo <- job{vcffile: infile, phase: phase}
126 done := make(chan error, runtime.NumCPU()*2)
127 var wg sync.WaitGroup
128 for i := 0; i < runtime.NumCPU(); i++ {
132 for job := range todo {
134 // a different worker encountered an error
137 err := cmd.vcf2fasta(job.vcffile, job.phase)
139 done <- fmt.Errorf("%s phase %d: %s", job.vcffile, job.phase, err)
157 func (cmd *vcf2fasta) vcf2fasta(infile string, phase int) error {
158 args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase), infile}
159 indexsuffix := ".tbi"
160 if _, err := os.Stat(infile + ".csi"); err == nil {
163 if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
164 args = append([]string{
165 "docker", "run", "--rm",
167 "--volume=" + infile + ":" + infile + ":ro",
168 "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
169 "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
174 _, basename := filepath.Split(infile)
175 outfile := filepath.Join(cmd.outputDir, fmt.Sprintf("%s.%d.fasta.gz", basename, phase))
176 outf, err := os.OpenFile(outfile, os.O_CREATE|os.O_WRONLY|os.O_EXCL, 0777)
178 return fmt.Errorf("error opening output file: %s", err)
181 gzipw := gzip.NewWriter(outf)
184 consensus := exec.Command(args[0], args[1:]...)
185 consensus.Stderr = os.Stderr
186 consensus.Stdout = gzipw
187 err = consensus.Start()
191 err = consensus.Wait()