X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/865e5c1e3730117870eb1e485d553383626b882f..HEAD:/lib/ctrlctx/db.go diff --git a/lib/ctrlctx/db.go b/lib/ctrlctx/db.go index 127be489df..d33fd8ab53 100644 --- a/lib/ctrlctx/db.go +++ b/lib/ctrlctx/db.go @@ -10,8 +10,11 @@ import ( "sync" "git.arvados.org/arvados.git/lib/controller/api" + "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/jmoiron/sqlx" + + // sqlx needs lib/pq to talk to PostgreSQL _ "github.com/lib/pq" ) @@ -66,7 +69,7 @@ type finishFunc func(*error) // commit or rollback the transaction, if any. // // func example(ctx context.Context) (err error) { -// ctx, finishtx := New(ctx, dber) +// ctx, finishtx := New(ctx, getdb) // defer finishtx(&err) // // ... // tx, err := CurrentTx(ctx) @@ -106,6 +109,26 @@ func New(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) (co } } +// NewTx starts a new transaction. The caller is responsible for +// calling Commit or Rollback. This is suitable for database queries +// that are separate from the API transaction (see CurrentTx), e.g., +// ones that will be committed even if the API call fails, or held +// open after the API call finishes. +func NewTx(ctx context.Context) (*sqlx.Tx, error) { + txn, ok := ctx.Value(contextKeyTransaction).(*transaction) + if !ok { + return nil, ErrNoTransaction + } + db, err := txn.getdb(ctx) + if err != nil { + return nil, err + } + return db.Beginx() +} + +// CurrentTx returns a transaction that will be committed after the +// current API call completes, or rolled back if the current API call +// returns an error. func CurrentTx(ctx context.Context) (*sqlx.Tx, error) { txn, ok := ctx.Value(contextKeyTransaction).(*transaction) if !ok { @@ -120,3 +143,45 @@ func CurrentTx(ctx context.Context) (*sqlx.Tx, error) { }) return txn.tx, txn.err } + +var errDBConnection = errors.New("database connection error") + +type DBConnector struct { + PostgreSQL arvados.PostgreSQL + pgdb *sqlx.DB + mtx sync.Mutex +} + +func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) { + dbc.mtx.Lock() + defer dbc.mtx.Unlock() + if dbc.pgdb != nil { + return dbc.pgdb, nil + } + db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String()) + if err != nil { + ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed") + return nil, errDBConnection + } + if p := dbc.PostgreSQL.ConnectionPool; p > 0 { + db.SetMaxOpenConns(p) + } + if err := db.Ping(); err != nil { + ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed") + db.Close() + return nil, errDBConnection + } + dbc.pgdb = db + return db, nil +} + +func (dbc *DBConnector) Close() error { + dbc.mtx.Lock() + defer dbc.mtx.Unlock() + var err error + if dbc.pgdb != nil { + err = dbc.pgdb.Close() + dbc.pgdb = nil + } + return err +}