// Copyright (C) The Lightning Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package lightning import ( "bufio" "context" "errors" "flag" "fmt" "io" "io/ioutil" "net/http" _ "net/http/pprof" "os" "strings" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/james-bowman/nlp" "github.com/kshedden/gonpy" log "github.com/sirupsen/logrus" "gonum.org/v1/gonum/mat" ) type pythonPCA struct{} func (cmd *pythonPCA) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { var err error defer func() { if err != nil { fmt.Fprintf(stderr, "%s\n", err) } }() flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) projectUUID := flags.String("project", "", "project `UUID` for output data") inputFilename := flags.String("i", "-", "input `file`") priority := flags.Int("priority", 500, "container request priority") err = flags.Parse(args) if err == flag.ErrHelp { err = nil return 0 } else if err != nil { return 2 } runner := arvadosContainerRunner{ Name: "lightning pca", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, RAM: 440000000000, VCPUs: 1, Priority: *priority, } err = runner.TranslatePaths(inputFilename) if err != nil { return 1 } runner.Prog = "python3" runner.Args = []string{"-c", `import sys import scipy from sklearn.decomposition import PCA scipy.save(sys.argv[2], PCA(n_components=4).fit_transform(scipy.load(sys.argv[1])))`, *inputFilename, "/mnt/output/pca.npy"} var output string output, err = runner.Run() if err != nil { return 1 } fmt.Fprintln(stdout, output+"/pca.npy") return 0 } type goPCA struct { filter filter } func (cmd *goPCA) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { var err error defer func() { if err != nil { fmt.Fprintf(stderr, "%s\n", err) } }() flags := flag.NewFlagSet("", flag.ContinueOnError) flags.SetOutput(stderr) pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`") runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)") projectUUID := flags.String("project", "", "project `UUID` for output data") priority := flags.Int("priority", 500, "container request priority") inputFilename := flags.String("i", "-", "input `file`") outputFilename := flags.String("o", "-", "output `file`") components := flags.Int("components", 4, "number of components") onehot := flags.Bool("one-hot", false, "recode tile variants as one-hot") cmd.filter.Flags(flags) err = flags.Parse(args) if err == flag.ErrHelp { err = nil return 0 } else if err != nil { return 2 } if *pprof != "" { go func() { log.Println(http.ListenAndServe(*pprof, nil)) }() } if !*runlocal { if *outputFilename != "-" { err = errors.New("cannot specify output file in container mode: not implemented") return 1 } runner := arvadosContainerRunner{ Name: "lightning pca-go", Client: arvados.NewClientFromEnv(), ProjectUUID: *projectUUID, RAM: 300000000000, // maybe 10x input size? VCPUs: 16, Priority: *priority, } err = runner.TranslatePaths(inputFilename) if err != nil { return 1 } runner.Args = []string{"pca-go", "-local=true", fmt.Sprintf("-one-hot=%v", *onehot), "-i", *inputFilename, "-o", "/mnt/output/pca.npy"} runner.Args = append(runner.Args, cmd.filter.Args()...) var output string output, err = runner.Run() if err != nil { return 1 } fmt.Fprintln(stdout, output+"/pca.npy") return 0 } var input io.ReadCloser if *inputFilename == "-" { input = ioutil.NopCloser(stdin) } else { input, err = os.Open(*inputFilename) if err != nil { return 1 } defer input.Close() } log.Print("reading") tilelib := &tileLibrary{ retainNoCalls: true, compactGenomes: map[string][]tileVariantID{}, } err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz")) if err != nil { return 1 } err = input.Close() if err != nil { return 1 } log.Info("filtering") cmd.filter.Apply(tilelib) log.Info("tidying") tilelib.Tidy() log.Print("converting cgs to array") data, rows, cols := cgs2array(tilelib, cgnames(tilelib), lowqual(tilelib), nil, 0, len(tilelib.variant)) if *onehot { log.Printf("recode one-hot: %d rows, %d cols", rows, cols) data, _, cols = recodeOnehot(data, cols) } tilelib = nil log.Printf("creating matrix backed by array: %d rows, %d cols", rows, cols) mtx := array2matrix(rows, cols, data).T() log.Print("fitting") transformer := nlp.NewPCA(*components) transformer.Fit(mtx) log.Printf("transforming") mtx, err = transformer.Transform(mtx) if err != nil { return 1 } mtx = mtx.T() rows, cols = mtx.Dims() log.Printf("copying result to numpy output array: %d rows, %d cols", rows, cols) out := make([]float64, rows*cols) for i := 0; i < rows; i++ { for j := 0; j < cols; j++ { out[i*cols+j] = mtx.At(i, j) } } var output io.WriteCloser if *outputFilename == "-" { output = nopCloser{stdout} } else { output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777) if err != nil { return 1 } defer output.Close() } bufw := bufio.NewWriter(output) npw, err := gonpy.NewWriter(nopCloser{bufw}) if err != nil { return 1 } npw.Shape = []int{rows, cols} log.Printf("writing numpy: %d rows, %d cols", rows, cols) npw.WriteFloat64(out) err = bufw.Flush() if err != nil { return 1 } err = output.Close() if err != nil { return 1 } log.Print("done") return 0 } func array2matrix(rows, cols int, data []int16) mat.Matrix { floatdata := make([]float64, rows*cols) for i, v := range data { floatdata[i] = float64(v) } return mat.NewDense(rows, cols, floatdata) }