
Commit 53b8ebe

Author: Pablo Seibelt (committed)

Refactoring, support RJDBC, and add rs_create_table function

1 parent 33054be · commit 53b8ebe

12 files changed: +332 -61 lines

DESCRIPTION (+6 -4)
@@ -1,18 +1,20 @@
 Package: redshiftTools
 Type: Package
 Title: Redshift Tools
-Version: 0.2.900
+Version: 0.3.900
 Authors@R: person("Pablo", "Seibelt", email = "[email protected]",
     role = c("aut", "cre"))
 Mantainers@R: person("Pablo", "Seibelt", email = "[email protected]",
     role = c("aut", "cre"))
 Depends:
-    R (>= 3.1.2)
+    R (>= 3.3.0)
 Imports:
     DBI,
-    RPostgres,
     aws.s3
-Description: Tools to upload data to an Amazon Redshift Database with good performance.
+Suggests:
+    RJDBC,
+    RPostgres
+Description: Tools for uploading data into Amazon Redshift from R.
 License: MIT + file LICENSE
 LazyData: TRUE
 RoxygenNote: 6.0.1
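
With RPostgres moving from Imports to Suggests (and RJDBC added as a suggested package), neither driver is installed automatically any more. A minimal sketch of guarding for a driver at runtime; the choice of RPostgres here is just an example, not a requirement of the package:

``` r
# Neither driver is a hard dependency after this commit; install the one you
# plan to use before calling the rs_* functions.
if (!requireNamespace("RPostgres", quietly = TRUE)) {
  install.packages("RPostgres")  # or install.packages("RJDBC") for the JDBC route
}
library(RPostgres)
```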

NAMESPACE (+1 -0)
@@ -1,6 +1,7 @@
 # Generated by roxygen2: do not edit by hand
 
 export(rs_create_statement)
+export(rs_create_table)
 export(rs_replace_table)
 export(rs_upsert_table)
 importFrom("aws.s3","bucket_exists")

R/create.R (+62 -0)
@@ -0,0 +1,62 @@
+#' Create a table from scratch, guessing the table schema
+#'
+#'
+#' @param data a data frame
+#' @param dbcon an RPostgres connection to the redshift server
+#' @param table_name the name of the table to create
+#' @param split_files optional parameter to specify the number of files to split into. If not specified, the number of slices in Redshift is used to determine an optimal number.
+#' @param bucket the name of the temporary bucket to load the data. Will look for AWS_BUCKET_NAME on environment if not specified.
+#' @param region the region of the bucket. Will look for AWS_DEFAULT_REGION on environment if not specified.
+#' @param access_key the access key with permissions for the bucket. Will look for AWS_ACCESS_KEY_ID on environment if not specified.
+#' @param secret_key the secret key with permissions for the bucket. Will look for AWS_SECRET_ACCESS_KEY on environment if not specified.
+#' @param iam_role_arn an IAM role ARN with permissions for the bucket. Will look for AWS_IAM_ROLE_ARN on environment if not specified. If set, access_key and secret_key are ignored.
+#' @param wlm_slots number of WLM slots to use for this bulk load http://docs.aws.amazon.com/redshift/latest/dg/tutorial-configuring-workload-management.html
+#' @param sortkeys Column or columns to sort the table by
+#' @param sortkey_style Sortkey style, can be compound or interleaved http://docs.aws.amazon.com/redshift/latest/dg/t_Sorting_data-compare-sort-styles.html
+#' @param distkey Distkey column, can only be one; if chosen, the table is distributed among clusters according to a hash of this column's value.
+#' @param distkey_style Distkey style, can be even or all; for key distribution use the distkey parameter. http://docs.aws.amazon.com/redshift/latest/dg/t_Distributing_data.html
+#' @param compression Add encoding for columns whose compression algorithm is easy to guess; for the rest you should upload to Redshift and run ANALYZE COMPRESSION
+#'
+#' @examples
+#' library(DBI)
+#'
+#' a=data.frame(a=seq(1,10000), b=seq(10000,1))
+#'
+#'\dontrun{
+#' con <- dbConnect(RPostgres::Postgres(), dbname="dbname",
+#' host='my-redshift-url.amazon.com', port='5439',
+#' user='myuser', password='mypassword',sslmode='require')
+#'
+#' rs_create_table(data=a, dbcon=con, table_name='testTable',
+#' bucket="my-bucket", split_files=4)
+#'
+#' }
+#' @export
+rs_create_table = function(
+    data,
+    dbcon,
+    table_name=deparse(substitute(data)),
+    split_files,
+    bucket=Sys.getenv('AWS_BUCKET_NAME'),
+    region=Sys.getenv('AWS_DEFAULT_REGION'),
+    access_key=Sys.getenv('AWS_ACCESS_KEY_ID'),
+    secret_key=Sys.getenv('AWS_SECRET_ACCESS_KEY'),
+    iam_role_arn=Sys.getenv('AWS_IAM_ROLE_ARN'),
+    wlm_slots=1,
+    sortkeys,
+    sortkey_style='compound',
+    distkey,
+    distkey_style='even',
+    compression=T
+    )
+{
+
+  tableSchema = rs_create_statement(data, table_name = table_name, sortkeys=sortkeys,
+                  sortkey_style = sortkey_style, distkey=distkey, distkey_style = distkey_style,
+                  compression = compression)
+
+  queryStmt(dbcon, tableSchema)
+
+  return(rs_replace_table(data, dbcon, table_name, split_files, bucket, region, access_key, secret_key, iam_role_arn, wlm_slots))
+
+}
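
For a quick read of the new function, a hedged usage sketch follows; the connection details, the `flights` data frame and the column chosen as sort/distribution key are illustrative only, while the parameters themselves are the ones documented above:

``` r
library(DBI)
library(redshiftTools)

# Illustrative data frame; any data frame works, the schema is guessed from it.
flights = data.frame(carrier = c("AA", "DL", "UA"), delay = c(12, 3, 25))

con <- dbConnect(RPostgres::Postgres(), dbname = "dbname",
                 host = "my-redshift-url.amazon.com", port = "5439",
                 user = "myuser", password = "mypassword", sslmode = "require")

# Create the table (schema guessed by rs_create_statement), then bulk-load it via S3.
rs_create_table(flights, dbcon = con, table_name = "flights",
                bucket = "my-bucket", split_files = 4,
                sortkeys = c("carrier"), sortkey_style = "compound",
                distkey = "carrier", compression = TRUE)
```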

R/internal.R (+25 -1)
@@ -45,5 +45,29 @@ queryDo = function(dbcon, query){
 
 #' @importFrom DBI dbExecute
 queryStmt = function(dbcon, query){
-  dbExecute(dbcon, query)
+  if(inherits(dbcon, 'JDBCConnection')){
+    RJDBC::dbSendUpdate(dbcon, query)
+  }else{
+    dbExecute(dbcon, query)
+  }
+}
+
+s3ToRedshift = function(dbcon, table_name, bucket, prefix, region, access_key, secret_key, iam_role_arn){
+  stageTable=paste0(sample(letters,16),collapse = "")
+  # Create temporary table for staging data
+  queryStmt(dbcon, sprintf("create temp table %s (like %s)", stageTable, table_name))
+
+  print("Copying data from S3 into Redshift")
+  copyStr = "copy %s from 's3://%s/%s.' region '%s' csv gzip ignoreheader 1 emptyasnull COMPUPDATE FALSE %s"
+
+  # Use IAM Role if available
+  if (nchar(iam_role_arn) > 0) {
+    credsStr = sprintf("iam_role '%s'", iam_role_arn)
+  } else {
+    credsStr = sprintf("credentials 'aws_access_key_id=%s;aws_secret_access_key=%s'", access_key, secret_key)
+  }
+  statement = sprintf(copyStr, stageTable, bucket, prefix, region, credsStr)
+  queryStmt(dbcon,statement)
+
+  return(stageTable)
 }
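
To make the refactor easier to follow, this is roughly the statement the new s3ToRedshift helper ends up issuing. The staging table name, bucket, prefix and role ARN below are placeholders, not values from this commit, and the output is a single line wrapped here for readability:

``` r
# Rebuild the COPY statement the same way s3ToRedshift does (IAM-role branch).
copyStr  = "copy %s from 's3://%s/%s.' region '%s' csv gzip ignoreheader 1 emptyasnull COMPUPDATE FALSE %s"
credsStr = sprintf("iam_role '%s'", "arn:aws:iam::123456789012:role/redshift-copy")
cat(sprintf(copyStr, "abcdefghijklmnop", "my-bucket", "my-prefix", "us-east-1", credsStr))
#> copy abcdefghijklmnop from 's3://my-bucket/my-prefix.' region 'us-east-1' csv gzip
#>   ignoreheader 1 emptyasnull COMPUPDATE FALSE iam_role 'arn:aws:iam::123456789012:role/redshift-copy'
```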

R/redshift-replace.R → R/replace.R (+9 -17)
@@ -5,7 +5,7 @@
 #'
 #' @param data a data frame
 #' @param dbcon an RPostgres connection to the redshift server
-#' @param tableName the name of the table to replace
+#' @param table_name the name of the table to replace
 #' @param split_files optional parameter to specify amount of files to split into. If not specified will look at amount of slices in Redshift to determine an optimal amount.
 #' @param bucket the name of the temporary bucket to load the data. Will look for AWS_BUCKET_NAME on environment if not specified.
 #' @param region the region of the bucket. Will look for AWS_DEFAULT_REGION on environment if not specified.
@@ -23,15 +23,15 @@
 #' host='my-redshift-url.amazon.com', port='5439',
 #' user='myuser', password='mypassword',sslmode='require')
 #'
-#' rs_replace_table(data=a, dbcon=con, tableName='testTable',
+#' rs_replace_table(data=a, dbcon=con, table_name='testTable',
 #' bucket="my-bucket", split_files=4)
 #'
 #' }
 #' @export
 rs_replace_table = function(
     data,
     dbcon,
-    tableName,
+    table_name,
     split_files,
     bucket=Sys.getenv('AWS_BUCKET_NAME'),
     region=Sys.getenv('AWS_DEFAULT_REGION'),
@@ -62,26 +62,18 @@ rs_replace_table = function(
   }
 
   result = tryCatch({
-    stageTable=paste0(sample(letters,16),collapse = "")
+    stageTable=s3ToRedshift(dbcon, table_name, bucket, prefix, region, access_key, secret_key, iam_role_arn)
 
-    queryStmt(dbcon, sprintf("create temp table %s (like %s)", stageTable, tableName))
-
-    print("Copying data from S3 into Redshift")
-    copyStr = "copy %s from 's3://%s/%s.' region '%s' csv gzip ignoreheader 1 emptyasnull COMPUPDATE FALSE"
-    if (nchar(iam_role_arn) > 0) {
-      copyStr = paste(copyStr, sprintf("iam_role '%s'", iam_role_arn), sep=" ")
-    } else {
-      copyStr = paste(copyStr, sprintf("credentials 'aws_access_key_id=%s;aws_secret_access_key=%s'", access_key, secret_key), sep=" ")
+    # Use a single transaction if using RJDBC
+    if(inherits(dbcon, 'RJDBC')){
+      queryStmt(dbcon, 'begin')
     }
-    statement = sprintf(copyStr, stageTable, bucket, prefix, region)
-    queryStmt(dbcon, statement)
-
 
     print("Deleting target table for replacement")
-    queryStmt(dbcon, sprintf("delete from %s", tableName))
+    queryStmt(dbcon, sprintf("delete from %s", table_name))
 
     print("Insert new rows")
-    queryStmt(dbcon, sprintf('insert into %s select * from %s', tableName, stageTable))
+    queryStmt(dbcon, sprintf('insert into %s select * from %s', table_name, stageTable))
 
     print("Drop staging table")
     queryStmt(dbcon, sprintf("drop table %s", stageTable))

R/redshift-upsert.R → R/upsert.R (+12 -21)
@@ -6,7 +6,7 @@
 #'
 #' @param data a data frame
 #' @param dbcon an RPostgres connection to the redshift server
-#' @param tableName the name of the table to replace
+#' @param table_name the name of the table to replace
 #' @param split_files optional parameter to specify amount of files to split into. If not specified will look at amount of slices in Redshift to determine an optimal amount.
 #' @param keys this optional vector contains the variables by which to upsert. If not defined, the upsert becomes an append.
 #' @param bucket the name of the temporary bucket to load the data. Will look for AWS_BUCKET_NAME on environment if not specified.
@@ -28,15 +28,15 @@
 #' host='my-redshift-url.amazon.com', port='5439',
 #' user='myuser', password='mypassword',sslmode='require')
 #'
-#' rs_upsert_table(data=nx, dbcon=con, tableName='testTable',
+#' rs_upsert_table(data=nx, dbcon=con, table_name='testTable',
 #' bucket="my-bucket", split_files=4, keys=c('a'))
 #'
 #'}
 #' @export
 rs_upsert_table = function(
     data,
     dbcon,
-    tableName,
+    table_name,
     keys,
     split_files,
     bucket=Sys.getenv('AWS_BUCKET_NAME'),
@@ -68,36 +68,27 @@ rs_upsert_table = function(
   }
 
   result = tryCatch({
-    stageTable=paste0(sample(letters,16),collapse = "")
+    stageTable=s3ToRedshift(dbcon, table_name, bucket, prefix, region, access_key, secret_key, iam_role_arn)
 
-    queryStmt(dbcon, sprintf("create temp table %s (like %s)", stageTable, tableName))
-
-    print("Copying data from S3 into Redshift")
-    copyStr = "copy %s from 's3://%s/%s.' region '%s' csv gzip ignoreheader 1 emptyasnull COMPUPDATE FALSE"
-
-    # Use iam role if available
-    if ((nchar(iam_role_arn) > 0)) {
-      copyStr = paste(copyStr, sprintf("iam_role '%s'", iam_role_arn), sep=" ")
-    } else {
-      copyStr = paste(copyStr, sprintf("credentials 'aws_access_key_id=%s;aws_secret_access_key=%s'", access_key, secret_key), sep=" ")
+    # Use a single transaction if using RJDBC
+    if(inherits(dbcon, 'RJDBC')){
+      queryStmt(dbcon, 'begin')
     }
-    statement = sprintf(copyStr, stageTable, bucket, prefix, region)
-    queryStmt(dbcon,statement)
-    if(!missing(keys)){
-      print("Deleting rows with same keys")
 
+    if(!missing(keys)){
       # where stage.key = table.key and...
-      keysCond = paste(stageTable,".",keys, "=", tableName,".",keys, sep="")
+      keysCond = paste(stageTable,".",keys, "=", table_name,".",keys, sep="")
       keysWhere = sub(" and $", "", paste0(keysCond, collapse="", sep=" and "))
 
       queryStmt(dbcon, sprintf('delete from %s using %s where %s',
-                tableName,
+                table_name,
                 stageTable,
                 keysWhere
               ))
     }
+
     print("Insert new rows")
-    queryStmt(dbcon, sprintf('insert into %s select * from %s', tableName, stageTable))
+    queryStmt(dbcon, sprintf('insert into %s select * from %s', table_name, stageTable))
 
     print("Drop staging table")
     queryStmt(dbcon, sprintf("drop table %s", stageTable))
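
The key-matching clause above can be hard to read inside the sprintf calls, so here is a small sketch that reruns the same keysCond/keysWhere construction with made-up table and key names (again a single-line output, wrapped for readability):

``` r
# Same construction as in rs_upsert_table, with placeholder names.
stageTable = "abcdefghijklmnop"
table_name = "mytable"
keys       = c("a", "b")

keysCond  = paste(stageTable, ".", keys, "=", table_name, ".", keys, sep = "")
keysWhere = sub(" and $", "", paste0(keysCond, collapse = "", sep = " and "))
cat(sprintf("delete from %s using %s where %s", table_name, stageTable, keysWhere))
#> delete from mytable using abcdefghijklmnop where
#>   abcdefghijklmnop.a=mytable.a and abcdefghijklmnop.b=mytable.b
```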

README.Rmd (+73 -4)
@@ -23,10 +23,57 @@ To install this package, you'll need to execute these commands:
 install.packages(c('devtools', 'httr', 'aws.s3'))
 devtools::install_github("RcppCore/Rcpp")
 devtools::install_github("r-dbi/DBI")
-devtools::install_github("r-dbi/RPostgres")
 devtools::install_github("sicarul/redshiftTools")
 ```
 
+
+## Drivers
+
+This library supports two officially tested ways of connecting to Amazon Redshift (others may work, but are untested):
+
+### RPostgres
+This Postgres library is great, and it works even with Amazon Redshift servers with SSL enabled. Its only weak point with Redshift is that it doesn't support transactions.
+
+To use it, please configure it like this:
+
+``` r
+devtools::install_github("r-dbi/RPostgres")
+library(RPostgres)
+
+con <- dbConnect(RPostgres::Postgres(), dbname="dbname",
+host='my-redshift-url.amazon.com', port='5439',
+user='myuser', password='mypassword',sslmode='require')
+test=dbGetQuery(con, 'select 1')
+```
+
+### RJDBC
+If you download the official Redshift driver .jar, you can use it with this R library. It is not ideal in that you can't use it with dplyr, for example, since it doesn't implement all the standard DBI interfaces; however, it has the great advantage of supporting transactions when uploading data, which keeps your data consistent while new data is loaded.
+
+To use it, please configure it like this:
+
+``` r
+install.packages('RJDBC')
+library(RJDBC)
+
+# Save the driver into a directory
+dir.create('~/.redshiftTools')
+download.file('http://s3.amazonaws.com/redshift-downloads/drivers/RedshiftJDBC41-1.1.9.1009.jar','~/.redshiftTools/redshift-driver.jar')
+
+# Use Redshift driver
+driver <- JDBC("com.amazon.redshift.jdbc41.Driver", "~/.redshiftTools/redshift-driver.jar", identifier.quote="`")
+
+# Create connection
+dbname="dbname"
+host='my-redshift-url.amazon.com'
+port='5439'
+user='myuser'
+password='mypassword'
+ssl='true'
+url <- sprintf("jdbc:redshift://%s:%s/%s?tcpKeepAlive=true&ssl=%s&sslfactory=com.amazon.redshift.ssl.NonValidatingFactory", host, port, dbname, ssl)
+conn <- dbConnect(driver, url, user, password)
+
+```
+
 ## Usage
 
 ### Creating tables
@@ -43,7 +90,7 @@ d=rep(as.POSIXct('2017-01-01 20:01:32'), n),
 e=rep(as.POSIXlt('2017-01-01 20:01:32'), n),
 f=rep(paste0(rep('a', 4000), collapse=''), n) )
 
-cat(rs_create_statement(testdf, tableName='dm_great_table'))
+cat(rs_create_statement(testdf, table_name='dm_great_table'))
 
 ```
 
@@ -82,7 +129,29 @@ For example, suppose we have a table to load with 2 integer columns, we could us
 host='my-redshift-url.amazon.com', port='5439',
 user='myuser', password='mypassword',sslmode='require')
 
-b=rs_replace_table(a, dbcon=con, tableName='mytable', bucket="mybucket", split_files=4)
-c=rs_upsert_table(nx, dbcon=con, tableName = 'mytable', split_files=4, bucket="mybucket", keys=c('a'))
+b=rs_replace_table(a, dbcon=con, table_name='mytable', bucket="mybucket", split_files=4)
+c=rs_upsert_table(nx, dbcon=con, table_name = 'mytable', split_files=4, bucket="mybucket", keys=c('a'))
+
+```
+
+
+### Creating tables with data
+
+`rs_create_table` combines `rs_create_statement` and `rs_replace_table`: you can create a table from scratch in R and upload the contents of the data frame without needing to write any SQL code at all.
+
+
+``` r
+library("aws.s3")
+library(RPostgres)
+library(redshiftTools)
+
+a=data.frame(a=seq(1,10000), b=seq(10000,1))
+
+con <- dbConnect(RPostgres::Postgres(), dbname="dbname",
+host='my-redshift-url.amazon.com', port='5439',
+user='myuser', password='mypassword',sslmode='require')
+
+b=rs_create_table(a, dbcon=con, table_name='mytable', bucket="mybucket", split_files=4)
+
 
 ```
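
Tying the README additions together, a sketch of an upsert over an RJDBC connection; the driver path, URL, credentials, table and keys are placeholders based on the sections above, and the transaction comment reflects the commit's stated intent for RJDBC connections:

``` r
library(RJDBC)
library(redshiftTools)

# Connection as in the RJDBC section above; paths and credentials are placeholders.
driver <- JDBC("com.amazon.redshift.jdbc41.Driver",
               "~/.redshiftTools/redshift-driver.jar", identifier.quote = "`")
url  <- "jdbc:redshift://my-redshift-url.amazon.com:5439/dbname?tcpKeepAlive=true&ssl=true&sslfactory=com.amazon.redshift.ssl.NonValidatingFactory"
conn <- dbConnect(driver, url, "myuser", "mypassword")

nx = data.frame(a = seq(1, 100), b = seq(100, 1))

# Upsert by key 'a'; this commit intends RJDBC uploads to run inside a transaction.
res = rs_upsert_table(nx, dbcon = conn, table_name = "mytable",
                      bucket = "mybucket", split_files = 4, keys = c("a"))
```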
