Compare commits
No commits in common. "main" and "v1.7.0" have entirely different histories.
3 changed files with 51 additions and 67 deletions
|
|
@ -1,11 +1,8 @@
|
|||
package dbschema
|
||||
|
||||
import (
|
||||
// External
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
// Standard
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
|
|
@ -16,10 +13,10 @@ func newDatabase(host string, port int, dbName, user, pass string) (dbase Databa
|
|||
dbase.Username = user
|
||||
dbase.Password = pass
|
||||
|
||||
dbase.db, err = pgxpool.New(context.Background(), dbase.sqlConnString())
|
||||
dbase.db, err = sql.Open("postgres", dbase.sqlConnString())
|
||||
return
|
||||
}// }}}
|
||||
func databaseFromInstance(db *pgxpool.Pool) (dbase Database, err error) {
|
||||
func databaseFromInstance(db *sql.DB) (dbase Database, err error) {
|
||||
dbase.db = db
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,7 +21,10 @@ package dbschema
|
|||
|
||||
import (
|
||||
// External
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
// Standard
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
// An upgrader verifies the schema for one or more databases and upgrades them if possible.
|
||||
|
|
@ -39,7 +42,7 @@ type Database struct {
|
|||
Username string
|
||||
Password string
|
||||
|
||||
db *pgxpool.Pool
|
||||
db *sql.DB
|
||||
upgrader *Upgrader
|
||||
}
|
||||
|
||||
|
|
|
|||
46
upgrader.go
46
upgrader.go
|
|
@ -2,12 +2,10 @@ package dbschema
|
|||
|
||||
import (
|
||||
// External
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/lib/pq"
|
||||
|
||||
// Standard
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
|
|
@ -51,18 +49,17 @@ func (upgrader *Upgrader) Version(dbName string) (version int, err error) { // {
|
|||
|
||||
func (dbase Database) createSchemaTable() (err error) {// {{{
|
||||
dbase.upgrader.logCallback("create", fmt.Sprintf("%s, %s.schema", dbase.DbName, dbase.upgrader.schema))
|
||||
_, err = dbase.db.Exec(context.Background(), `CREATE SCHEMA "` + dbase.upgrader.schema + `"`)
|
||||
_, err = dbase.db.Exec(`CREATE SCHEMA "`+dbase.upgrader.schema+`"`)
|
||||
|
||||
// Error code 42P06 "duplicate_schema" is an OK error,
|
||||
// table can still be missing and created.
|
||||
pqErr, _ := err.(*pgconn.PgError)
|
||||
pqErr, _ := err.(*pq.Error)
|
||||
if pqErr != nil && pqErr.Code != "42P06" {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = dbase.db.Exec(
|
||||
context.Background(),
|
||||
`CREATE TABLE "` + dbase.upgrader.schema + `"."schema" (
|
||||
_, err = dbase.db.Exec(`
|
||||
CREATE TABLE "`+dbase.upgrader.schema+`"."schema" (
|
||||
version int4 NOT NULL,
|
||||
updated timestamp NOT NULL DEFAULT NOW(),
|
||||
|
||||
|
|
@ -72,14 +69,13 @@ func (dbase Database) createSchemaTable() (err error) { // {{{
|
|||
return
|
||||
}// }}}
|
||||
func (dbase Database) appendSchemaVersion(version int) (err error) {// {{{
|
||||
_, err = dbase.db.Exec(context.Background(), `INSERT INTO `+dbase.upgrader.schema+`.schema(version) VALUES($1)`, version)
|
||||
_, err = dbase.db.Exec(`INSERT INTO `+dbase.upgrader.schema+`.schema(version) VALUES($1)`, version)
|
||||
return
|
||||
}// }}}
|
||||
|
||||
func (dbase Database) verifySchemaTable() (err error) {// {{{
|
||||
var rows pgx.Rows
|
||||
var rows *sql.Rows
|
||||
if rows, err = dbase.db.Query(
|
||||
context.Background(),
|
||||
`SELECT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_class c
|
||||
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
||||
|
|
@ -104,11 +100,11 @@ func (dbase Database) verifySchemaTable() (err error) { // {{{
|
|||
}// }}}
|
||||
func (dbase Database) verifySchemaEntry() (err error) {// {{{
|
||||
var version int
|
||||
var row pgx.Row
|
||||
row = dbase.db.QueryRow(context.Background(), `SELECT version FROM `+dbase.upgrader.schema+`.schema LIMIT 1`)
|
||||
var row *sql.Row
|
||||
row = dbase.db.QueryRow(`SELECT version FROM `+dbase.upgrader.schema+`.schema LIMIT 1`)
|
||||
|
||||
err = row.Scan(&version)
|
||||
if err == pgx.ErrNoRows {
|
||||
if err == sql.ErrNoRows {
|
||||
dbase.upgrader.logCallback("initiate version", dbase.DbName)
|
||||
err = dbase.appendSchemaVersion(0)
|
||||
}
|
||||
|
|
@ -116,9 +112,8 @@ func (dbase Database) verifySchemaEntry() (err error) { // {{{
|
|||
return
|
||||
}// }}}
|
||||
func (dbase Database) Version() (version int, err error) {// {{{
|
||||
var rows pgx.Rows
|
||||
var rows *sql.Rows
|
||||
rows, err = dbase.db.Query(
|
||||
context.Background(),
|
||||
`SELECT version FROM `+dbase.upgrader.schema+`.schema ORDER BY version DESC LIMIT 1`,
|
||||
)
|
||||
if err != nil {
|
||||
|
|
@ -150,19 +145,8 @@ func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass s
|
|||
err = db.verifySchemaEntry()
|
||||
return
|
||||
}// }}}
|
||||
func (upgrader Upgrader) AddDatabaseInstance(sqlDB *pgxpool.Pool, dbName string) (db Database, err error) { // {{{
|
||||
db, err = databaseFromInstance(sqlDB)
|
||||
|
||||
db.upgrader = &upgrader
|
||||
|
||||
upgrader.databases[dbName] = db
|
||||
|
||||
if err = db.verifySchemaTable(); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = db.verifySchemaEntry()
|
||||
return
|
||||
func (upgrader Upgrader) AddDatabaseInstance(sqlDB *sql.DB) (db Database, err error) {// {{{
|
||||
return databaseFromInstance(sqlDB)
|
||||
}// }}}
|
||||
|
||||
// Run executes the actual schema updates until there are no more available.
|
||||
|
|
@ -184,7 +168,7 @@ func (upgrader Upgrader) Run() (err error) { // {{{
|
|||
}
|
||||
|
||||
upgrader.logCallback("exec", fmt.Sprintf("%s.%s: %d", dbName, upgrader.schema, version))
|
||||
if _, err = dbase.db.Exec(context.Background(), string(sql)); err != nil {
|
||||
if _, err = dbase.db.Exec(string(sql)); err != nil {
|
||||
return
|
||||
}
|
||||
if err = dbase.appendSchemaVersion(version); err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue