Skip to content

Add drivers into the sqlsource package #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 85 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package client

import (
"os"

"github.com/Lilibuth12/sqlsource/domain"
"github.com/Lilibuth12/sqlsource/driver"
"github.com/Sirupsen/logrus"
"github.com/tj/go-sync/semaphore"
)

// InitSchema connects to the database through app.Driver, asks the driver to
// describe the schema and writes that description to fileName.
//
// The function returns nothing: every failure is logged through logrus and
// aborts the remaining steps.
func InitSchema(app *driver.Base, config *domain.Config, fileName string) {
	logrus.Info("will output schema to ", fileName)
	schemaFile, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		logrus.Error(err)
		return
	}
	defer schemaFile.Close()

	// Initialize DB connection.
	if err := app.Driver.Init(config); err != nil {
		logrus.Error(err)
		return
	}

	description, err := app.Driver.Describe()
	if err != nil {
		logrus.Error(err)
		return
	}

	// Drop any previous content before writing. Without this, saving a
	// description that is shorter than the existing file leaves the tail of
	// the old content behind. Truncating only now (rather than at open time)
	// keeps the previous schema intact when the database steps above fail.
	if err := schemaFile.Truncate(0); err != nil {
		logrus.Error(err)
		return
	}
	if err := description.Save(schemaFile); err != nil {
		logrus.Error(err)
		return
	}

	// Only report success once the data has actually been flushed.
	if err := schemaFile.Sync(); err != nil {
		logrus.Error(err)
		return
	}
	logrus.Infof("Saved to `%s`", schemaFile.Name())
}

// ParseSchema opens fileName read-only and decodes its content into a
// domain.Description via domain.NewDescriptionFromReader. The file is closed
// before returning. An error from opening the file is returned as-is.
func ParseSchema(fileName string) (*domain.Description, error) {
	f, openErr := os.Open(fileName)
	if openErr != nil {
		return nil, openErr
	}
	defer f.Close()

	return domain.NewDescriptionFromReader(f)
}

// Sync initializes the driver connection and scans every table yielded by
// description.Iter(), running at most `concurrency` scans at the same time.
// Each table is handed to app.ScanTable together with setWrapper.
//
// A concurrency below 1 is treated as 1. Only a failure to initialize the
// connection is returned; errors from individual table scans are logged and
// do not affect the return value.
func Sync(app *driver.Base, config *domain.Config, description *domain.Description, concurrency int, setWrapper domain.ObjectPublisher) error {
	// Initialize DB connection.
	if err := app.Driver.Init(config); err != nil {
		logrus.Error(err)
		return err
	}

	// A zero-capacity semaphore would block forever on the first Acquire and
	// a negative capacity panics in make, so fall back to sequential scans.
	if concurrency < 1 {
		logrus.Warnf("invalid concurrency %d, falling back to 1", concurrency)
		concurrency = 1
	}

	// Launch goroutines to scan the rows of each table.
	sem := make(semaphore.Semaphore, concurrency)

	for table := range description.Iter() {
		sem.Acquire()
		// The table is passed as an argument so each goroutine works on its
		// own value rather than on the shared loop variable.
		go func(table *domain.Table) {
			defer sem.Release()
			logrus.WithFields(logrus.Fields{"table": table.TableName, "schema": table.SchemaName}).Info("Scan started")
			if err := app.ScanTable(table, setWrapper); err != nil {
				logrus.Error(err)
			}
			logrus.WithFields(logrus.Fields{"table": table.TableName, "schema": table.SchemaName}).Info("Scan finished")
		}(table)
	}

	// Block until every outstanding scan has released its slot.
	sem.Wait()

	// Log status
	for table := range description.Iter() {
		logrus.WithFields(logrus.Fields{"schema": table.SchemaName, "table": table.TableName, "count": table.State.ScannedRows}).Info("Sync Finished")
	}

	return nil
}
2 changes: 1 addition & 1 deletion driver/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"fmt"
"strings"

"github.com/Lilibuth12/sqlsource/domain"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reference needs to be removed; it should use the sqlsource/domain package in this repo.

log "github.com/Sirupsen/logrus"
"github.com/jmoiron/sqlx"
"github.com/segment-sources/sqlsource/domain"
"github.com/segmentio/go-snakecase"
"github.com/segmentio/objects-go"
)
Expand Down
110 changes: 110 additions & 0 deletions driver/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package driver

import (
"fmt"
"strings"

"github.com/Lilibuth12/sqlsource/domain"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again

"github.com/Sirupsen/logrus"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)

// MySQL is the MySQL driver. Its methods (Init, Scan, Transform, Describe)
// operate on the sqlx handle stored in Connection.
type MySQL struct {
// Connection is assigned by Init after a successful connect and is used by
// Scan and Describe to run queries.
Connection *sqlx.DB
}

