Skip to content

Commit b21d989

Browse files
committed
Initial implementation (extracted from dbumper)
1 parent 8a514fd commit b21d989

23 files changed

+779
-1
lines changed

.github/workflows/build.yml

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
name: build
2+
3+
on:
4+
push:
5+
branches: [ main ]
6+
pull_request:
7+
branches: [ main ]
8+
workflow_dispatch:
9+
inputs:
10+
tag:
11+
description: 'Tag to create'
12+
required: true
13+
default: 'v0.0.0'
14+
15+
jobs:
16+
17+
build:
18+
name: Build & Test
19+
runs-on: ubuntu-latest
20+
steps:
21+
- name: Set up Go 1.x
22+
uses: actions/setup-go@v2
23+
with:
24+
go-version: ^1.x
25+
26+
- name: Check out code
27+
uses: actions/checkout@v2
28+
29+
- name: Test
30+
run: go test -v -coverprofile=coverage.txt ./...
31+
32+
- name: Upload Coverage
33+
uses: codecov/codecov-action@v1
34+
continue-on-error: true
35+
with:
36+
token: ${{secrets.CODECOV_TOKEN}}
37+
file: ./coverage.txt
38+
fail_ci_if_error: false
39+
40+
release:
41+
if: github.event_name == 'workflow_dispatch'
42+
43+
name: Release
44+
runs-on: ubuntu-latest
45+
steps:
46+
- uses: actions/checkout@v2
47+
48+
- name: Checkout with tags
49+
run: git fetch --prune --unshallow --tags
50+
51+
- name: Create release
52+
run: |
53+
git log --format="%C(auto) %H %s" `git tag --sort=-committerdate | head -1`...HEAD > changelog.txt
54+
echo ${{ secrets.GITHUB_TOKEN }} | gh auth login --with-token
55+
gh release create ${{ github.event.inputs.tag }} -t ${{ github.event.inputs.tag }} -F changelog.txt

README.md

+46-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,47 @@
11
# dbump
2-
Go database schema migrator library (See dbumper tool)
2+
3+
# dbumper
4+
5+
[![build-img]][build-url]
6+
[![pkg-img]][pkg-url]
7+
[![reportcard-img]][reportcard-url]
8+
[![coverage-img]][coverage-url]
9+
10+
Go database schema migrator library (See [dbumper](https://github.com/cristalhq/dbumper) tool).
11+
12+
## Rationale
13+
14+
TODO
15+
16+
## Features
17+
18+
* Simple.
19+
20+
## Install
21+
22+
Go version 1.16+
23+
24+
```
25+
go get github.com/cristalhq/dbumper
26+
```
27+
28+
## Example
29+
30+
TODO
31+
32+
## Documentation
33+
34+
See [these docs][pkg-url].
35+
36+
## License
37+
38+
[MIT License](LICENSE).
39+
40+
[build-img]: https://github.com/cristalhq/dbumper/workflows/build/badge.svg
41+
[build-url]: https://github.com/cristalhq/dbumper/actions
42+
[pkg-img]: https://pkg.go.dev/badge/cristalhq/dbumper
43+
[pkg-url]: https://pkg.go.dev/github.com/cristalhq/dbumper
44+
[reportcard-img]: https://goreportcard.com/badge/cristalhq/dbumper
45+
[reportcard-url]: https://goreportcard.com/report/cristalhq/dbumper
46+
[coverage-img]: https://codecov.io/gh/cristalhq/dbumper/branch/main/graph/badge.svg
47+
[coverage-url]: https://codecov.io/gh/cristalhq/dbumper

db_clickhouse.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package dbump
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
)
7+
8+
// MigratorClickHouse to migrate ClickHouse.
9+
type MigratorClickHouse struct {
10+
db *sql.DB
11+
versionTable string
12+
}
13+
14+
// NewMigratorClickHouse instantiates new MigratorClickHouse.
15+
func NewMigratorClickHouse(db *sql.DB) *MigratorClickHouse {
16+
return &MigratorClickHouse{
17+
db: db,
18+
versionTable: "_schema_version",
19+
}
20+
}
21+
22+
// Lock is a method for Migrator interface.
23+
func (ch *MigratorClickHouse) Lock(ctx context.Context) error {
24+
// TODO: currently no-op
25+
return nil
26+
}
27+
28+
// Unlock is a method for Migrator interface.
29+
func (ch *MigratorClickHouse) Unlock(ctx context.Context) error {
30+
// TODO: currently no-op
31+
return nil
32+
}
33+
34+
// Version is a method for Migrator interface.
35+
func (ch *MigratorClickHouse) Version(ctx context.Context) (version int, err error) {
36+
row := ch.db.QueryRowContext(ctx, "SELECT version FROM "+ch.versionTable)
37+
err = row.Scan(&version)
38+
return version, err
39+
}
40+
41+
// SetVersion is a method for Migrator interface.
42+
func (ch *MigratorClickHouse) SetVersion(ctx context.Context, version int) error {
43+
_, err := ch.db.ExecContext(ctx, "UPDATE "+ch.versionTable+" SET version = $1", version)
44+
return err
45+
}
46+
47+
// Exec is a method for Migrator interface.
48+
func (ch *MigratorClickHouse) Exec(ctx context.Context, query string, args ...interface{}) error {
49+
_, err := ch.db.ExecContext(ctx, query)
50+
return err
51+
}

db_mysql.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package dbump
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
)
7+
8+
// MigratorMySQL to migrate MySQL.
9+
type MigratorMySQL struct {
10+
db *sql.DB
11+
versionTable string
12+
}
13+
14+
// NewMigratorMySQL instantiates new MigratorMySQL.
15+
func NewMigratorMySQL(db *sql.DB) *MigratorMySQL {
16+
return &MigratorMySQL{
17+
db: db,
18+
versionTable: "_schema_version",
19+
}
20+
}
21+
22+
// Lock is a method for Migrator interface.
23+
func (my *MigratorMySQL) Lock(ctx context.Context) error {
24+
_, err := my.db.ExecContext(ctx, `SELECT GET_LOCK(?, 10)`, lockNum)
25+
return err
26+
}
27+
28+
// Unlock is a method for Migrator interface.
29+
func (my *MigratorMySQL) Unlock(ctx context.Context) error {
30+
_, err := my.db.ExecContext(ctx, "SELECT RELEASE_LOCK(?)", lockNum)
31+
return err
32+
}
33+
34+
// Version is a method for Migrator interface.
35+
func (my *MigratorMySQL) Version(ctx context.Context) (version int, err error) {
36+
row := my.db.QueryRowContext(ctx, "SELECT version FROM "+my.versionTable)
37+
err = row.Scan(&version)
38+
return version, err
39+
}
40+
41+
// SetVersion is a method for Migrator interface.
42+
func (my *MigratorMySQL) SetVersion(ctx context.Context, version int) error {
43+
_, err := my.db.ExecContext(ctx, "UPDATE "+my.versionTable+" SET version = $1", version)
44+
return err
45+
}
46+
47+
// Exec is a method for Migrator interface.
48+
func (my *MigratorMySQL) Exec(ctx context.Context, query string, args ...interface{}) error {
49+
_, err := my.db.ExecContext(ctx, query)
50+
return err
51+
}

db_postgres.go

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package dbump
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
)
7+
8+
// MigratorPostgres to migrate Postgres.
9+
type MigratorPostgres struct {
10+
db *sql.DB
11+
versionTable string
12+
}
13+
14+
// NewMigratorPostgres instantiates new MigratorPostgres.
15+
func NewMigratorPostgres(db *sql.DB) *MigratorPostgres {
16+
return &MigratorPostgres{
17+
db: db,
18+
versionTable: "_schema_version",
19+
}
20+
}
21+
22+
// Lock is a method for Migrator interface.
23+
func (pg *MigratorPostgres) Lock(ctx context.Context) error {
24+
_, err := pg.db.ExecContext(ctx, "SELECT pg_advisory_lock($1)", lockNum)
25+
return err
26+
}
27+
28+
// Unlock is a method for Migrator interface.
29+
func (pg *MigratorPostgres) Unlock(ctx context.Context) error {
30+
_, err := pg.db.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", lockNum)
31+
return err
32+
}
33+
34+
// Version is a method for Migrator interface.
35+
func (pg *MigratorPostgres) Version(ctx context.Context) (version int, err error) {
36+
row := pg.db.QueryRowContext(ctx, "SELECT version FROM "+pg.versionTable)
37+
err = row.Scan(&version)
38+
return version, err
39+
}
40+
41+
// SetVersion is a method for Migrator interface.
42+
func (pg *MigratorPostgres) SetVersion(ctx context.Context, version int) error {
43+
_, err := pg.db.ExecContext(ctx, "UPDATE "+pg.versionTable+" SET version = $1", version)
44+
return err
45+
}
46+
47+
// Exec is a method for Migrator interface.
48+
func (pg *MigratorPostgres) Exec(ctx context.Context, query string, args ...interface{}) error {
49+
_, err := pg.db.ExecContext(ctx, query)
50+
return err
51+
}

