1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
19 "git.arvados.org/arvados.git/sdk/go/arvados"
20 "github.com/kshedden/gonpy"
21 "github.com/sirupsen/logrus"
22 log "github.com/sirupsen/logrus"
25 type sliceNumpy struct {
29 func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
33 fmt.Fprintf(stderr, "%s\n", err)
36 flags := flag.NewFlagSet("", flag.ContinueOnError)
37 flags.SetOutput(stderr)
38 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
39 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
40 projectUUID := flags.String("project", "", "project `UUID` for output data")
41 priority := flags.Int("priority", 500, "container request priority")
42 inputDir := flags.String("input-dir", "./in", "input `directory`")
43 outputDir := flags.String("output-dir", "./out", "output `directory`")
44 regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`")
45 expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`")
46 cmd.filter.Flags(flags)
47 err = flags.Parse(args)
48 if err == flag.ErrHelp {
51 } else if err != nil {
57 log.Println(http.ListenAndServe(*pprof, nil))
62 runner := arvadosContainerRunner{
63 Name: "lightning slice-numpy",
64 Client: arvados.NewClientFromEnv(),
65 ProjectUUID: *projectUUID,
72 err = runner.TranslatePaths(inputDir, regionsFilename)
76 runner.Args = []string{"slice-numpy", "-local=true",
78 "-input-dir", *inputDir,
79 "-output-dir", "/mnt/output",
80 "-regions", *regionsFilename,
81 "-expand-regions", fmt.Sprintf("%d", *expandRegions),
83 runner.Args = append(runner.Args, cmd.filter.Args()...)
85 output, err = runner.Run()
89 fmt.Fprintln(stdout, output)
93 infiles, err := allGobFiles(*inputDir)
97 if len(infiles) == 0 {
98 err = fmt.Errorf("no input files found in %s", *inputDir)
101 sort.Strings(infiles)
104 refseqs := map[string]map[string][]tileLibRef{}
105 in0, err := open(infiles[0])
109 DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
110 for _, cseq := range ent.CompactSequences {
111 refseqs[cseq.Name] = cseq.TileSequences
113 for _, cg := range ent.CompactGenomes {
114 cgnames = append(cgnames, cg.Name)
122 sort.Strings(cgnames)
125 labelsFilename := *outputDir + "/labels.csv"
126 log.Infof("writing labels to %s", labelsFilename)
128 f, err = os.Create(labelsFilename)
133 for i, name := range cgnames {
134 _, err = fmt.Fprintf(f, "%d,%q\n", i, trimFilenameForLabel(name))
136 err = fmt.Errorf("write %s: %w", labelsFilename, err)
142 err = fmt.Errorf("close %s: %w", labelsFilename, err)
147 log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0
148 reftiledata := map[tileLibRef]*[]byte{}
149 for _, ref := range refseqs {
150 for _, cseq := range ref {
151 for _, libref := range cseq {
152 reftiledata[libref] = new([]byte)
156 log.Info("loading reference tiles from all slices")
157 throttle := throttle{Max: runtime.NumCPU()}
158 for _, infile := range infiles {
160 throttle.Go(func() error {
161 defer log.Infof("%s: done", infile)
162 f, err := open(infile)
167 return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
168 for _, tv := range ent.TileVariants {
169 libref := tileLibRef{tv.Tag, tv.Variant}
170 if dst, ok := reftiledata[libref]; ok {
180 log.Info("TODO: determining which tiles intersect given regions")
182 log.Info("generating annotations and numpy matrix for each slice")
183 for infileIdx, infile := range infiles {
184 infileIdx, infile := infileIdx, infile
185 throttle.Go(func() error {
186 defer log.Infof("%s: done", infile)
187 seq := map[tileLibRef][]byte{}
188 cgs := make(map[string]CompactGenome, len(cgnames))
189 f, err := open(infile)
194 err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
195 for _, tv := range ent.TileVariants {
196 seq[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
198 for _, cg := range ent.CompactGenomes {
207 log.Infof("TODO: %s: filtering", infile)
208 log.Infof("TODO: %s: tidying", infile)
209 log.Infof("TODO: %s: lowqual to -1", infile)
211 annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx)
212 log.Infof("%s: writing annotations to %s", infile, annotationsFilename)
213 annow, err := os.Create(annotationsFilename)
217 // for libref, seq := range seq {
218 // // TODO: diff from ref.
225 log.Infof("%s: preparing numpy", infile)
226 tagstart := cgs[cgnames[0]].StartTag
227 tagend := cgs[cgnames[0]].EndTag
229 cols := 2 * int(tagend-tagstart)
230 out := make([]int16, rows*cols)
231 for row, name := range cgnames {
232 out := out[row*cols:]
233 for col, v := range cgs[name].Variants {
236 } else if _, ok := seq[tileLibRef{tagstart + tagID(col/2), v}]; ok {
244 fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx)
245 output, err := os.Create(fnm)
250 bufw := bufio.NewWriter(output)
251 npw, err := gonpy.NewWriter(nopCloser{bufw})
255 log.WithFields(logrus.Fields{
259 }).Info("writing numpy")
260 npw.Shape = []int{rows, cols}
273 if err = throttle.Wait(); err != nil {
279 func (*sliceNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error {
280 f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666)
285 for i, libref := range librefs {
286 _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant)