9f28d3b6762cee5b8a0af9e01c8dc6a487524e1e
[lightning.git] / anno2vcf.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "bytes"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "net/http"
15         _ "net/http/pprof"
16         "os"
17         "runtime"
18         "sort"
19         "strconv"
20         "strings"
21         "sync"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         log "github.com/sirupsen/logrus"
25 )
26
27 type anno2vcf struct {
28 }
29
30 func (cmd *anno2vcf) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
31         var err error
32         defer func() {
33                 if err != nil {
34                         fmt.Fprintf(stderr, "%s\n", err)
35                 }
36         }()
37         flags := flag.NewFlagSet("", flag.ContinueOnError)
38         flags.SetOutput(stderr)
39         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
40         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
41         projectUUID := flags.String("project", "", "project `UUID` for output data")
42         priority := flags.Int("priority", 500, "container request priority")
43         inputDir := flags.String("input-dir", "./in", "input `directory`")
44         outputDir := flags.String("output-dir", "./out", "output `directory`")
45         err = flags.Parse(args)
46         if err == flag.ErrHelp {
47                 err = nil
48                 return 0
49         } else if err != nil {
50                 return 2
51         }
52
53         if *pprof != "" {
54                 go func() {
55                         log.Println(http.ListenAndServe(*pprof, nil))
56                 }()
57         }
58
59         if !*runlocal {
60                 runner := arvadosContainerRunner{
61                         Name:        "lightning anno2vcf",
62                         Client:      arvados.NewClientFromEnv(),
63                         ProjectUUID: *projectUUID,
64                         RAM:         500000000000,
65                         VCPUs:       64,
66                         Priority:    *priority,
67                         KeepCache:   2,
68                         APIAccess:   true,
69                 }
70                 err = runner.TranslatePaths(inputDir)
71                 if err != nil {
72                         return 1
73                 }
74                 runner.Args = []string{"anno2vcf", "-local=true",
75                         "-pprof", ":6060",
76                         "-input-dir", *inputDir,
77                         "-output-dir", "/mnt/output",
78                 }
79                 var output string
80                 output, err = runner.Run()
81                 if err != nil {
82                         return 1
83                 }
84                 fmt.Fprintln(stdout, output)
85                 return 0
86         }
87
88         d, err := open(*inputDir)
89         if err != nil {
90                 log.Print(err)
91                 return 1
92         }
93         defer d.Close()
94         fis, err := d.Readdir(-1)
95         if err != nil {
96                 log.Print(err)
97                 return 1
98         }
99         d.Close()
100
101         type call struct {
102                 tile      int
103                 variant   int
104                 sequence  []byte
105                 position  int
106                 deletion  []byte
107                 insertion []byte
108         }
109         var allcalls []*call
110         var mtx sync.Mutex
111         throttle := throttle{Max: runtime.GOMAXPROCS(0)}
112         log.Print("reading input files")
113         for _, fi := range fis {
114                 if !strings.HasSuffix(fi.Name(), "annotations.csv") {
115                         continue
116                 }
117                 filename := *inputDir + "/" + fi.Name()
118                 throttle.Acquire()
119                 go func() {
120                         defer throttle.Release()
121                         log.Printf("reading %s", filename)
122                         buf, err := ioutil.ReadFile(filename)
123                         if err != nil {
124                                 throttle.Report(fmt.Errorf("%s: %s", filename, err))
125                                 return
126                         }
127                         lines := bytes.Split(buf, []byte{'\n'})
128                         calls := make([]*call, 0, len(lines))
129                         for lineIdx, line := range lines {
130                                 if len(line) == 0 {
131                                         continue
132                                 }
133                                 if lineIdx & ^0xfff == 0 && throttle.Err() != nil {
134                                         return
135                                 }
136                                 fields := bytes.Split(line, []byte{','})
137                                 if len(fields) != 8 {
138                                         throttle.Report(fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line))
139                                         return
140                                 }
141                                 tile, _ := strconv.ParseInt(string(fields[0]), 10, 64)
142                                 variant, _ := strconv.ParseInt(string(fields[2]), 10, 64)
143                                 position, _ := strconv.ParseInt(string(fields[5]), 10, 64)
144                                 calls = append(calls, &call{
145                                         tile:      int(tile),
146                                         variant:   int(variant),
147                                         sequence:  append([]byte(nil), fields[4]...),
148                                         position:  int(position),
149                                         deletion:  append([]byte(nil), fields[6]...),
150                                         insertion: append([]byte(nil), fields[7]...),
151                                 })
152                         }
153                         mtx.Lock()
154                         allcalls = append(allcalls, calls...)
155                         mtx.Unlock()
156                 }()
157         }
158         throttle.Wait()
159         if throttle.Err() != nil {
160                 log.Print(throttle.Err())
161                 return 1
162         }
163         log.Print("sorting")
164         sort.Slice(allcalls, func(i, j int) bool {
165                 ii, jj := allcalls[i], allcalls[j]
166                 if cmp := bytes.Compare(ii.sequence, jj.sequence); cmp != 0 {
167                         return cmp < 0
168                 }
169                 if cmp := ii.position - jj.position; cmp != 0 {
170                         return cmp < 0
171                 }
172                 if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 {
173                         return cmp < 0
174                 }
175                 if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 {
176                         return cmp < 0
177                 }
178                 if cmp := ii.tile - jj.tile; cmp != 0 {
179                         return cmp < 0
180                 }
181                 return ii.variant < jj.variant
182         })
183
184         vcfFilename := *outputDir + "/annotations.vcf"
185         log.Printf("writing %s", vcfFilename)
186         f, err := os.Create(vcfFilename)
187         if err != nil {
188                 return 1
189         }
190         defer f.Close()
191         bufw := bufio.NewWriterSize(f, 1<<20)
192         _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0
193 ##INFO=<ID=TV,Number=.,Type=String,Description="tile-variant">
194 #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
195 `)
196         if err != nil {
197                 return 1
198         }
199         placeholder := []byte{'.'}
200         for i := 0; i < len(allcalls); {
201                 call := allcalls[i]
202                 i++
203                 info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant)
204                 for i < len(allcalls) &&
205                         bytes.Equal(call.sequence, allcalls[i].sequence) &&
206                         call.position == allcalls[i].position &&
207                         len(call.deletion) == len(allcalls[i].deletion) &&
208                         bytes.Equal(call.insertion, allcalls[i].insertion) {
209                         call = allcalls[i]
210                         i++
211                         info += fmt.Sprintf("%d-%d,", call.tile, call.variant)
212                 }
213                 deletion := call.deletion
214                 if len(deletion) == 0 {
215                         deletion = placeholder
216                 }
217                 insertion := call.insertion
218                 if len(insertion) == 0 {
219                         insertion = placeholder
220                 }
221                 _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", call.sequence, call.position, deletion, insertion, info)
222                 if err != nil {
223                         return 1
224                 }
225         }
226         err = bufw.Flush()
227         if err != nil {
228                 return 1
229         }
230         err = f.Close()
231         if err != nil {
232                 return 1
233         }
234         return 0
235 }