// Init connects to MySQL using the settings in c and stores the resulting
// handle in m.Connection.
//
// The DSN is built with mysql.Config over TCP at c.Hostname:c.Port. Each entry
// of c.ExtraOptions is expected in "key=value" form and is forwarded as a DSN
// parameter; entries without an "=" are skipped with a warning. Any error from
// sqlx.Connect is returned and m.Connection is left untouched.
func (m *MySQL) Init(c *domain.Config) error {
	config := mysql.Config{
		User:   c.Username,
		Passwd: c.Password,
		DBName: c.Database,
		Net:    "tcp",
		Addr:   c.Hostname + ":" + c.Port,
		Params: map[string]string{},
	}

	for _, option := range c.ExtraOptions {
		// Split on the first "=" only, so a value that itself contains "="
		// is kept intact instead of causing the whole option to be dropped.
		kv := strings.SplitN(option, "=", 2)
		if len(kv) != 2 {
			logrus.Warnf("ignoring malformed MySQL option %q (expected key=value)", option)
			continue
		}
		config.Params[kv[0]] = kv[1]
	}

	db, err := sqlx.Connect("mysql", config.FormatDSN())
	if err != nil {
		return err
	}

	m.Connection = db

	return nil
}

// Scan selects the columns listed in t from `t.SchemaName`.`t.TableName` and
// returns the open result set. The caller is responsible for closing the
// returned rows.
func (m *MySQL) Scan(t *domain.Table) (*sqlx.Rows, error) {
	query := fmt.Sprintf("SELECT %s FROM `%s`.`%s`", mysqlColumnsToSQL(t), t.SchemaName, t.TableName)
	logrus.Debugf("Executing query: %v", query)

	// Note: We have to get a Statement so that the MySQL driver
	// will use its binary protocol during the scan, and do proper
	// type conversion of incoming results.
	stmt, err := m.Connection.Preparex(query)
	if err != nil {
		return nil, err
	}
	// Close the statement handle so it is not leaked on every scan.
	// database/sql keeps the underlying driver statement alive for as long
	// as the returned rows are open and releases it once they are closed.
	defer stmt.Close()

	return stmt.Queryx()
}

// Transform rewrites row in place, converting every []byte value to a string,
// and returns the same map. Values of any other type are left as they are.
func (m *MySQL) Transform(row map[string]interface{}) map[string]interface{} {
	// The MySQL driver returns text and date columns as []byte instead of string.
	for key := range row {
		if raw, isBytes := row[key].([]byte); isBytes {
			row[key] = string(raw)
		}
	}

	return row
}

// mysqlColumnsToSQL renders the columns of t as a comma-separated list of
// backtick-quoted identifiers, suitable for the column list of a SELECT.
func mysqlColumnsToSQL(t *domain.Table) string {
	quoted := make([]string, 0, len(t.Columns))
	for _, name := range t.Columns {
		quoted = append(quoted, fmt.Sprintf("`%s`", name))
	}

	return strings.Join(quoted, ", ")
}

// Describe reads information_schema.columns for the current database and
// returns a domain.Description holding one column entry per result row,
// flagged as primary key when column_key is 'PRI'.
// Any query, scan or iteration error is returned with a nil description.
func (m *MySQL) Describe() (*domain.Description, error) {
	query := `
SELECT table_schema, table_name, column_name, CASE column_key WHEN 'PRI' THEN true ELSE false END as is_primary_key
FROM information_schema.columns
WHERE table_schema = DATABASE()
`

	description := domain.NewDescription()

	rows, queryErr := m.Connection.Queryx(query)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	for rows.Next() {
		// A fresh record per iteration so no value carries over between rows.
		var record tableDescriptionRow
		if scanErr := rows.StructScan(&record); scanErr != nil {
			return nil, scanErr
		}

		column := &domain.Column{
			Name:         record.ColumnName,
			Schema:       record.SchemaName,
			Table:        record.TableName,
			IsPrimaryKey: record.IsPrimary,
		}
		description.AddColumn(column)
	}

	// Surface errors that terminated the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	return description, nil
}
91 changes: 91 additions & 0 deletions driver/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package driver

import (
"bytes"
"fmt"
"net/url"
"strings"

"github.com/Lilibuth12/sqlsource/domain"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

"github.com/Sirupsen/logrus"
_ "github.com/jackc/pgx/stdlib"
"github.com/jmoiron/sqlx"
)

// Postgres is the PostgreSQL driver. Its methods (Init, Scan, Transform,
// Describe) operate on the sqlx handle stored in Connection.
type Postgres struct {
// Connection is assigned by Init after a successful connect and is used by
// Scan and Describe to run queries.
Connection *sqlx.DB
}

