147 lines
3.2 KiB
Go
147 lines
3.2 KiB
Go
package dbschema
|
|
|
|
import (
|
|
// Standard
|
|
"database/sql"
|
|
"fmt"
|
|
)
|
|
|
|
func defaultCallback(topic, msg string) {// {{{
|
|
fmt.Printf("[%s] %s\n", topic, msg)
|
|
}// }}}
|
|
func NewUpgrader(host string, port int, dbName, user, pass string) (upgrader Upgrader, err error) {// {{{
|
|
upgrader.logCallback = defaultCallback
|
|
upgrader.databases = map[string]Database{}
|
|
return
|
|
}// }}}
|
|
|
|
func (upgrader *Upgrader) SetLogCallback(callback func(string, string)) {// {{{
|
|
upgrader.logCallback = callback
|
|
}// }}}
|
|
func (upgrader *Upgrader) SetSqlCallback(callback func(string, int) ([]byte, bool)) {// {{{
|
|
upgrader.sqlCallback = callback
|
|
}// }}}
|
|
|
|
func (dbase Database) verifySchemaTable() (err error) {// {{{
|
|
var rows *sql.Rows
|
|
if rows, err = dbase.db.Query(
|
|
`SELECT EXISTS (
|
|
SELECT FROM pg_catalog.pg_class c
|
|
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
|
WHERE n.nspname = '_db'
|
|
AND c.relname = 'schema'
|
|
)`,
|
|
); err != nil {
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
var exists bool
|
|
rows.Next()
|
|
if err = rows.Scan(&exists); err != nil {
|
|
return
|
|
}
|
|
|
|
if !exists {
|
|
dbase.upgrader.logCallback("create", fmt.Sprintf("%s, _db.schema", dbase.DbName))
|
|
dbase.db.Exec(`CREATE SCHEMA "_db"`)
|
|
|
|
if _, err = dbase.db.Exec(`
|
|
CREATE TABLE "_db"."schema" (
|
|
version int4 NOT NULL,
|
|
updated timestamp NOT NULL DEFAULT NOW(),
|
|
|
|
CONSTRAINT schema_pk PRIMARY KEY (version)
|
|
)`,
|
|
); err != nil {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}// }}}
|
|
func (dbase Database) verifySchemaEntry() (err error) {// {{{
|
|
var rows *sql.Rows
|
|
rows, err = dbase.db.Query(`SELECT version FROM _db.schema LIMIT 1`)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
if !rows.Next() {
|
|
dbase.upgrader.logCallback("initiate version", dbase.DbName)
|
|
_, err = dbase.db.Exec(`INSERT INTO _db.schema(version) VALUES(0)`)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
return
|
|
}// }}}
|
|
func (dbase Database) version() (version int, err error) {// {{{
|
|
var rows *sql.Rows
|
|
rows, err = dbase.db.Query(
|
|
`SELECT version FROM _db.schema ORDER BY version DESC LIMIT 1`,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
if rows.Next() {
|
|
err = rows.Scan(&version)
|
|
} else {
|
|
err = fmt.Errorf(`Database "%s" is missing an entry in _db.schema`, dbase.DbName)
|
|
}
|
|
return
|
|
}// }}}
|
|
|
|
func (upgrader Upgrader) AddDatabase(host string, port int, dbName, user, pass string) (err error) {// {{{
|
|
var db Database
|
|
if db, err = newDatabase(host, port, dbName, user, pass); err != nil {
|
|
return
|
|
}
|
|
db.upgrader = &upgrader
|
|
|
|
upgrader.databases[dbName] = db
|
|
|
|
if err = db.verifySchemaTable(); err != nil {
|
|
return
|
|
}
|
|
|
|
err = db.verifySchemaEntry()
|
|
return
|
|
}// }}}
|
|
func (upgrader Upgrader) Run() (err error) {// {{{
|
|
var version int
|
|
|
|
for dbName, dbase := range upgrader.databases {
|
|
version, err = dbase.version()
|
|
if err != nil {
|
|
return
|
|
}
|
|
upgrader.logCallback("version", fmt.Sprintf("%s: %d", dbName, version))
|
|
|
|
for {
|
|
version++
|
|
sql, found := upgrader.sqlCallback(dbName, version)
|
|
if !found {
|
|
break
|
|
}
|
|
|
|
upgrader.logCallback("exec", fmt.Sprintf("%s: %d", dbName, version))
|
|
if _, err = dbase.db.Exec(string(sql)); err != nil {
|
|
return
|
|
}
|
|
_, err = dbase.db.Exec(`
|
|
INSERT INTO _db.schema(version)
|
|
VALUES($1)
|
|
`, version)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}// }}}
|
|
|
|
// vim: foldmethod=marker
|