type call struct {
tile int
variant int
- sequence []byte
position int
deletion []byte
insertion []byte
}
- var allcalls []*call
+ allcalls := map[string][]*call{}
var mtx sync.Mutex
- throttle := throttle{Max: runtime.GOMAXPROCS(0)}
+ thr := throttle{Max: runtime.GOMAXPROCS(0)}
log.Print("reading input files")
for _, fi := range fis {
if !strings.HasSuffix(fi.Name(), "annotations.csv") {
continue
}
filename := *inputDir + "/" + fi.Name()
- throttle.Acquire()
- go func() {
- defer throttle.Release()
+ thr.Go(func() error {
log.Printf("reading %s", filename)
buf, err := ioutil.ReadFile(filename)
if err != nil {
- throttle.Report(fmt.Errorf("%s: %s", filename, err))
- return
+ return fmt.Errorf("%s: %s", filename, err)
}
lines := bytes.Split(buf, []byte{'\n'})
- calls := make([]*call, 0, len(lines))
+ calls := map[string][]*call{}
for lineIdx, line := range lines {
if len(line) == 0 {
continue
}
- if lineIdx & ^0xfff == 0 && throttle.Err() != nil {
- return
+ if lineIdx & ^0xfff == 0 && thr.Err() != nil {
+ return nil
}
fields := bytes.Split(line, []byte{','})
if len(fields) != 8 {
- throttle.Report(fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line))
- return
+ return fmt.Errorf("%s line %d: wrong number of fields (%d != %d): %q", fi.Name(), lineIdx+1, len(fields), 8, line)
}
tile, _ := strconv.ParseInt(string(fields[0]), 10, 64)
variant, _ := strconv.ParseInt(string(fields[2]), 10, 64)
position, _ := strconv.ParseInt(string(fields[5]), 10, 64)
- calls = append(calls, &call{
+ seq := string(fields[4])
+ if calls[seq] == nil {
+ calls[seq] = make([]*call, 0, len(lines)/50)
+ }
+ calls[seq] = append(calls[seq], &call{
tile: int(tile),
variant: int(variant),
- sequence: append([]byte(nil), fields[4]...),
position: int(position),
deletion: append([]byte(nil), fields[6]...),
insertion: append([]byte(nil), fields[7]...),
})
}
mtx.Lock()
- allcalls = append(allcalls, calls...)
+ for seq, seqcalls := range calls {
+ allcalls[seq] = append(allcalls[seq], seqcalls...)
+ }
mtx.Unlock()
- }()
- }
- throttle.Wait()
- if throttle.Err() != nil {
- log.Print(throttle.Err())
- return 1
+ return nil
+ })
}
- log.Print("sorting")
- sort.Slice(allcalls, func(i, j int) bool {
- ii, jj := allcalls[i], allcalls[j]
- if cmp := bytes.Compare(ii.sequence, jj.sequence); cmp != 0 {
- return cmp < 0
- }
- if cmp := ii.position - jj.position; cmp != 0 {
- return cmp < 0
- }
- if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 {
- return cmp < 0
- }
- if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 {
- return cmp < 0
- }
- if cmp := ii.tile - jj.tile; cmp != 0 {
- return cmp < 0
- }
- return ii.variant < jj.variant
- })
-
- vcfFilename := *outputDir + "/annotations.vcf"
- log.Printf("writing %s", vcfFilename)
- f, err := os.Create(vcfFilename)
+ err = thr.Wait()
if err != nil {
return 1
}
- defer f.Close()
- bufw := bufio.NewWriterSize(f, 1<<20)
- _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0
+ thr = throttle{Max: len(allcalls)}
+ for seq, seqcalls := range allcalls {
+ seq, seqcalls := seq, seqcalls
+ thr.Go(func() error {
+ log.Printf("%s: sorting", seq)
+ sort.Slice(seqcalls, func(i, j int) bool {
+ ii, jj := seqcalls[i], seqcalls[j]
+ if cmp := ii.position - jj.position; cmp != 0 {
+ return cmp < 0
+ }
+ if cmp := len(ii.deletion) - len(jj.deletion); cmp != 0 {
+ return cmp < 0
+ }
+ if cmp := bytes.Compare(ii.insertion, jj.insertion); cmp != 0 {
+ return cmp < 0
+ }
+ if cmp := ii.tile - jj.tile; cmp != 0 {
+ return cmp < 0
+ }
+ return ii.variant < jj.variant
+ })
+
+ vcfFilename := fmt.Sprintf("%s/annotations.%s.vcf", *outputDir, seq)
+ log.Printf("%s: writing %s", seq, vcfFilename)
+
+ f, err := os.Create(vcfFilename)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ bufw := bufio.NewWriterSize(f, 1<<20)
+ _, err = fmt.Fprintf(bufw, `##fileformat=VCFv4.0
##INFO=<ID=TV,Number=.,Type=String,Description="tile-variant">
-#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO
+#CHROM POS ID REF ALT QUAL FILTER INFO
`)
- if err != nil {
- return 1
- }
- placeholder := []byte{'.'}
- for i := 0; i < len(allcalls); {
- call := allcalls[i]
- i++
- info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant)
- for i < len(allcalls) &&
- bytes.Equal(call.sequence, allcalls[i].sequence) &&
- call.position == allcalls[i].position &&
- len(call.deletion) == len(allcalls[i].deletion) &&
- bytes.Equal(call.insertion, allcalls[i].insertion) {
- call = allcalls[i]
- i++
- info += fmt.Sprintf("%d-%d,", call.tile, call.variant)
- }
- deletion := call.deletion
- if len(deletion) == 0 {
- deletion = placeholder
- }
- insertion := call.insertion
- if len(insertion) == 0 {
- insertion = placeholder
- }
- _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", call.sequence, call.position, deletion, insertion, info)
- if err != nil {
- return 1
- }
- }
- err = bufw.Flush()
- if err != nil {
- return 1
+ if err != nil {
+ return err
+ }
+ placeholder := []byte{'.'}
+ for i := 0; i < len(seqcalls); {
+ call := seqcalls[i]
+ i++
+ info := fmt.Sprintf("TV=,%d-%d,", call.tile, call.variant)
+ for i < len(seqcalls) &&
+ call.position == seqcalls[i].position &&
+ len(call.deletion) == len(seqcalls[i].deletion) &&
+ bytes.Equal(call.insertion, seqcalls[i].insertion) {
+ call = seqcalls[i]
+ i++
+ info += fmt.Sprintf("%d-%d,", call.tile, call.variant)
+ }
+ deletion := call.deletion
+ if len(deletion) == 0 {
+ deletion = placeholder
+ }
+ insertion := call.insertion
+ if len(insertion) == 0 {
+ insertion = placeholder
+ }
+ _, err = fmt.Fprintf(bufw, "%s\t%d\t.\t%s\t%s\t.\t.\t%s\n", seq, call.position, deletion, insertion, info)
+ if err != nil {
+ return err
+ }
+ }
+ err = bufw.Flush()
+ if err != nil {
+ return err
+ }
+ err = f.Close()
+ if err != nil {
+ return err
+ }
+ log.Printf("%s: done", seq)
+ return nil
+ })
}
- err = f.Close()
+ err = thr.Wait()
if err != nil {
return 1
}