// Init connects to PostgreSQL through the "pgx" database/sql driver using the
// settings in c and stores the resulting handle in p.Connection.
//
// The connection string has the form
// postgres://user:password@host:port/database[?opt1&opt2...], where the
// entries of c.ExtraOptions are joined with "&" to form the query string.
// Any error from sqlx.Connect is returned and p.Connection is left untouched.
func (p *Postgres) Init(c *domain.Config) error {
	var extraOptions bytes.Buffer
	if len(c.ExtraOptions) > 0 {
		extraOptions.WriteRune('?')
		extraOptions.WriteString(strings.Join(c.ExtraOptions, "&"))
	}

	// Percent-encode the credentials: a raw password containing characters
	// such as '/', '?', '#' or '%' would otherwise corrupt the URL.
	credentials := url.UserPassword(c.Username, c.Password).String()

	connectionString := fmt.Sprintf(
		"postgres://%s@%s:%s/%s%s",
		credentials, c.Hostname, c.Port, c.Database, extraOptions.String(),
	)

	db, err := sqlx.Connect("pgx", connectionString)
	if err != nil {
		return err
	}

	p.Connection = db

	return nil
}

// Scan selects the columns produced by t.ColumnToSQL() from the table
// identified by t.SchemaName and t.TableName and returns the open result set.
// The caller is responsible for closing the returned rows.
func (p *Postgres) Scan(t *domain.Table) (*sqlx.Rows, error) {
	query := fmt.Sprintf("SELECT %s FROM %s.%s", t.ColumnToSQL(), pgQuoteIdent(t.SchemaName), pgQuoteIdent(t.TableName))
	logrus.Debugf("Executing query: %v", query)

	return p.Connection.Queryx(query)
}

// pgQuoteIdent wraps name in double quotes using SQL identifier rules, where
// an embedded double quote is written as two double quotes. Go's %q verb is
// not a substitute: it backslash-escapes quotes and backslashes, which yields
// a different (or invalid) identifier for names containing those characters.
func pgQuoteIdent(name string) string {
	return `"` + strings.Replace(name, `"`, `""`, -1) + `"`
}

// Transform returns row unchanged; no per-row conversion is applied for
// Postgres results.
func (p *Postgres) Transform(row map[string]interface{}) map[string]interface{} {
return row
}

// Describe lists the columns of every table that has a primary-key constraint
// and returns them as a domain.Description. The query collects primary-key
// constraints from pg_catalog.pg_constraint, inner-joins them with
// information_schema.columns on schema and table name, and flags a column as
// primary key when its ordinal position appears in the constraint's column
// positions. Any query, scan or iteration error is returned with a nil
// description.
func (p *Postgres) Describe() (*domain.Description, error) {
	query := `
with o_1 as (SELECT
_s.nspname AS table_schema,
_t.relname AS table_name,
c.conkey AS column_positions
FROM pg_catalog.pg_constraint c
LEFT JOIN pg_catalog.pg_class _t ON c.conrelid = _t.oid
LEFT JOIN pg_catalog.pg_class referenced_table ON c.confrelid = referenced_table.oid
LEFT JOIN pg_catalog.pg_namespace _s ON _t.relnamespace = _s.oid
LEFT JOIN pg_catalog.pg_namespace referenced_schema ON referenced_table.relnamespace = referenced_schema.oid
WHERE c.contype = 'p')
select c.table_catalog, c.table_schema, c.table_name, c.column_name, CASE WHEN c.ordinal_position = ANY(o_1.column_positions) THEN true ELSE false END as "is_primary_key"
FROM o_1 INNER JOIN information_schema.columns c
ON o_1.table_schema = c.table_schema
AND o_1.table_name = c.table_name;
`

	description := domain.NewDescription()

	rows, queryErr := p.Connection.Queryx(query)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	for rows.Next() {
		// A fresh record per iteration so no value carries over between rows.
		var record tableDescriptionRow
		if scanErr := rows.StructScan(&record); scanErr != nil {
			return nil, scanErr
		}

		column := &domain.Column{
			Name:         record.ColumnName,
			Schema:       record.SchemaName,
			Table:        record.TableName,
			IsPrimaryKey: record.IsPrimary,
		}
		description.AddColumn(column)
	}

	// Surface errors that terminated the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	return description, nil
}
9 changes: 9 additions & 0 deletions driver/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package driver

// tableDescriptionRow is the StructScan target for one result row of a
// driver's Describe query. Each field is bound to a result column through its
// `db` tag.
type tableDescriptionRow struct {
// Catalog maps to table_catalog. Only the Postgres Describe query selects
// this column; the MySQL query does not, so it stays empty there.
Catalog string `db:"table_catalog"`
// SchemaName maps to table_schema.
SchemaName string `db:"table_schema"`
// TableName maps to table_name.
TableName string `db:"table_name"`
// ColumnName maps to column_name.
ColumnName string `db:"column_name"`
// IsPrimary maps to the computed is_primary_key column.
IsPrimary bool `db:"is_primary_key"`
}
Loading