1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 log "github.com/sirupsen/logrus"
26 type anno2vcf struct {
29 func (cmd *anno2vcf) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
33 fmt.Fprintf(stderr, "%s\n", err)
36 flags := flag.NewFlagSet("", flag.ContinueOnError)
37 flags.SetOutput(stderr)
38 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
39 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
40 projectUUID := flags.String("project", "", "project `UUID` for output data")
41 priority := flags.Int("priority", 500, "container request priority")
42 inputDir := flags.String("input-dir", "./in", "input `directory`")
43 outputDir := flags.String("output-dir", "./out", "output `directory`")
44 err = flags.Parse(args)
45 if err == flag.ErrHelp {
48 } else if err != nil {
50 } else if flags.NArg() > 0 {
51 err = fmt.Errorf("errant command line arguments after parsed flags: %v", flags.Args())
57 log.Println(http.ListenAndServe(*pprof, nil))
62 runner := arvadosContainerRunner{
63 Name: "lightning anno2vcf",
64 Client: arvados.NewClientFromEnv(),
65 ProjectUUID: *projectUUID,
72 err = runner.TranslatePaths(inputDir)
76 runner.Args = []string{"anno2vcf", "-local=true",
78 "-input-dir", *inputDir,
79 "-output-dir", "/mnt/output",
82 output, err = runner.Run()
86 fmt.Fprintln(stdout, output)
90 d, err := open(*inputDir)
96 fis, err := d.Readdir(-1)
102 sort.Slice(fis, func(i, j int) bool { return fis[i].Name() < fis[j].Name() })
112 allcalls := map[string][]*call{}
114 thr := throttle{Max: runtime.GOMAXPROCS(0)}
115 log.Print("reading input files")
116 for _, fi := range fis {
117 if !strings.HasSuffix(fi.Name(), "annotations.csv") {
120 filename := *inputDir + "/" + fi.Name()
121 thr.Go(func() error {
122 log.Printf("reading %s", filename)
123 f, err := open(filename)
128 buf, err := io.ReadAll(f)
130 return fmt.Errorf("%s: %s", filename, err)
133 lines := bytes.Split(buf, []byte{'\n'})
134 calls := map[string][]*call{}
135 for lineIdx, line := range lines {
139 if lineIdx&0xff == 0 && thr.Err() != nil {
142 fields := bytes.Split(line, []byte{','})
144 return fmt.Errorf("%s line %d: wrong number of fields (%d < %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line)
148 // "=" reference or ""
149 // non-diffable tile variant
152 tile, _ := strconv.ParseInt(string(fields[0]), 10, 64)
153 variant, _ := strconv.ParseInt(string(fields[2]), 10, 64)
154 position, _ := strconv.ParseInt(string(fields[5]), 10, 64)
155 seq := string(fields[4])
156 if calls[seq] == nil {
157 calls[seq] = make([]*call, 0, len(lines)/50)
161 if (len(del) == 0 || len(ins) == 0) && len(fields) >= 9 {
162 // "123,,AA,T" means 123insAA
163 // preceded by T. We record it
164 // here as "122 T TAA" to
165 // avoid writing an empty
166 // "ref" field in our
167 // VCF. Similarly, we record
168 // deletions as "122 TAA T"
169 // rather than "123 AA .".
170 del = append(append(make([]byte, 0, len(fields[8])+len(del)), fields[8]...), del...)
171 ins = append(append(make([]byte, 0, len(fields[8])+len(ins)), fields[8]...), ins...)
172 position -= int64(len(fields[8]))
174 del = append([]byte(nil), del...)
175 ins = append([]byte(nil), ins...)
177 calls[seq] = append(calls[seq], &call{
179 variant: int(variant),
180 position: int(position),
187 for seq, seqcalls := range calls {
188 allcalls[seq] = append(allcalls[seq], seqcalls...)
198 thr = throttle{Max: len(allcalls)}
199 for seq, seqcalls := range allcalls {
200 seq, seqcalls := seq, seqcalls
201 thr.Go(func() error {
202 log.Printf("%s: sorting", seq)
203 sort.Slice(seqcalls, func(i, j int) bool {
204 ii, jj := seqcalls[i], seqcalls[j]
205 if cmp := ii.position - jj.position; cmp != 0 {
208 if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 {
211 if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 {
214 if cmp := ii.tile - jj.tile; cmp != 0 {
217 return ii.variant < jj.variant
220 vcfFilename := fmt.Sprintf("%s/annotations.%s.vcf", *outputDir, seq)
221 log.Printf("%s: writing %s", seq, vcfFilename)
223 f, err := os.Create(vcfFilename)
228 bufw := bufio.NewWriterSize(f, 1<<20)
229 _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0
230 ##INFO=<ID=TV,Number=.,Type=String,Description="tile-variant">
231 #CHROM POS ID REF ALT QUAL FILTER INFO
236 placeholder := []byte{'.'}
237 for i := 0; i < len(seqcalls); {
240 info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant)
241 for i < len(seqcalls) &&
242 call.position == seqcalls[i].position &&
243 len(call.deletion) == len(seqcalls[i].deletion) &&
244 bytes.Equal(call.insertion, seqcalls[i].insertion) {
247 info += fmt.Sprintf("%d-%d,", call.tile, call.variant)
249 deletion := call.deletion
250 if len(deletion) == 0 {
251 deletion = placeholder
253 insertion := call.insertion
254 if len(insertion) == 0 {
255 insertion = placeholder
257 _, err = fmt.Fprintf(bufw, "%s\t%d\t%s\t%s\t%s\t.\t.\t%s\n", seq, call.position, call.hgvsID, deletion, insertion, info)
270 log.Printf("%s: done", seq)