#' Upsert Redshift table
#'
#' Upload a table to S3 and then load it into Redshift, replacing rows with the same
#' keys and inserting rows with new keys.
#' The table on Redshift must have the same structure and column ordering for this to work correctly.
#'
#' @param data a data frame
#' @param dbcon an RPostgres connection to the Redshift server
#' @param tableName the name of the table to replace
#' @param split_files optional parameter to specify the number of files to split into. If not specified, the number of slices in Redshift is used to determine an optimal number.
#' @param keys this optional vector contains the variables by which to upsert. If not defined, the upsert becomes an append.
#' @param bucket the name of the temporary bucket to load the data. Will look for AWS_BUCKET_NAME in the environment if not specified.
#' @param region the region of the bucket. Will look for AWS_DEFAULT_REGION in the environment if not specified.
#' @param access_key the access key with permissions for the bucket. Will look for AWS_ACCESS_KEY_ID in the environment if not specified.
#' @param secret_key the secret key with permissions for the bucket. Will look for AWS_SECRET_ACCESS_KEY in the environment if not specified.
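#'
#' @details
#' The upsert is staged through a temporary table. Roughly the following
#' sequence of statements is issued (a sketch with illustrative names, not the
#' literal SQL):
#' \preformatted{
#' create temp table stage (like target);
#' copy stage from 's3://bucket/prefix.' region '...' csv gzip;
#' delete from target using stage where stage.key = target.key;
#' insert into target select * from stage;
#' drop table stage;
#' }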
#' @examples
#' library(DBI)
#'
#' a=data.frame(a=seq(1,10000), b=seq(10000,1))
#' n=head(a,n=5000)
#' n$b=n$a
#' nx=rbind(n, data.frame(a=seq(99999,104000), b=seq(104000,99999)))
#'
#'\dontrun{
#' con <- dbConnect(RPostgres::Postgres(), dbname="dbname",
#' host='my-redshift-url.amazon.com', port='5439',
#' user='myuser', password='mypassword', sslmode='require')
#'
#' rs_upsert_table(data=nx, dbcon=con, tableName='testTable',
#' bucket="my-bucket", split_files=4, keys=c('a'))
#'
#'}
#' @export
rs_upsert_table = function(
    data,
    dbcon,
    tableName,
    keys,
    split_files,
    bucket=Sys.getenv('AWS_BUCKET_NAME'),
    region=Sys.getenv('AWS_DEFAULT_REGION'),
    access_key=Sys.getenv('AWS_ACCESS_KEY_ID'),
    secret_key=Sys.getenv('AWS_SECRET_ACCESS_KEY')
    )
{
  if(missing(split_files)){
    print("Getting number of slices from Redshift")
    slices = queryDo(dbcon, "select count(*) from stv_slices")
    n_slices = as.integer(unlist(slices)[1])
    split_files = n_slices * 4
    print(sprintf("%s slices detected, will split into %s files", n_slices, split_files))
  }
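  # Redshift's COPY loads multiple files in parallel, one per slice, so a
  # small multiple of the slice count keeps all slices busy (the factor of 4
  # is a heuristic); never split into more files than there are rows.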
  split_files = min(split_files, nrow(data))

  prefix = uploadToS3(data, bucket, split_files)
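  # Assumption: uploadToS3() (a helper defined elsewhere in this package)
  # writes `data` as `split_files` gzipped CSV parts under a random key prefix
  # and returns that prefix; the COPY below reads every object under "<prefix>."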
  result = tryCatch({
    # Random name for the staging table to avoid collisions between loads
    stageTable = paste0(sample(letters, 16), collapse = "")

    # Run the staged load inside one transaction so the closing
    # COMMIT/ROLLBACK applies to the whole delete+insert as a unit
    queryDo(dbcon, "BEGIN;")
    queryDo(dbcon, sprintf("create temp table %s (like %s)", stageTable, tableName))
    print("Copying data from S3 into Redshift")
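    # COPY options below: 'csv gzip' matches the compressed CSV parts uploaded
    # above, 'ignoreheader 1' skips each file's header row, and 'emptyasnull'
    # loads empty strings as NULLs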
    queryDo(dbcon, sprintf("copy %s from 's3://%s/%s.' region '%s' csv gzip ignoreheader 1 emptyasnull credentials 'aws_access_key_id=%s;aws_secret_access_key=%s';",
      stageTable,
      bucket,
      prefix,
      region,
      access_key,
      secret_key
    ))
    if(!missing(keys)){
      print("Deleting rows with same keys")
      keysCond = paste(stageTable, ".", keys, "=", tableName, ".", keys, sep="")
      keysWhere = paste(keysCond, collapse=" and ")
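      # keysWhere joins one equality per key, e.g. for keys=c('a','b'):
      # "<stageTable>.a=<tableName>.a and <stageTable>.b=<tableName>.b"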
      queryDo(dbcon, sprintf('delete from %s using %s where %s',
        tableName,
        stageTable,
        keysWhere
      ))
    }
    print("Inserting new rows")
    queryDo(dbcon, sprintf('insert into %s select * from %s', tableName, stageTable))

    queryDo(dbcon, sprintf("drop table %s", stageTable))
    print("Committing")
    queryDo(dbcon, "COMMIT;")
  }, warning = function(w) {
    print(w)
  }, error = function(e) {
    print(e$message)
    queryDo(dbcon, 'ROLLBACK;')
  }, finally = {
    print("Deleting temporary files from S3 bucket")
    deletePrefix(prefix, bucket, split_files)
  })
}