dbump.go

+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package dbump
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sort"
7+
)
8+
9+
// MigrationDelimiter separates apply and rollback queries inside a migration step/file.
10+
const MigrationDelimiter = `--- apply above / rollback below ---`
11+
12+
// to prevent multiple migrations running at the same time
13+
const lockNum int64 = 777_777_777
14+
15+
// Migrator represents DB over which we will run migration queries.
16+
type Migrator interface {
17+
Lock(ctx context.Context) error
18+
Unlock(ctx context.Context) error
19+
20+
Version(ctx context.Context) (version int, err error)
21+
SetVersion(ctx context.Context, version int) error
22+
23+
Exec(ctx context.Context, query string, args ...interface{}) error
24+
}
25+
26+
// Loader returns migrations to be applied on a DB.
27+
type Loader interface {
28+
Load() ([]*Migration, error)
29+
}
30+
31+
// Migration represents migration step that will be runned on DB.
32+
type Migration struct {
33+
ID int // ID of the migration, unique, positive, starts from 1.
34+
Name string // Name of the migration
35+
Apply string // Apply query
36+
Rollback string // Rollback query
37+
}
38+
39+
// Run the Migrator with migration queries provided by the Loader.
40+
func Run(ctx context.Context, m Migrator, l Loader) error {
41+
ms, err := loadMigrations(l.Load())
42+
if err != nil {
43+
return err
44+
}
45+
return runMigration(ctx, m, ms)
46+
}
47+
48+
func loadMigrations(ms []*Migration, err error) ([]*Migration, error) {
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
sort.SliceStable(ms, func(i, j int) bool {
54+
return ms[i].ID < ms[j].ID
55+
})
56+
57+
for i, m := range ms {
58+
switch want := i + 1; {
59+
case m.ID < want:
60+
return nil, fmt.Errorf("duplicate migration number: %d (%s)", m.ID, m.Name)
61+
case m.ID > want:
62+
return nil, fmt.Errorf("missing migration number: %d (have %d)", want, m.ID)
63+
default:
64+
// pass
65+
}
66+
}
67+
return ms, nil
68+
}
69+
70+
func runMigration(ctx context.Context, m Migrator, ms []*Migration) error {
71+
if err := m.Lock(ctx); err != nil {
72+
return err
73+
}
74+
75+
var err error
76+
defer func() {
77+
if errUnlock := m.Unlock(ctx); err == nil && errUnlock != nil {
78+
err = errUnlock
79+
}
80+
}()
81+
82+
err = runLockedMigration(ctx, m, ms)
83+
return err
84+
}
85+
86+
func runLockedMigration(ctx context.Context, m Migrator, ms []*Migration) error {
87+
currentVersion, err := m.Version(ctx)
88+
if err != nil {
89+
return err
90+
}
91+
92+
// TODO: configure
93+
targetVersion := len(ms)
94+
switch {
95+
case targetVersion < 0 || len(ms) < targetVersion:
96+
fallthrough
97+
case currentVersion < 0 || len(ms) < currentVersion:
98+
return fmt.Errorf("target version %d is outside of range 0..%d ", targetVersion, len(ms))
99+
}
100+
101+
direction := 1
102+
if currentVersion > targetVersion {
103+
direction = -1
104+
}
105+
106+
for currentVersion != targetVersion {
107+
current := ms[currentVersion]
108+
sequence := current.ID
109+
query := current.Apply
110+
111+
if direction == -1 {
112+
current = ms[currentVersion-1]
113+
sequence = current.ID - 1
114+
query = current.Rollback
115+
}
116+
117+
if err := m.Exec(ctx, query); err != nil {
118+
return err
119+
}
120+
121+
if err := m.SetVersion(ctx, sequence); err != nil {
122+
return err
123+
}
124+
currentVersion += direction
125+
}
126+
return nil
127+
}

0 commit comments

Comments
 (0)