55f25361bc4b6c7e7772a41d24ca0e321bd3aa72
[lightning.git] / gvcf2numpy.go
1 package main
2
3 import (
4         "bufio"
5         "compress/gzip"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "os"
11         "os/exec"
12         "path/filepath"
13         "runtime"
14         "sort"
15         "strings"
16         "sync"
17
18         "github.com/kshedden/gonpy"
19 )
20
21 type gvcf2numpy struct {
22         tagLibraryFile string
23         refFile        string
24         output         io.Writer
25 }
26
27 func (cmd *gvcf2numpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
28         var err error
29         defer func() {
30                 if err != nil {
31                         fmt.Fprintf(stderr, "%s\n", err)
32                 }
33         }()
34         flags := flag.NewFlagSet("", flag.ContinueOnError)
35         flags.SetOutput(stderr)
36         flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`")
37         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
38         err = flags.Parse(args)
39         if err == flag.ErrHelp {
40                 err = nil
41                 return 0
42         } else if err != nil {
43                 return 2
44         } else if cmd.refFile == "" || cmd.tagLibraryFile == "" {
45                 fmt.Fprintln(os.Stderr, "cannot run without -tag-library and -ref arguments")
46                 return 2
47         } else if flags.NArg() == 0 {
48                 flags.Usage()
49                 return 2
50         }
51         cmd.output = stdout
52
53         infiles, err := listVCFFiles(flags.Args())
54         if err != nil {
55                 return 1
56         }
57
58         log.Printf("tag library %s load starting", cmd.tagLibraryFile)
59         f, err := os.Open(cmd.tagLibraryFile)
60         if err != nil {
61                 return 1
62         }
63         var rdr io.ReadCloser = f
64         if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
65                 rdr, err = gzip.NewReader(f)
66                 if err != nil {
67                         err = fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
68                         return 1
69                 }
70         }
71         var taglib tagLibrary
72         err = taglib.Load(rdr)
73         if err != nil {
74                 return 1
75         }
76         if taglib.Len() < 1 {
77                 err = fmt.Errorf("cannot tile: tag library is empty")
78                 return 1
79         }
80         log.Printf("tag library %s load done", cmd.tagLibraryFile)
81
82         tilelib := tileLibrary{taglib: &taglib}
83         tseqs, err := cmd.tileGVCFs(&tilelib, infiles)
84         if err != nil {
85                 return 1
86         }
87         err = cmd.printVariants(tseqs)
88         if err != nil {
89                 return 1
90         }
91         return 0
92 }
93
94 func listVCFFiles(paths []string) (files []string, err error) {
95         for _, path := range paths {
96                 if fi, err := os.Stat(path); err != nil {
97                         return nil, fmt.Errorf("%s: stat failed: %s", path, err)
98                 } else if !fi.IsDir() {
99                         files = append(files, path)
100                         continue
101                 }
102                 d, err := os.Open(path)
103                 if err != nil {
104                         return nil, fmt.Errorf("%s: open failed: %s", path, err)
105                 }
106                 defer d.Close()
107                 names, err := d.Readdirnames(0)
108                 if err != nil {
109                         return nil, fmt.Errorf("%s: readdir failed: %s", path, err)
110                 }
111                 sort.Strings(names)
112                 for _, name := range names {
113                         if strings.HasSuffix(name, ".vcf") || strings.HasSuffix(name, ".vcf.gz") {
114                                 files = append(files, filepath.Join(path, name))
115                         }
116                 }
117                 d.Close()
118         }
119         for _, file := range files {
120                 if _, err := os.Stat(file + ".csi"); err == nil {
121                         continue
122                 } else if _, err = os.Stat(file + ".tbi"); err == nil {
123                         continue
124                 } else {
125                         return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file)
126                 }
127         }
128         return
129 }
130
131 func (cmd *gvcf2numpy) tileGVCFs(tilelib *tileLibrary, infiles []string) ([]tileSeq, error) {
132         limit := make(chan bool, runtime.NumCPU())
133         errs := make(chan error)
134         tseqs := make([]tileSeq, len(infiles)*2)
135         var wg sync.WaitGroup
136         for i, infile := range infiles {
137                 for phase := 0; phase < 2; phase++ {
138                         wg.Add(1)
139                         go func(i int, infile string, phase int) {
140                                 defer wg.Done()
141                                 limit <- true
142                                 defer func() { <-limit }()
143                                 log.Printf("%s phase %d starting", infile, phase+1)
144                                 defer log.Printf("%s phase %d done", infile, phase+1)
145                                 var err error
146                                 tseqs[i*2+phase], err = cmd.tileGVCF(tilelib, infile, phase)
147                                 if err != nil {
148                                         select {
149                                         case errs <- err:
150                                         default:
151                                         }
152                                         return
153                                 }
154                         }(i, infile, phase)
155                 }
156         }
157         go func() {
158                 wg.Wait()
159                 close(errs)
160         }()
161         if err := <-errs; err != nil {
162                 return nil, err
163         }
164         return tseqs, nil
165 }
166
167 func (cmd *gvcf2numpy) printVariants(tseqs []tileSeq) error {
168         maxtag := tagID(-1)
169         for _, tseq := range tseqs {
170                 for _, path := range tseq {
171                         for _, tvar := range path {
172                                 if maxtag < tvar.tag {
173                                         maxtag = tvar.tag
174                                 }
175                         }
176                 }
177         }
178         out := make([]int32, len(tseqs)*int(maxtag+1))
179         for i := 0; i < len(tseqs)/2; i++ {
180                 for phase := 0; phase < 2; phase++ {
181                         for _, path := range tseqs[i*2+phase] {
182                                 for _, tvar := range path {
183                                         out[2*int(tvar.tag)+phase] = int32(tvar.variant)
184                                 }
185                         }
186                 }
187         }
188         w := bufio.NewWriter(cmd.output)
189         npw, err := gonpy.NewWriter(nopCloser{w})
190         if err != nil {
191                 return err
192         }
193         npw.Shape = []int{len(tseqs) / 2, 2 * (int(maxtag) + 1)}
194         npw.WriteInt32(out)
195         return w.Flush()
196 }
197
198 type nopCloser struct {
199         io.Writer
200 }
201
202 func (nopCloser) Close() error { return nil }
203
204 func (cmd *gvcf2numpy) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
205         args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
206         indexsuffix := ".tbi"
207         if _, err := os.Stat(infile + ".csi"); err == nil {
208                 indexsuffix = ".csi"
209         }
210         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
211                 args = append([]string{
212                         "docker", "run", "--rm",
213                         "--log-driver=none",
214                         "--volume=" + infile + ":" + infile + ":ro",
215                         "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
216                         "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
217                         "lightning-runtime",
218                 }, args...)
219         }
220         consensus := exec.Command(args[0], args[1:]...)
221         consensus.Stderr = os.Stderr
222         stdout, err := consensus.StdoutPipe()
223         if err != nil {
224                 return
225         }
226         err = consensus.Start()
227         if err != nil {
228                 return
229         }
230         tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout)
231         if err != nil {
232                 return
233         }
234         err = stdout.Close()
235         if err != nil {
236                 return
237         }
238         err = consensus.Wait()
239         if err != nil {
240                 err = fmt.Errorf("%s phase %d: bcftools: %s", infile, phase, err)
241                 return
242         }
243         return
244 }