// Copyright (C) The Lightning Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package lightning import ( "bufio" "flag" "fmt" "io" "net/http" _ "net/http/pprof" "os" "runtime" "sort" "strings" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/kshedden/gonpy" "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) type sliceNumpy struct { filter filter } func (cmd *sliceNumpy) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { var err error defer func() { if err != nil { fmt.Fprintf(stderr, "%s\n", err) } }() flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") inputDir := flags.String("input-dir", "./in", "input `directory`") outputDir := flags.String("output-dir", "./out", "output `directory`") regionsFilename := flags.String("regions", "", "only output columns/annotations that intersect regions in specified bed `file`") expandRegions := flags.Int("expand-regions", 0, "expand specified regions by `N` base pairs on each side`") cmd.filter.Flags(flags) err = flags.Parse(args) if err == flag.ErrHelp { err = nil return 0 } else if err != nil { return 2 } if *pprof != "" { go func() { log.Println(http.ListenAndServe(*pprof, nil)) }() } if !*runlocal { runner := arvadosContainerRunner{ Name: "lightning slice-numpy", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, RAM: 150000000000, VCPUs: 32, Priority: *priority, KeepCache: 1, APIAccess: true, } err = runner.TranslatePaths(inputDir, regionsFilename) if err != nil { return 1 } runner.Args = []string{"slice-numpy", "-local=true", "-pprof", ":6060", "-input-dir", *inputDir, "-output-dir", "/mnt/output", "-regions", *regionsFilename, "-expand-regions", fmt.Sprintf("%d", *expandRegions), } runner.Args = append(runner.Args, cmd.filter.Args()...) var output string output, err = runner.Run() if err != nil { return 1 } fmt.Fprintln(stdout, output) return 0 } infiles, err := allGobFiles(*inputDir) if err != nil { return 1 } if len(infiles) == 0 { err = fmt.Errorf("no input files found in %s", *inputDir) return 1 } sort.Strings(infiles) var cgnames []string refseqs := map[string]map[string][]tileLibRef{} in0, err := open(infiles[0]) if err != nil { return 1 } DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error { for _, cseq := range ent.CompactSequences { refseqs[cseq.Name] = cseq.TileSequences } for _, cg := range ent.CompactGenomes { cgnames = append(cgnames, cg.Name) } return nil }) if err != nil { return 1 } in0.Close() sort.Strings(cgnames) { labelsFilename := *outputDir + "/labels.csv" log.Infof("writing labels to %s", labelsFilename) var f *os.File f, err = os.Create(labelsFilename) if err != nil { return 1 } defer f.Close() for i, name := range cgnames { _, err = fmt.Fprintf(f, "%d,%q\n", i, trimFilenameForLabel(name)) if err != nil { err = fmt.Errorf("write %s: %w", labelsFilename, err) return 1 } } err = f.Close() if err != nil { err = fmt.Errorf("close %s: %w", labelsFilename, err) return 1 } } log.Info("building list of reference tiles to load") // TODO: more efficient if we had saved all ref tiles in slice0 reftiledata := map[tileLibRef]*[]byte{} for _, ref := range refseqs { for _, cseq := range ref { for _, libref := range cseq { reftiledata[libref] = new([]byte) } } } log.Info("loading reference tiles from all slices") throttle := throttle{Max: runtime.NumCPU()} for _, infile := range infiles { infile := infile throttle.Go(func() error { defer log.Infof("%s: done", infile) f, err := open(infile) if err != nil { return err } defer f.Close() return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { libref := tileLibRef{tv.Tag, tv.Variant} if dst, ok := reftiledata[libref]; ok { *dst = tv.Sequence } } return nil }) }) } throttle.Wait() log.Info("TODO: determining which tiles intersect given regions") log.Info("generating annotations and numpy matrix for each slice") for infileIdx, infile := range infiles { infileIdx, infile := infileIdx, infile throttle.Go(func() error { defer log.Infof("%s: done", infile) seq := map[tileLibRef][]byte{} cgs := make(map[string]CompactGenome, len(cgnames)) f, err := open(infile) if err != nil { return err } defer f.Close() err = DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error { for _, tv := range ent.TileVariants { seq[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence } for _, cg := range ent.CompactGenomes { cgs[cg.Name] = cg } return nil }) if err != nil { return err } log.Infof("TODO: %s: filtering", infile) log.Infof("TODO: %s: tidying", infile) log.Infof("TODO: %s: lowqual to -1", infile) annotationsFilename := fmt.Sprintf("%s/matrix.%04d.annotations.csv", *outputDir, infileIdx) log.Infof("%s: writing annotations to %s", infile, annotationsFilename) annow, err := os.Create(annotationsFilename) if err != nil { return err } // for libref, seq := range seq { // // TODO: diff from ref. // } err = annow.Close() if err != nil { return err } log.Infof("%s: preparing numpy", infile) tagstart := cgs[cgnames[0]].StartTag tagend := cgs[cgnames[0]].EndTag rows := len(cgnames) cols := 2 * int(tagend-tagstart) out := make([]int16, rows*cols) for row, name := range cgnames { out := out[row*cols:] for col, v := range cgs[name].Variants { if v == 0 { // out[col] = 0 } else if _, ok := seq[tileLibRef{tagstart + tagID(col/2), v}]; ok { out[col] = int16(v) } else { out[col] = -1 } } } fnm := fmt.Sprintf("%s/matrix.%04d.npy", *outputDir, infileIdx) output, err := os.Create(fnm) if err != nil { return err } defer output.Close() bufw := bufio.NewWriter(output) npw, err := gonpy.NewWriter(nopCloser{bufw}) if err != nil { return err } log.WithFields(logrus.Fields{ "filename": fnm, "rows": rows, "cols": cols, }).Info("writing numpy") npw.Shape = []int{rows, cols} npw.WriteInt16(out) err = bufw.Flush() if err != nil { return err } err = output.Close() if err != nil { return err } return nil }) } if err = throttle.Wait(); err != nil { return 1 } return 0 } func (*sliceNumpy) writeLibRefs(fnm string, tilelib *tileLibrary, librefs []tileLibRef) error { f, err := os.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return err } defer f.Close() for i, libref := range librefs { _, err = fmt.Fprintf(f, "%d,%d,%d\n", i, libref.Tag, libref.Variant) if err != nil { return err } } return f.Close() }