)
type annotatecmd struct {
- dropTiles []bool
- variantHash bool
- maxTileSize int
- tag2tagid map[string]tagID
+ dropTiles []bool
+ variantHash bool
+ maxTileSize int
+ tag2tagid map[string]tagID
+ reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant)
}
func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
outcol := outcol
refstart, ok := tilestart[tag]
if !ok {
- // Tag didn't place on this
- // reference sequence. (It
- // might place on the same
- // chromosome in a genome
- // anyway, but we don't output
- // the annotations that would
- // result.)
+ // Tag didn't place on this reference
+ // sequence. (It might place on the same
+ // chromosome in a genome anyway, but we don't
+ // output the annotations that would result.)
// outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
continue
}
varid = fmt.Sprintf("%d", variant)
}
outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
+ if cmd.reportAnnotation != nil {
+ cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff)
+ }
}
}()
}
"net/url"
"os"
"regexp"
- "runtime"
"strings"
"sync"
"time"
subscribedUUID := ""
defer func() {
if subscribedUUID != "" {
+ log.Printf("unsubscribe container UUID: %s", subscribedUUID)
client.Unsubscribe(logch, subscribedUUID)
}
}()
fmt.Fprint(os.Stderr, neednewline)
neednewline = ""
if subscribedUUID != "" {
+ log.Printf("unsubscribe container UUID: %s", subscribedUUID)
client.Unsubscribe(logch, subscribedUUID)
}
+ log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
client.Subscribe(logch, cr.ContainerUUID)
subscribedUUID = cr.ContainerUUID
}
var (
arvadosClientFromEnv = arvados.NewClientFromEnv()
+ keepClient *keepclient.KeepClient
siteFS arvados.CustomFileSystem
siteFSMtx sync.Mutex
)
return nil, err
}
ac.Client = arvados.DefaultSecureClient
- kc := keepclient.New(ac)
+ keepClient = keepclient.New(ac)
// Don't use keepclient's default short timeouts.
- kc.HTTPClient = arvados.DefaultSecureClient
- // Guess max concurrent readers, hope to avoid cache
- // thrashing.
- kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3}
- siteFS = arvadosClientFromEnv.SiteFileSystem(kc)
+ keepClient.HTTPClient = arvados.DefaultSecureClient
+ keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
+ siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
+ } else {
+ keepClient.BlockCache.MaxBlocks++
}
log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
"-ref=testdata/ref.fasta",
}, &buffer, &output, os.Stderr)
c.Check(exited, check.Equals, 0)
- c.Check(output.String(), check.Equals, `chr1.1_3delinsGGC 1 0
+ c.Check(sortLines(output.String()), check.Equals, sortLines(`chr1.1_3delinsGGC 1 0
chr1.41_42delinsAA 1 0
chr1.161A>T 1 0
chr1.178A>T 1 0
chr2.315C>A 1 0
chr2.470_472del 1 0
chr2.471_472delinsAA 1 0
-`)
+`))
labels, err := ioutil.ReadFile(tmpdir + "/labels.csv")
c.Check(err, check.IsNil)
c.Check(string(labels), check.Equals, `0,"input1","-"
"net/http"
_ "net/http/pprof"
"os"
- "path"
"sort"
"strconv"
"strings"
+ "sync"
+ "sync/atomic"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/arvados/lightning/hgvs"
"github.com/kshedden/gonpy"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
projectUUID := flags.String("project", "", "project `UUID` for output data")
priority := flags.Int("priority", 500, "container request priority")
inputFilename := flags.String("i", "-", "input `file`")
- outputFilename := flags.String("o", "-", "output `file`")
+ outputDir := flags.String("output-dir", "/tmp", "output `directory`")
annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
}
if !*runlocal {
- if *outputFilename != "-" {
- err = errors.New("cannot specify output file in container mode: not implemented")
- return 1
- }
runner := arvadosContainerRunner{
Name: "lightning export-numpy",
Client: arvados.NewClientFromEnv(),
return 1
}
runner.Args = []string{"export-numpy", "-local=true",
+ "-pprof", ":6000",
fmt.Sprintf("-one-hot=%v", *onehot),
"-i", *inputFilename,
- "-o", "/mnt/output/matrix.npy",
+ "-output-dir", "/mnt/output",
"-output-annotations", "/mnt/output/annotations.csv",
"-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
"-output-labels", "/mnt/output/labels.csv",
return 1
}
defer f.Close()
- _, outBasename := path.Split(*outputFilename)
+ outBasename := "matrix.npy"
for i, name := range names {
_, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
if err != nil {
return 1
}
+ annotation2tvs := map[string]map[hgvs.Variant][]tileLibRef{}
if *annotationsFilename != "" {
log.Info("writing annotations")
var annow io.WriteCloser
return 1
}
defer annow.Close()
- err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib)
+ var mtx sync.Mutex
+ err = (&annotatecmd{
+ maxTileSize: 5000,
+ dropTiles: dropTiles,
+ reportAnnotation: func(tag tagID, _ int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant) {
+ mtx.Lock()
+ defer mtx.Unlock()
+ if annotation2tvs[seqname] == nil {
+ annotation2tvs[seqname] = map[hgvs.Variant][]tileLibRef{}
+ }
+ annotation2tvs[seqname][pdi] = append(annotation2tvs[seqname][pdi], tileLibRef{Tag: tag, Variant: variant})
+ },
+ }).exportTileDiffs(annow, tilelib)
if err != nil {
return 1
}
}
}
+ var lastErr atomic.Value
+ var wg sync.WaitGroup
+ for seqname, pdivars := range annotation2tvs {
+ seqname, pdivars := seqname, pdivars
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ log.Infof("choosing hgvs columns for seq %s", seqname)
+ var pdis []hgvs.Variant
+ for pdi, librefs := range pdivars {
+ // Include this HGVS column if it was
+ // seen in a variant of any
+ // non-dropped tile.
+ for _, libref := range librefs {
+ if int(libref.Tag) >= len(dropTiles) || !dropTiles[libref.Tag] {
+ pdis = append(pdis, pdi)
+ break
+ }
+ }
+ }
+ sort.Slice(pdis, func(i, j int) bool {
+ if cmp := pdis[i].Position - pdis[j].Position; cmp != 0 {
+ return cmp < 0
+ } else if pdis[i].Ref != pdis[j].Ref {
+ return pdis[i].Ref < pdis[j].Ref
+ } else {
+ return pdis[i].New < pdis[j].New
+ }
+ })
+ log.Infof("writing column labels for seq %s", seqname)
+ var buf bytes.Buffer
+ for _, pdi := range pdis {
+ fmt.Fprintf(&buf, "%s:g.%s\n", seqname, pdi.String())
+ }
+ err := ioutil.WriteFile(*outputDir+"/"+seqname+".columns.csv", buf.Bytes(), 0777)
+ if err != nil {
+ lastErr.Store(err)
+ return
+ }
+
+ log.Infof("building hgvs matrix for seq %s", seqname)
+ data := make([]int8, len(names)*len(pdis)*2)
+ for row, name := range names {
+ cg := tilelib.compactGenomes[name]
+ rowstart := row * len(pdis) * 2
+ for col, pdi := range pdis {
+ for _, libref := range pdivars[pdi] {
+ if len(cg) <= int(libref.Tag)*2+1 {
+ continue
+ }
+ for phase := 0; phase < 2; phase++ {
+ if cg[int(libref.Tag)*2+phase] == libref.Variant {
+ data[rowstart+col*2+phase] = 1
+ }
+ }
+ }
+ }
+ }
+ log.Infof("writing hgvs numpy for seq %s", seqname)
+ f, err := os.OpenFile(*outputDir+"/"+seqname+".npy", os.O_CREATE|os.O_WRONLY, 0777)
+ if err != nil {
+ lastErr.Store(err)
+ return
+ }
+ defer f.Close()
+ npw, err := gonpy.NewWriter(f)
+ if err != nil {
+ lastErr.Store(err)
+ return
+ }
+ npw.Shape = []int{len(names), len(pdis) * 2}
+ npw.WriteInt8(data)
+ // gonpy closes f and ignores errors, doh.
+ // err = f.Close()
+ // if err != nil {
+ // lastErr.Store(err)
+ // return
+ // }
+ }()
+ }
+ wg.Wait()
+ if e, ok := lastErr.Load().(error); ok {
+ err = e
+ return 1
+ }
+
chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
for chunk := 0; chunk < *chunks; chunk++ {
log.Infof("preparing chunk %d of %d", chunk+1, *chunks)
var npw *gonpy.NpyWriter
var output io.WriteCloser
- fnm := *outputFilename
- if fnm == "-" {
- output = nopCloser{stdout}
- } else {
- if *chunks > 1 {
- if strings.HasSuffix(fnm, ".npy") {
- fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
- } else {
- fnm = fmt.Sprintf("%s.%d", fnm, chunk)
- }
- }
- output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
- if err != nil {
- return 1
- }
- defer output.Close()
+ fnm := *outputDir + "/matrix.npy"
+ if *chunks > 1 {
+ fnm = fmt.Sprintf("%s/matrix.%d.npy", *outputDir, chunk)
+ }
+ output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
+ if err != nil {
+ return 1
}
+ defer output.Close()
bufw := bufio.NewWriter(output)
npw, err = gonpy.NewWriter(nopCloser{bufw})
if err != nil {
var buffer bytes.Buffer
exited := (&importer{}).RunCommand("import", []string{"-local=true", "-tag-library", "testdata/tags", "-output-tiles", "-save-incomplete-tiles", "testdata/a.1.fasta", "testdata/tinyref.fasta"}, &bytes.Buffer{}, &buffer, os.Stderr)
c.Assert(exited, check.Equals, 0)
- var output bytes.Buffer
- exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, &output, os.Stderr)
+ exited = (&exportNumpy{}).RunCommand("export-numpy", []string{"-local=true", "-output-dir", tmpdir, "-output-annotations", tmpdir + "/annotations.csv", "-regions", tmpdir + "/chr1-12-100.bed"}, &buffer, os.Stderr, os.Stderr)
c.Check(exited, check.Equals, 0)
- npy, err := gonpy.NewReader(&output)
+ f, err := os.Open(tmpdir + "/matrix.npy")
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ npy, err := gonpy.NewReader(f)
c.Assert(err, check.IsNil)
variants, err := npy.GetInt16()
c.Assert(err, check.IsNil)
c.Check(code, check.Equals, 0)
c.Check(hgvsout.Len() > 0, check.Equals, true)
c.Logf("%s", hgvsout.String())
- c.Check(hgvsout.String(), check.Equals, `chr1:g.1_3delinsGGC .
+ c.Check(sortLines(hgvsout.String()), check.Equals, sortLines(`chr1:g.1_3delinsGGC .
chr1:g.[41_42delinsAA];[41=] .
chr1:g.[161=];[161A>T] .
chr1:g.[178=];[178A>T] .
chr2:g.[315C>A];[315=] .
chr2:g.[470_472del];[470=] .
chr2:g.[471=];[471_472delinsAA] .
-`)
+`))
vcfout := &bytes.Buffer{}
code = (&exporter{}).RunCommand("lightning export", []string{"-local", "-ref", "testdata/ref.fasta", "-output-format", "vcf", "-i", tmpdir + "/merged.gob", "-output-bed", tmpdir + "/export.bed"}, bytes.NewReader(nil), vcfout, os.Stderr)
c.Check(code, check.Equals, 0)
c.Check(vcfout.Len() > 0, check.Equals, true)
c.Logf("%s", vcfout.String())
- c.Check(vcfout.String(), check.Equals, `chr1 1 NNN GGC 1/1 0/0
+ c.Check(sortLines(vcfout.String()), check.Equals, sortLines(`chr1 1 NNN GGC 1/1 0/0
chr1 41 TT AA 1/0 0/0
chr1 161 A T 0/1 0/0
chr1 178 A T 0/1 0/0
chr2 315 C A 1/0 0/0
chr2 469 GTGG G 1/0 0/0
chr2 471 GG AA 0/1 0/0
-`)
+`))
bedout, err := ioutil.ReadFile(tmpdir + "/export.bed")
c.Check(err, check.IsNil)
c.Logf("%s", string(bedout))
- c.Check(string(bedout), check.Equals, `chr1 0 248 0 1000 . 0 224
+ c.Check(sortLines(string(bedout)), check.Equals, sortLines(`chr1 0 248 0 1000 . 0 224
chr1 224 372 1 1000 . 248 348
chr1 348 496 2 1000 . 372 472
chr1 472 572 3 1000 . 496 572
chr2 224 372 5 750 . 248 348
chr2 348 496 6 1000 . 372 472
chr2 472 572 7 1000 . 496 572
-`)
+`))
annotateout := &bytes.Buffer{}
code = (&annotatecmd{}).RunCommand("lightning annotate", []string{"-local", "-variant-hash=true", "-i", tmpdir + "/merged.gob"}, bytes.NewReader(nil), annotateout, os.Stderr)