Skip to content

Commit

Permalink
probabilistic matching
Browse files Browse the repository at this point in the history
  • Loading branch information
rafapereirabr committed Feb 1, 2025
1 parent 13d95b5 commit dfc465f
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 56 deletions.
12 changes: 9 additions & 3 deletions tests/tests_rafa/fts_extension.R
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,17 @@ query_max_similarity <- glue::glue(
DBI::dbGetQuery(con, query_join)
DBI::dbGetQuery(con, query_max_similarity)

w1 <- 'teste 1'
w2 <- 'teste 2'
w1 <- 'PROFESSOR ALFREDO GONCALVES FILGUEIRA'
w2 <- 'RUA PROFESSOR ALFREDO GONCALVES FIGUEIRA'

q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}');")
# q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}');")
q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}') AS similarity;")
q <- glue::glue("SELECT jaro_winkler_similarity('{w1}', '{w2}') AS similarity;")
DBI::dbGetQuery(con, q)



#' damerau_levenshtein
#' jaccard
#' jaro_similarity
#' jaro_winkler_similarity
106 changes: 53 additions & 53 deletions tests/tests_rafa/match_cases_probabilistic.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ match_cases_probabilistic <- function(
){



# read correspondind parquet file
table_name <- paste(key_cols, collapse = "_")
table_name <- gsub('estado_municipio', 'municipio', table_name)
Expand Down Expand Up @@ -51,36 +50,41 @@ match_cases_probabilistic <- function(
# remove logradouro
key_cols <- key_cols[key_cols != 'logradouro_sem_numero']


# Create the JOIN condition by concatenating the key columns
join_condition <- paste(
glue::glue("{y}.{key_cols} = {x}.{key_cols}"),
collapse = ' AND '
)


# # whether to keep all columns in the result
# colunas_encontradas <- ""
# additional_cols <- ""
#
# if (isTRUE(resultado_completo)) {
#
# colunas_encontradas <- paste0(
# glue::glue("{key_cols}_encontrado"),
# collapse = ', ')
#
# colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas)
# colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas)
# colunas_encontradas <- paste0(", endereco_encontrado, ", colunas_encontradas)
#
# additional_cols <- paste0(
# glue::glue("filtered_cnefe.{key_cols} AS {key_cols}_encontrado"),
# collapse = ', ')
#
# additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols)
# additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols)
# additional_cols <- paste0(", filtered_cnefe.endereco_completo AS endereco_encontrado, ", additional_cols)
#
# }
# whether to keep all columns in the result
colunas_encontradas <- ""
additional_cols <- ""

if (isTRUE(resultado_completo)) {

colunas_encontradas <- paste0(
glue::glue("{key_cols}_encontrado"),
collapse = ', ')

colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas)
colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas)
colunas_encontradas <- paste0(", ", colunas_encontradas)

additional_cols <- paste0(
glue::glue("filtered_cnefe.{key_cols} AS {key_cols}_encontrado"),
collapse = ', ')

additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols)
additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols)
additional_cols <- paste0(", ", additional_cols)
}

# min cutoff for string match
min_cutoff <- ifelse(match_type == 'pn04', 0.9, 0.7)

# 1st step: match --------------------------------------------------------

# match query
query_match <- glue::glue(
Expand All @@ -90,6 +94,7 @@ match_cases_probabilistic <- function(
{x}.tempidgeocodebr, {y}.lat, {y}.lon,
{x}.logradouro_sem_numero AS logradouro_sem_numero,
{y}.logradouro_sem_numero AS logradouro_sem_numero_cnefe,
{y}.endereco_completo AS endereco_encontrado,
jaro_winkler_similarity({x}.logradouro_sem_numero, {y}.logradouro_sem_numero) AS similarity,
RANK() OVER (PARTITION BY {x}.tempidgeocodebr ORDER BY similarity DESC) AS rank
{additional_cols}
Expand All @@ -100,50 +105,45 @@ match_cases_probabilistic <- function(
)
SELECT *
FROM ranked_data
WHERE similarity > 0.8
WHERE similarity > {min_cutoff}
AND rank = 1;"
)

DBI::dbExecute(con, query_match)
# a <- DBI::dbReadTable(con, 'temp_db')



# 2nd step: update output table --------------------------------------------------------

if (isTRUE(resultado_completo)) {
additional_cols <- paste0(
glue::glue("temp_db.{key_cols}_encontrado"),
collapse = ', ')

additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols)
additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols)
additional_cols <- paste0(", ", additional_cols)
}

