import (
"bufio"
+ "context"
"errors"
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
+ "path"
"sort"
+ "strings"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/kshedden/gonpy"
log "github.com/sirupsen/logrus"
)
-type exportNumpy struct{}
+type exportNumpy struct {
+ filter filter
+}
func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
var err error
priority := flags.Int("priority", 500, "container request priority")
inputFilename := flags.String("i", "-", "input `file`")
outputFilename := flags.String("o", "-", "output `file`")
+ annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
+ librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
+ labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
+ cmd.filter.Flags(flags)
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
Name: "lightning export-numpy",
Client: arvados.NewClientFromEnv(),
ProjectUUID: *projectUUID,
- RAM: 128000000000,
- VCPUs: 2,
+ RAM: 450000000000,
+ VCPUs: 32,
Priority: *priority,
+ KeepCache: 1,
+ APIAccess: true,
}
err = runner.TranslatePaths(inputFilename)
if err != nil {
return 1
}
- runner.Args = []string{"export-numpy", "-local=true", fmt.Sprintf("-one-hot=%v", *onehot), "-i", *inputFilename, "-o", "/mnt/output/library.npy"}
+ runner.Args = []string{"export-numpy", "-local=true",
+ fmt.Sprintf("-one-hot=%v", *onehot),
+ "-i", *inputFilename,
+ "-o", "/mnt/output/matrix.npy",
+ "-output-annotations", "/mnt/output/annotations.csv",
+ "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
+ "-output-labels", "/mnt/output/labels.csv",
+ "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
+ "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
+ "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
+ }
var output string
output, err = runner.Run()
if err != nil {
return 1
}
- fmt.Fprintln(stdout, output+"/library.npy")
+ fmt.Fprintln(stdout, output+"/matrix.npy")
return 0
}
if *inputFilename == "-" {
input = ioutil.NopCloser(stdin)
} else {
- input, err = os.Open(*inputFilename)
+ input, err = open(*inputFilename)
if err != nil {
return 1
}
defer input.Close()
}
- cgs, err := ReadCompactGenomes(input)
+ input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
+ tilelib := &tileLibrary{
+ retainNoCalls: true,
+ retainTileSequences: true,
+ compactGenomes: map[string][]tileVariantID{},
+ }
+ err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
if err != nil {
return 1
}
if err != nil {
return 1
}
- sort.Slice(cgs, func(i, j int) bool { return cgs[i].Name < cgs[j].Name })
- out, rows, cols := cgs2array(cgs)
+ log.Info("filtering")
+ cmd.filter.Apply(tilelib)
+ log.Info("tidying")
+ tilelib.Tidy()
+
+ if *annotationsFilename != "" {
+ log.Infof("writing annotations")
+ var annow io.WriteCloser
+ annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return 1
+ }
+ defer annow.Close()
+ err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
+ if err != nil {
+ return 1
+ }
+ err = annow.Close()
+ if err != nil {
+ return 1
+ }
+ }
+
+ log.Info("building numpy array")
+ out, rows, cols, names := cgs2array(tilelib)
+ if *labelsFilename != "" {
+ log.Infof("writing labels to %s", *labelsFilename)
+ var f *os.File
+ f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
+ if err != nil {
+ return 1
+ }
+ defer f.Close()
+ _, outBasename := path.Split(*outputFilename)
+ for i, name := range names {
+ _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
+ if err != nil {
+ err = fmt.Errorf("write %s: %w", *labelsFilename, err)
+ return 1
+ }
+ }
+ err = f.Close()
+ if err != nil {
+ err = fmt.Errorf("close %s: %w", *labelsFilename, err)
+ return 1
+ }
+ }
+
+ log.Info("writing numpy file")
var output io.WriteCloser
if *outputFilename == "-" {
output = nopCloser{stdout}
return 1
}
if *onehot {
- out, cols = recodeOnehot(out, cols)
+ log.Info("recoding to onehot")
+ recoded, librefs, recodedcols := recodeOnehot(out, cols)
+ out, cols = recoded, recodedcols
+ if *librefsFilename != "" {
+ log.Infof("writing onehot column mapping")
+ err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
+ if err != nil {
+ return 1
+ }
+ }
}
+ log.Info("writing numpy")
npw.Shape = []int{rows, cols}
- npw.WriteUint16(out)
+ npw.WriteInt16(out)
err = bufw.Flush()
if err != nil {
return 1
return 0
}
-func cgs2array(cgs []CompactGenome) (data []uint16, rows, cols int) {
- rows = len(cgs)
- for _, cg := range cgs {
- if cols < len(cg.Variants) {
- cols = len(cg.Variants)
+func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
+ f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ for i, libref := range librefs {
+ _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
+ if err != nil {
+ return err
}
}
- data = make([]uint16, rows*cols)
- for row, cg := range cgs {
- for i, v := range cg.Variants {
- data[row*cols+i] = uint16(v)
+ return f.Close()
+}
+
+func cgs2array(tilelib *tileLibrary) (data []int16, rows, cols int, cgnames []string) {
+ for name := range tilelib.compactGenomes {
+ cgnames = append(cgnames, name)
+ }
+ sort.Slice(cgnames, func(i, j int) bool {
+ return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
+ })
+
+ rows = len(tilelib.compactGenomes)
+ for _, cg := range tilelib.compactGenomes {
+ if cols < len(cg) {
+ cols = len(cg)
+ }
+ }
+
+ // flag low-quality tile variants so we can change to -1 below
+ lowqual := make([]map[tileVariantID]bool, cols/2)
+ for tag, variants := range tilelib.variant {
+ lq := lowqual[tag]
+ for varidx, hash := range variants {
+ if len(tilelib.seq[hash]) == 0 {
+ if lq == nil {
+ lq = map[tileVariantID]bool{}
+ lowqual[tag] = lq
+ }
+ lq[tileVariantID(varidx+1)] = true
+ }
}
}
+
+ data = make([]int16, rows*cols)
+ for row, name := range cgnames {
+ for i, v := range tilelib.compactGenomes[name] {
+ if v > 0 && lowqual[i/2][v] {
+ data[row*cols+i] = -1
+ } else {
+ data[row*cols+i] = int16(v)
+ }
+ }
+ }
+
return
}
-func recodeOnehot(in []uint16, incols int) ([]uint16, int) {
+func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
rows := len(in) / incols
- maxvalue := make([]uint16, incols)
+ maxvalue := make([]int16, incols)
for row := 0; row < rows; row++ {
for col := 0; col < incols; col++ {
if v := in[row*incols+col]; maxvalue[col] < v {
}
}
outcol := make([]int, incols)
- outcols := 0
- for incol, v := range maxvalue {
+ dropped := 0
+ for incol, maxv := range maxvalue {
outcol[incol] = outcols
- outcols += int(v)
+ if maxv == 0 {
+ dropped++
+ }
+ for v := 1; v <= int(maxv); v++ {
+ librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
+ outcols++
+ }
}
- out := make([]uint16, rows*outcols)
- for row := 0; row < rows; row++ {
+ log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
+
+ out = make([]int16, rows*outcols)
+ for inidx, row := 0, 0; row < rows; row++ {
+ outrow := out[row*outcols:]
for col := 0; col < incols; col++ {
- if v := in[row*incols+col]; v > 0 {
- out[row*outcols+outcol[col]+int(v)-1] = 1
+ if v := in[inidx]; v > 0 {
+ outrow[outcol[col]+int(v)-1] = 1
}
+ inidx++
}
}
- return out, outcols
+ return
}
type nopCloser struct {
}
func (nopCloser) Close() error { return nil }
+
+func trimFilenameForLabel(s string) string {
+ if i := strings.LastIndex(s, "/"); i >= 0 {
+ s = s[i+1:]
+ }
+ s = strings.TrimSuffix(s, ".gz")
+ s = strings.TrimSuffix(s, ".fa")
+ s = strings.TrimSuffix(s, ".fasta")
+ s = strings.TrimSuffix(s, ".1")
+ s = strings.TrimSuffix(s, ".2")
+ s = strings.TrimSuffix(s, ".gz")
+ s = strings.TrimSuffix(s, ".vcf")
+ return s
+}