20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "github.com/kshedden/gonpy"
22 "github.com/sirupsen/logrus"
23 log "github.com/sirupsen/logrus"
26 type exportNumpy struct {
30 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
34 fmt.Fprintf(stderr, "%s\n", err)
37 flags := flag.NewFlagSet("", flag.ContinueOnError)
38 flags.SetOutput(stderr)
39 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
40 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
41 projectUUID := flags.String("project", "", "project `UUID` for output data")
42 priority := flags.Int("priority", 500, "container request priority")
43 inputFilename := flags.String("i", "-", "input `file`")
44 outputFilename := flags.String("o", "-", "output `file`")
45 annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
46 librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
47 labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
48 regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
49 expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
50 onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
51 chunks := flags.Int("chunks", 1, "split output into `N` numpy files")
52 cmd.filter.Flags(flags)
53 err = flags.Parse(args)
54 if err == flag.ErrHelp {
57 } else if err != nil {
63 log.Println(http.ListenAndServe(*pprof, nil))
68 if *outputFilename != "-" {
69 err = errors.New("cannot specify output file in container mode: not implemented")
72 runner := arvadosContainerRunner{
73 Name: "lightning export-numpy",
74 Client: arvados.NewClientFromEnv(),
75 ProjectUUID: *projectUUID,
82 err = runner.TranslatePaths(inputFilename, regionsFilename)
86 runner.Args = []string{"export-numpy", "-local=true",
87 fmt.Sprintf("-one-hot=%v", *onehot),
89 "-o", "/mnt/output/matrix.npy",
90 "-output-annotations", "/mnt/output/annotations.csv",
91 "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
92 "-output-labels", "/mnt/output/labels.csv",
93 "-regions", *regionsFilename,
94 "-expand-regions", fmt.Sprintf("%d", *expandRegions),
95 "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
96 "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
97 "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
98 "-chunks", fmt.Sprintf("%d", *chunks),
101 output, err = runner.Run()
105 fmt.Fprintln(stdout, output+"/matrix.npy")
109 var input io.ReadCloser
110 if *inputFilename == "-" {
111 input = ioutil.NopCloser(stdin)
113 input, err = open(*inputFilename)
119 input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
120 tilelib := &tileLibrary{
122 retainTileSequences: true,
123 compactGenomes: map[string][]tileVariantID{},
125 err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
134 log.Info("filtering")
135 cmd.filter.Apply(tilelib)
139 log.Info("building lowqual map")
140 lowqual := lowqual(tilelib)
141 names := cgnames(tilelib)
143 if *labelsFilename != "" {
144 log.Infof("writing labels to %s", *labelsFilename)
146 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
151 _, outBasename := path.Split(*outputFilename)
152 for i, name := range names {
153 _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
155 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
161 err = fmt.Errorf("close %s: %w", *labelsFilename, err)
166 log.Info("determining which tiles intersect given regions")
167 dropTiles, err := chooseTiles(tilelib, *regionsFilename, *expandRegions)
172 if *annotationsFilename != "" {
173 log.Info("writing annotations")
174 var annow io.WriteCloser
175 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
180 err = (&annotatecmd{maxTileSize: 5000, dropTiles: dropTiles}).exportTileDiffs(annow, tilelib)
190 chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
191 for chunk := 0; chunk < *chunks; chunk++ {
192 log.Infof("preparing chunk %d of %d", chunk+1, *chunks)
193 tagstart := chunk * chunksize
194 tagend := tagstart + chunksize
195 if tagend > len(tilelib.variant) {
196 tagend = len(tilelib.variant)
198 out, rows, cols := cgs2array(tilelib, names, lowqual, dropTiles, tagstart, tagend)
200 var npw *gonpy.NpyWriter
201 var output io.WriteCloser
202 fnm := *outputFilename
204 output = nopCloser{stdout}
207 if strings.HasSuffix(fnm, ".npy") {
208 fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
210 fnm = fmt.Sprintf("%s.%d", fnm, chunk)
213 output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
219 bufw := bufio.NewWriter(output)
220 npw, err = gonpy.NewWriter(nopCloser{bufw})
225 log.Info("recoding to onehot")
226 recoded, librefs, recodedcols := recodeOnehot(out, cols)
227 out, cols = recoded, recodedcols
228 if *librefsFilename != "" {
229 log.Infof("writing onehot column mapping")
230 err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
236 log.WithFields(logrus.Fields{
240 }).Info("writing numpy")
241 npw.Shape = []int{rows, cols}
255 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
256 f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
261 for i, libref := range librefs {
262 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
270 func cgnames(tilelib *tileLibrary) (cgnames []string) {
271 for name := range tilelib.compactGenomes {
272 cgnames = append(cgnames, name)
274 sort.Slice(cgnames, func(i, j int) bool {
275 return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
280 func lowqual(tilelib *tileLibrary) (lowqual []map[tileVariantID]bool) {
281 lowqual = make([]map[tileVariantID]bool, len(tilelib.variant))
282 for tag, variants := range tilelib.variant {
284 for varidx, hash := range variants {
285 if len(tilelib.seq[hash]) == 0 {
287 lq = map[tileVariantID]bool{}
290 lq[tileVariantID(varidx+1)] = true
297 func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, dropTiles []bool, tagstart, tagend int) (data []int16, rows, cols int) {
298 rows = len(tilelib.compactGenomes)
299 for tag := tagstart; tag < tagend; tag++ {
300 if len(dropTiles) <= tag || !dropTiles[tag] {
304 data = make([]int16, rows*cols)
305 for row, name := range names {
306 cg := tilelib.compactGenomes[name]
308 for tag := tagstart; tag < tagend && tag*2+1 < len(cg); tag++ {
309 if len(dropTiles) > tag && dropTiles[tag] {
312 for phase := 0; phase < 2; phase++ {
314 if v > 0 && lowqual[tag][v] {
315 data[row*cols+outidx] = -1
317 data[row*cols+outidx] = int16(v)
326 func chooseTiles(tilelib *tileLibrary, regionsFilename string, expandRegions int) (drop []bool, err error) {
327 if regionsFilename == "" {
330 rfile, err := zopen(regionsFilename)
335 regions, err := ioutil.ReadAll(rfile)
340 log.Print("chooseTiles: building mask")
342 for _, line := range bytes.Split(regions, []byte{'\n'}) {
343 if bytes.HasPrefix(line, []byte{'#'}) {
346 fields := bytes.Split(line, []byte{'\t'})
350 refseqname := string(fields[0])
351 if strings.HasPrefix(refseqname, "chr") {
352 refseqname = refseqname[3:]
354 start, err1 := strconv.Atoi(string(fields[1]))
355 end, err2 := strconv.Atoi(string(fields[2]))
356 if err1 == nil && err2 == nil {
359 start, err1 = strconv.Atoi(string(fields[3]))
360 end, err2 = strconv.Atoi(string(fields[4]))
361 if err1 == nil && err2 == nil {
365 err = fmt.Errorf("cannot parse input line as BED or GFF/GTF: %q", line)
369 mask.Add(refseqname, start-expandRegions, end+expandRegions)
371 log.Print("chooseTiles: mask.Freeze")
374 tagset := tilelib.taglib.Tags()
375 if len(tagset) == 0 {
376 err = errors.New("cannot choose tiles by region in a library without tags")
379 taglen := len(tagset[0])
381 log.Print("chooseTiles: check ref tiles")
382 // Find position+size of each reference tile, and if it
383 // intersects any of the desired regions, set drop[tag]=false.
385 // (Note it doesn't quite work to do the more obvious thing --
386 // start with drop=false and change to true when ref tiles
387 // intersect target regions -- because that would give us
388 // drop=false for tiles that don't appear at all in the
391 // TODO: (optionally?) don't drop tags for which some tile
392 // variants are spanning tiles, i.e., where the reference tile
393 // does not intersect the desired regions, but a spanning tile
394 // from a genome does.
395 drop = make([]bool, len(tilelib.variant))
396 for i := range drop {
399 for refname, refseqs := range tilelib.refseqs {
400 for refseqname, reftiles := range refseqs {
401 if strings.HasPrefix(refseqname, "chr") {
402 refseqname = refseqname[3:]
405 for _, libref := range reftiles {
406 if libref.Variant < 1 {
407 err = fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, refseqname, libref.Tag)
410 seq := tilelib.TileVariantSequence(libref)
411 if len(seq) < taglen {
412 err = fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, refseqname, libref.Tag, libref.Variant, len(seq), taglen)
416 tileend = tilestart + len(seq) - taglen
417 if mask.Check(refseqname, tilestart, tileend) {
418 drop[libref.Tag] = false
424 log.Print("chooseTiles: done")
428 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
429 rows := len(in) / incols
430 maxvalue := make([]int16, incols)
431 for row := 0; row < rows; row++ {
432 for col := 0; col < incols; col++ {
433 if v := in[row*incols+col]; maxvalue[col] < v {
438 outcol := make([]int, incols)
440 for incol, maxv := range maxvalue {
441 outcol[incol] = outcols
445 for v := 1; v <= int(maxv); v++ {
446 librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
450 log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
452 out = make([]int16, rows*outcols)
453 for inidx, row := 0, 0; row < rows; row++ {
454 outrow := out[row*outcols:]
455 for col := 0; col < incols; col++ {
456 if v := in[inidx]; v > 0 {
457 outrow[outcol[col]+int(v)-1] = 1
465 type nopCloser struct {
469 func (nopCloser) Close() error { return nil }
471 func trimFilenameForLabel(s string) string {
472 if i := strings.LastIndex(s, "/"); i >= 0 {
475 s = strings.TrimSuffix(s, ".gz")
476 s = strings.TrimSuffix(s, ".fa")
477 s = strings.TrimSuffix(s, ".fasta")
478 s = strings.TrimSuffix(s, ".1")
479 s = strings.TrimSuffix(s, ".2")
480 s = strings.TrimSuffix(s, ".gz")
481 s = strings.TrimSuffix(s, ".vcf")