#
# if (isTRUE(resultado_completo)) {
#
# colunas_encontradas <- paste0(
# glue::glue("{key_cols}_encontrado"),
# collapse = ', ')
#
# colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas)
# colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas)
# colunas_encontradas <- paste0(", endereco_encontrado, ", colunas_encontradas)
#
# additional_cols <- paste0(
# glue::glue("temp_db.{key_cols} AS {key_cols}_encontrado"),
# collapse = ', ')
#
# additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols)
# additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols)
# additional_cols <- paste0(", temp_db.endereco_completo AS endereco_encontrado, ", additional_cols)
#
# }


# summarize query
query_update_db <- glue::glue(
"INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado {colunas_encontradas})
SELECT temp_db.tempidgeocodebr, temp_db.lat, temp_db.lon,
'{match_type}' AS tipo_resultado {additional_cols}
"INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado, endereco_encontrado {colunas_encontradas})
SELECT temp_db.tempidgeocodebr,
temp_db.lat,
temp_db.lon,
'{match_type}' AS tipo_resultado,
temp_db.endereco_encontrado {additional_cols}
FROM temp_db
WHERE temp_db.lon IS NOT NULL;"
)

temp_n <- DBI::dbExecute(con, query_update_db)
DBI::dbExecute(con, query_update_db)


duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")

