Compare commits
5 commits
Author | SHA1 | Date | |
---|---|---|---|
b83c7b1e17 | |||
08df526c02 | |||
95335125d3 | |||
2cb694f534 | |||
d290a67800 |
3 changed files with 98 additions and 45 deletions
|
@ -19,11 +19,11 @@ func newDatabase(host string, port int, dbName, user, pass string) (dbase Databa
|
|||
|
||||
func (dbase Database) sqlConnString() string {// {{{
|
||||
return fmt.Sprintf(
|
||||
"postgresql://%s:%s@%s:%d/%s?sslmode=disable",
|
||||
dbase.Username,
|
||||
dbase.Password,
|
||||
"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
||||
dbase.Host,
|
||||
dbase.Port,
|
||||
dbase.Username,
|
||||
dbase.Password,
|
||||
dbase.DbName,
|
||||
)
|
||||
}// }}}
|
||||
|
|
21
schema.go
21
schema.go
|
@ -1,3 +1,22 @@
|
|||
/*
|
||||
Package dbschema is used to keep the SQL schema up to date.
|
||||
|
||||
func sqlProvider(dbName string, version int) (sql []byte, found bool) {
|
||||
// read an SQL file and return the contents
|
||||
return
|
||||
}
|
||||
|
||||
upgrader := dbschema.NewUpgrader()
|
||||
upgrader.SetSqlCallback(sqlProvider)
|
||||
|
||||
if err = upgrader.AddDatabase("127.0.0.1", 5432, "foo", "postgres", "password"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err = upgrader.Run(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
*/
|
||||
package dbschema
|
||||
|
||||
import (
|
||||
|
@ -8,7 +27,9 @@ import (
|
|||
"database/sql"
|
||||
)
|
||||
|
||||
// An upgrader verifies the schema for one or more databases and upgrades them if possible.
|
||||
type Upgrader struct {
|
||||
schema string
|
||||
databases map[string]Database
|
||||
logCallback func(string, string)
|
||||
sqlCallback func(string, int) ([]byte, bool)
|
||||
|
|
110
upgrader.go
110
upgrader.go
|
@ -1,6 +1,9 @@
|
|||
package dbschema
|
||||
|
||||
import (
|
||||
// External
|
||||
"github.com/lib/pq"
|
||||
|
||||
// Standard
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
@ -9,18 +12,66 @@ import (
|
|||
func defaultCallback(topic, msg string) {// {{{
|
||||
fmt.Printf("[%s] %s\n", topic, msg)
|
||||
}// }}}
|
||||
func NewUpgrader(host string, port int, dbName, user, pass string) (upgrader Upgrader, err error) {// {{{
|
||||
|
||||
// NewUpgrader creates an upgrader with an empty list of databases.
|
||||
func NewUpgrader(schema ...string) (upgrader Upgrader) {// {{{
|
||||
// Using a variadic function for backward compatibility.
|
||||
if len(schema) > 0 {
|
||||
upgrader.schema = schema[0]
|
||||
} else {
|
||||
upgrader.schema = "_db"
|
||||
}
|
||||
|
||||
upgrader.logCallback = defaultCallback
|
||||
upgrader.databases = map[string]Database{}
|
||||
return
|
||||
}// }}}
|
||||
|
||||
// SetLogCallback allows to set a callback for custom logging.
|
||||
func (upgrader *Upgrader) SetLogCallback(callback func(string, string)) {// {{{
|
||||
upgrader.logCallback = callback
|
||||
}// }}}
|
||||
// SetSqlCallback is required for providing the SQL schema updates.
|
||||
func (upgrader *Upgrader) SetSqlCallback(callback func(string, int) ([]byte, bool)) {// {{{
|
||||
upgrader.sqlCallback = callback
|
||||
}// }}}
|
||||
// Version returns the current dbschema version for the given database name.
|
||||
func (upgrader *Upgrader) Version(dbName string) (version int, err error) {// {{{
|
||||
dbase, found := upgrader.databases[dbName]
|
||||
if !found {
|
||||
err = fmt.Errorf("Database %s not previously added to the upgrader", dbName)
|
||||
return
|
||||
}
|
||||
|
||||
version, err = dbase.Version()
|
||||
return
|
||||
}// }}}
|
||||
|
||||
func (dbase Database) createSchemaTable() (err error) {// {{{
|
||||
dbase.upgrader.logCallback("create", fmt.Sprintf("%s, %s.schema", dbase.DbName, dbase.upgrader.schema))
|
||||
_, err = dbase.db.Exec(`CREATE SCHEMA "`+dbase.upgrader.schema+`"`)
|
||||
|
||||
// Error code 42P06 "duplicate_schema" is an OK error,
|
||||
// table can still be missing and created.
|
||||
pqErr, _ := err.(*pq.Error)
|
||||
if pqErr != nil && pqErr.Code != "42P06" {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = dbase.db.Exec(`
|
||||
CREATE TABLE "`+dbase.upgrader.schema+`"."schema" (
|
||||
version int4 NOT NULL,
|
||||
updated timestamp NOT NULL DEFAULT NOW(),
|
||||
|
||||
CONSTRAINT schema_pk PRIMARY KEY (version)
|
||||
)`,
|
||||
)
|
||||
return
|
||||
}// }}}
|
||||
func (dbase Database) appendSchemaVersion(version int) (err error) {// {{{
|
||||
_, err = dbase.db.Exec(`INSERT INTO `+dbase.upgrader.schema+`.schema(version) VALUES($1)`, version)
|
||||
return
|
||||
}// }}}
|
||||
|
||||
func (dbase Database) verifySchemaTable() (err error) {// {{{
|
||||
var rows *sql.Rows
|
||||
|
@ -28,7 +79,7 @@ func (dbase Database) verifySchemaTable() (err error) {// {{{
|
|||
`SELECT EXISTS (
|
||||
SELECT FROM pg_catalog.pg_class c
|
||||
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
||||
WHERE n.nspname = '_db'
|
||||
WHERE n.nspname = '`+dbase.upgrader.schema+`'
|
||||
AND c.relname = 'schema'
|
||||
)`,
|
||||
); err != nil {
|
||||
|
@ -41,45 +92,29 @@ func (dbase Database) verifySchemaTable() (err error) {// {{{
|
|||
return
|
||||
}
|
||||
|
||||
if !exists {
|
||||
dbase.upgrader.logCallback("create", fmt.Sprintf("%s, _db.schema", dbase.DbName))
|
||||
dbase.db.Exec(`CREATE SCHEMA "_db"`)
|
||||
|
||||
if _, err = dbase.db.Exec(`
|
||||
CREATE TABLE "_db"."schema" (
|
||||
version int4 NOT NULL,
|
||||
updated timestamp NOT NULL DEFAULT NOW(),
|
||||
|
||||
CONSTRAINT schema_pk PRIMARY KEY (version)
|
||||
)`,
|
||||
); err != nil {
|
||||
if exists {
|
||||
return
|
||||
}
|
||||
}
|
||||
err = dbase.createSchemaTable()
|
||||
return
|
||||
}// }}}
|
||||
func (dbase Database) verifySchemaEntry() (err error) {// {{{
|
||||
var rows *sql.Rows
|
||||
rows, err = dbase.db.Query(`SELECT version FROM _db.schema LIMIT 1`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
var version int
|
||||
var row *sql.Row
|
||||
row = dbase.db.QueryRow(`SELECT version FROM `+dbase.upgrader.schema+`.schema LIMIT 1`)
|
||||
|
||||
if !rows.Next() {
|
||||
err = row.Scan(&version)
|
||||
if err == sql.ErrNoRows {
|
||||
dbase.upgrader.logCallback("initiate version", dbase.DbName)
|
||||
_, err = dbase.db.Exec(`INSERT INTO _db.schema(version) VALUES(0)`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = dbase.appendSchemaVersion(0)
|
||||
}
|
||||
|
||||
return
|
||||
}// }}}
|
||||
func (dbase Database) version() (version int, err error) {// {{{
|
||||
func (dbase Database) Version() (version int, err error) {// {{{
|
||||
var rows *sql.Rows
|
||||
rows, err = dbase.db.Query(
|
||||
`SELECT version FROM _db.schema ORDER BY version DESC LIMIT 1`,
|
||||
`SELECT version FROM `+dbase.upgrader.schema+`.schema ORDER BY version DESC LIMIT 1`,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -89,13 +124,13 @@ func (dbase Database) version() (version int, err error) {// {{{
|
|||
if rows.Next() {
|
||||
err = rows.Scan(&version)
|
||||
} else {
|
||||
err = fmt.Errorf(`Database "%s" is missing an entry in _db.schema`, dbase.DbName)
|
||||
err = fmt.Errorf(`Database "%s" is missing an entry in `+dbase.upgrader.schema+`.schema`, dbase.DbName)
|
||||
}
|
||||
return
|
||||
}// }}}
|
||||
|
||||
func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass string) (err error) {// {{{
|
||||
var db Database
|
||||
// AddDatabase sets a database up for the Run() function with verifying/creating the _db.schema table.
|
||||
func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass string) (db Database, err error) {// {{{
|
||||
if db, err = newDatabase(host, port, dbName, user, pass); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -110,15 +145,16 @@ func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass s
|
|||
err = db.verifySchemaEntry()
|
||||
return
|
||||
}// }}}
|
||||
// Run executes the actual schema updates until there are no more available.
|
||||
func (upgrader Upgrader) Run() (err error) {// {{{
|
||||
var version int
|
||||
|
||||
for dbName, dbase := range upgrader.databases {
|
||||
version, err = dbase.version()
|
||||
version, err = dbase.Version()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
upgrader.logCallback("version", fmt.Sprintf("%s: %d", dbName, version))
|
||||
upgrader.logCallback("version", fmt.Sprintf("%s.%s: %d", dbName, upgrader.schema, version))
|
||||
|
||||
for {
|
||||
version++
|
||||
|
@ -127,15 +163,11 @@ func (upgrader Upgrader) Run() (err error) {// {{{
|
|||
break
|
||||
}
|
||||
|
||||
upgrader.logCallback("exec", fmt.Sprintf("%s: %d", dbName, version))
|
||||
upgrader.logCallback("exec", fmt.Sprintf("%s.%s: %d", dbName, upgrader.schema, version))
|
||||
if _, err = dbase.db.Exec(string(sql)); err != nil {
|
||||
return
|
||||
}
|
||||
_, err = dbase.db.Exec(`
|
||||
INSERT INTO _db.schema(version)
|
||||
VALUES($1)
|
||||
`, version)
|
||||
if err != nil {
|
||||
if err = dbase.appendSchemaVersion(version); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue