18 "git.arvados.org/arvados.git/sdk/go/arvados"
19 "github.com/kshedden/gonpy"
20 "github.com/sirupsen/logrus"
21 log "github.com/sirupsen/logrus"
24 type exportNumpy struct {
28 func (cmd *exportNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
32 fmt.Fprintf(stderr, "%s\n", err)
35 flags := flag.NewFlagSet("", flag.ContinueOnError)
36 flags.SetOutput(stderr)
37 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39 projectUUID := flags.String("project", "", "project `UUID` for output data")
40 priority := flags.Int("priority", 500, "container request priority")
41 inputFilename := flags.String("i", "-", "input `file`")
42 outputFilename := flags.String("o", "-", "output `file`")
43 annotationsFilename := flags.String("output-annotations", "", "output `file` for tile variant annotations csv")
44 librefsFilename := flags.String("output-onehot2tilevar", "", "when using -one-hot, create csv `file` mapping column# to tag# and variant#")
45 labelsFilename := flags.String("output-labels", "", "output `file` for genome labels csv")
46 onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot")
47 chunks := flags.Int("chunks", 1, "split output into `N` numpy files")
48 cmd.filter.Flags(flags)
49 err = flags.Parse(args)
50 if err == flag.ErrHelp {
53 } else if err != nil {
59 log.Println(http.ListenAndServe(*pprof, nil))
64 if *outputFilename != "-" {
65 err = errors.New("cannot specify output file in container mode: not implemented")
68 runner := arvadosContainerRunner{
69 Name: "lightning export-numpy",
70 Client: arvados.NewClientFromEnv(),
71 ProjectUUID: *projectUUID,
78 err = runner.TranslatePaths(inputFilename)
82 runner.Args = []string{"export-numpy", "-local=true",
83 fmt.Sprintf("-one-hot=%v", *onehot),
85 "-o", "/mnt/output/matrix.npy",
86 "-output-annotations", "/mnt/output/annotations.csv",
87 "-output-onehot2tilevar", "/mnt/output/onehot2tilevar.csv",
88 "-output-labels", "/mnt/output/labels.csv",
89 "-max-variants", fmt.Sprintf("%d", cmd.filter.MaxVariants),
90 "-min-coverage", fmt.Sprintf("%f", cmd.filter.MinCoverage),
91 "-max-tag", fmt.Sprintf("%d", cmd.filter.MaxTag),
92 "-chunks", fmt.Sprintf("%d", *chunks),
95 output, err = runner.Run()
99 fmt.Fprintln(stdout, output+"/matrix.npy")
103 var input io.ReadCloser
104 if *inputFilename == "-" {
105 input = ioutil.NopCloser(stdin)
107 input, err = open(*inputFilename)
113 input = ioutil.NopCloser(bufio.NewReaderSize(input, 8*1024*1024))
114 tilelib := &tileLibrary{
116 retainTileSequences: true,
117 compactGenomes: map[string][]tileVariantID{},
119 err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil)
128 log.Info("filtering")
129 cmd.filter.Apply(tilelib)
133 if *annotationsFilename != "" {
134 log.Infof("writing annotations")
135 var annow io.WriteCloser
136 annow, err = os.OpenFile(*annotationsFilename, os.O_CREATE|os.O_WRONLY, 0666)
141 err = (&annotatecmd{maxTileSize: 5000}).exportTileDiffs(annow, tilelib)
151 log.Info("building lowqual map")
152 lowqual := lowqual(tilelib)
153 names := cgnames(tilelib)
155 if *labelsFilename != "" {
156 log.Infof("writing labels to %s", *labelsFilename)
158 f, err = os.OpenFile(*labelsFilename, os.O_CREATE|os.O_WRONLY, 0777)
163 _, outBasename := path.Split(*outputFilename)
164 for i, name := range names {
165 _, err = fmt.Fprintf(f, "%d,%q,%q\n", i, trimFilenameForLabel(name), outBasename)
167 err = fmt.Errorf("write %s: %w", *labelsFilename, err)
173 err = fmt.Errorf("close %s: %w", *labelsFilename, err)
178 chunksize := (len(tilelib.variant) + *chunks - 1) / *chunks
179 for chunk := 0; chunk < *chunks; chunk++ {
180 log.Infof("preparing chunk %d of %d", chunk+1, *chunks+1)
181 tagstart := chunk * chunksize
182 tagend := tagstart + chunksize
183 if tagend > len(tilelib.variant) {
184 tagend = len(tilelib.variant)
186 out, rows, cols := cgs2array(tilelib, names, lowqual, tagstart, tagend)
188 var npw *gonpy.NpyWriter
189 var output io.WriteCloser
190 fnm := *outputFilename
192 output = nopCloser{stdout}
195 if strings.HasSuffix(fnm, ".npy") {
196 fnm = fmt.Sprintf("%s.%d.npy", fnm[:len(fnm)-4], chunk)
198 fnm = fmt.Sprintf("%s.%d", fnm, chunk)
201 output, err = os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0777)
207 bufw := bufio.NewWriter(output)
208 npw, err = gonpy.NewWriter(nopCloser{bufw})
213 log.Info("recoding to onehot")
214 recoded, librefs, recodedcols := recodeOnehot(out, cols)
215 out, cols = recoded, recodedcols
216 if *librefsFilename != "" {
217 log.Infof("writing onehot column mapping")
218 err = cmd.writeLibRefs(*librefsFilename, tilelib, librefs)
224 log.WithFields(logrus.Fields{
228 }).Info("writing numpy")
229 npw.Shape = []int{rows, cols}
243 func (*exportNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
244 f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
249 for i, libref := range librefs {
250 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)
258 func cgnames(tilelib *tileLibrary) (cgnames []string) {
259 for name := range tilelib.compactGenomes {
260 cgnames = append(cgnames, name)
262 sort.Slice(cgnames, func(i, j int) bool {
263 return trimFilenameForLabel(cgnames[i]) < trimFilenameForLabel(cgnames[j])
268 func lowqual(tilelib *tileLibrary) (lowqual []map[tileVariantID]bool) {
269 lowqual = make([]map[tileVariantID]bool, len(tilelib.variant))
270 for tag, variants := range tilelib.variant {
272 for varidx, hash := range variants {
273 if len(tilelib.seq[hash]) == 0 {
275 lq = map[tileVariantID]bool{}
278 lq[tileVariantID(varidx+1)] = true
285 func cgs2array(tilelib *tileLibrary, names []string, lowqual []map[tileVariantID]bool, tagstart, tagend int) (data []int16, rows, cols int) {
286 rows = len(tilelib.compactGenomes)
287 cols = (tagend - tagstart) * 2
288 data = make([]int16, rows*cols)
289 for row, name := range names {
290 cg := tilelib.compactGenomes[name]
291 if tagstart*2 >= len(cg) {
298 for i, v := range cg {
299 if v > 0 && lowqual[tagstart+i/2][v] {
300 data[row*cols+i] = -1
302 data[row*cols+i] = int16(v)
309 func recodeOnehot(in []int16, incols int) (out []int16, librefs []tileLibRef, outcols int) {
310 rows := len(in) / incols
311 maxvalue := make([]int16, incols)
312 for row := 0; row < rows; row++ {
313 for col := 0; col < incols; col++ {
314 if v := in[row*incols+col]; maxvalue[col] < v {
319 outcol := make([]int, incols)
321 for incol, maxv := range maxvalue {
322 outcol[incol] = outcols
326 for v := 1; v <= int(maxv); v++ {
327 librefs = append(librefs, tileLibRef{Tag: tagID(incol), Variant: tileVariantID(v)})
331 log.Printf("recodeOnehot: dropped %d input cols with zero maxvalue", dropped)
333 out = make([]int16, rows*outcols)
334 for inidx, row := 0, 0; row < rows; row++ {
335 outrow := out[row*outcols:]
336 for col := 0; col < incols; col++ {
337 if v := in[inidx]; v > 0 {
338 outrow[outcol[col]+int(v)-1] = 1
346 type nopCloser struct {
350 func (nopCloser) Close() error { return nil }
352 func trimFilenameForLabel(s string) string {
353 if i := strings.LastIndex(s, "/"); i >= 0 {
356 s = strings.TrimSuffix(s, ".gz")
357 s = strings.TrimSuffix(s, ".fa")
358 s = strings.TrimSuffix(s, ".fasta")
359 s = strings.TrimSuffix(s, ".1")
360 s = strings.TrimSuffix(s, ".2")
361 s = strings.TrimSuffix(s, ".gz")
362 s = strings.TrimSuffix(s, ".vcf")