# UPDATE input_padrao_db: Remove observations found in previous step
update_input_db(
temp_n <- update_input_db(
con,
update_tb = x,
reference_tb = output_tb
Expand Down
174 changes: 174 additions & 0 deletions tests/tests_rafa/match_weighted_cases_probabilistic.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
match_type = case = "pi01"
x = 'input_padrao_db'
y = 'filtered_cnefe'
output_tb = "output_db"
key_cols <- c("estado", "municipio", "logradouro_sem_numero", "numero", "cep", "localidade")
match_type = case
resultado_completo = F # isso funciona. Falta funcionar quando TRUE


match_weighted_cases_probabilistic <- function(con,
x,
y,
output_tb,
key_cols,
match_type,
resultado_completo){

# read corresponding parquet file
table_name <- paste(key_cols, collapse = "_")
table_name <- gsub('estado_municipio', 'municipio', table_name)
table_name <- gsub('logradouro_sem_numero', 'logradouro', table_name)

# master table
if (match_type %like% 'pn02|pi02|pn03|pi03|pn04|pi04') {
table_name <- "municipio_logradouro_numero_cep_localidade"
}

# build path to local file
path_to_parquet <- paste0(listar_pasta_cache(), "/", table_name, ".parquet")


# determine geographical scope of the search
input_states <- DBI::dbGetQuery(con, "SELECT DISTINCT estado FROM input_padrao_db;")$estado
input_municipio <- DBI::dbGetQuery(con, "SELECT DISTINCT municipio FROM input_padrao_db;")$municipio

# Load CNEFE data and write to DuckDB
# filter cnefe to include only states and municipalities
# present in the input table, reducing the search scope
filtered_cnefe <- arrow::open_dataset( path_to_parquet ) |>
dplyr::filter(estado %in% input_states) |>
dplyr::filter(municipio %in% input_municipio) |>
dplyr::compute()

# register filtered_cnefe to db
duckdb::duckdb_register_arrow(con, "filtered_cnefe", filtered_cnefe)

# cols that cannot be null
cols_not_null <- paste(
glue::glue("{x}.{key_cols} IS NOT NULL"),
collapse = ' AND '
)

# remove numero and logradourot from key cols to allow for the matching
key_cols <- key_cols[key_cols != 'numero']
key_cols <- key_cols[key_cols != 'logradouro_sem_numero']

# Create the JOIN condition by concatenating the key columns
join_condition <- paste(
glue::glue("{y}.{key_cols} = {x}.{key_cols}"),
collapse = ' AND '
)


# whether to keep all columns in the result
colunas_encontradas <- ""
additional_cols <- ""

if (isTRUE(resultado_completo)) {

colunas_encontradas <- paste0(
glue::glue("{key_cols}_encontrado"),
collapse = ', ')

colunas_encontradas <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', colunas_encontradas)
colunas_encontradas <- gsub('localidade_encontrado', 'localidade_encontrada', colunas_encontradas)
colunas_encontradas <- paste0(", ", colunas_encontradas)

additional_cols <- paste0(
glue::glue("filtered_cnefe.{key_cols} AS {key_cols}_encontrado"),
collapse = ', ')

additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols)
additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols)
additional_cols <- paste0(", ", additional_cols)

}

# min cutoff for string match
min_cutoff <- ifelse(match_type == 'pi04', 0.9, 0.7)

# 1st step: match --------------------------------------------------------

query_match <- glue::glue(
"CREATE OR REPLACE TEMPORARY VIEW temp_db AS
WITH ranked_data AS (
SELECT
{x}.tempidgeocodebr, {x}.numero, {y}.numero AS numero_cnefe,
{y}.lat, {y}.lon,
{x}.logradouro_sem_numero AS logradouro_sem_numero,
{y}.logradouro_sem_numero AS logradouro_sem_numero_cnefe,
REGEXP_REPLACE( {y}.endereco_completo, ', \\d+ -', CONCAT(', ', {x}.numero, ' (aprox) -')) AS endereco_encontrado,
jaro_winkler_similarity({x}.logradouro_sem_numero, {y}.logradouro_sem_numero) AS similarity,
RANK() OVER (PARTITION BY {x}.tempidgeocodebr ORDER BY similarity DESC) AS rank
{additional_cols}
FROM {x}
LEFT JOIN {y}
ON {join_condition}
WHERE {cols_not_null} AND {y}.numero IS NOT NULL
)
SELECT *
FROM ranked_data
WHERE similarity > {min_cutoff}
AND rank = 1;"
)

DBI::dbExecute(con, query_match)
# a <- DBI::dbReadTable(con, 'temp_db')



# 2nd step: aggregate --------------------------------------------------------

# summarize query
query_aggregate <- glue::glue(
"INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado, endereco_encontrado)
SELECT tempidgeocodebr,
SUM((1/ABS(numero - numero_cnefe) * lat)) / SUM(1/ABS(numero - numero_cnefe)) AS lat,
SUM((1/ABS(numero - numero_cnefe) * lon)) / SUM(1/ABS(numero - numero_cnefe)) AS lon,
'{match_type}' AS tipo_resultado,
FIRST(endereco_encontrado) AS endereco_encontrado
FROM temp_db
GROUP BY tempidgeocodebr, endereco_encontrado;"
)



if (isTRUE(resultado_completo)) {

additional_cols <- paste0(
glue::glue("FIRST({key_cols}_encontrado)"),
collapse = ', ')

additional_cols <- gsub('logradouro_sem_numero_encontrado', 'logradouro_encontrado', additional_cols)
additional_cols <- gsub('localidade_encontrado', 'localidade_encontrada', additional_cols)
additional_cols <- paste0(", ", additional_cols)

query_aggregate <- glue::glue(
"INSERT INTO output_db (tempidgeocodebr, lat, lon, tipo_resultado, endereco_encontrado {colunas_encontradas})
SELECT tempidgeocodebr,
SUM((1/ABS(numero - numero_cnefe) * lat)) / SUM(1/ABS(numero - numero_cnefe)) AS lat,
SUM((1/ABS(numero - numero_cnefe) * lon)) / SUM(1/ABS(numero - numero_cnefe)) AS lon,
'{match_type}' AS tipo_resultado,
FIRST(endereco_encontrado) AS endereco_encontrado
{additional_cols}
FROM temp_db
GROUP BY tempidgeocodebr, endereco_encontrado;"
)
}

DBI::dbExecute(con, query_aggregate)
# b <- DBI::dbReadTable(con, 'output_db')


duckdb::duckdb_unregister_arrow(con, "filtered_cnefe")

# UPDATE input_padrao_db: Remove observations found in previous step
temp_n <- update_input_db(
con,
update_tb = x,
reference_tb = output_tb
)

return(temp_n)
}

0 comments on commit dfc465f

Please sign in to comment.