This repository was archived by the owner on Jul 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathprepare_campaign_finance_data.py
82 lines (73 loc) · 2.11 KB
/
prepare_campaign_finance_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from pathlib import Path
from urllib.request import urlretrieve
from zipfile import ZipFile
import ibis
# Download the FEC 2018 individual-contributions bulk file, unzip it, and
# rewrite the columns needed for analysis as a zstd-compressed parquet file.
# Each step is skipped when its output already exists, so the script is
# safe to re-run.
url = (
    "https://cg-519a459a-0ea3-42c2-b7bc-fa1143481f74.s3-us-gov-west-1."
    "amazonaws.com/bulk-downloads/2018/indiv18.zip"
)

# Everything lands in <repo root>/data, created on first run.
project_root = Path(__file__).resolve().parent.parent
data_directory = project_root / "data"
data_directory.mkdir(exist_ok=True)

archive_file = data_directory / "indiv18.zip"
raw_text_file = data_directory / "itcont.txt"
parquet_file = data_directory / "itcont.parquet"

# Step 1: fetch the zip archive from the FEC bulk-download bucket.
if archive_file.exists():
    print("indiv18.zip already downloaded")
else:
    print("Downloading indiv18.zip...")
    urlretrieve(url, archive_file)

# Step 2: pull the raw contributions file out of the archive.
if raw_text_file.exists():
    print("itcont.txt already extracted")
else:
    print("Extracting itcont.txt...")
    with ZipFile(archive_file) as archive:
        archive.extract("itcont.txt", path=data_directory)

# Step 3: read the headerless file, trim it down, and write parquet.
if parquet_file.exists():
    print("itcont.parquet already exists")
else:
    print("Generating itcont.parquet...")
    # The source file has no header row, so supply the FEC schema's
    # column titles explicitly.
    table = ibis.read_csv(
        raw_text_file,
        header=False,
        names=[
            "CMTE_ID",
            "AMNDT_IND",
            "RPT_TP",
            "TRANSACTION_PGI",
            "IMAGE_NUM",
            "TRANSACTION_TP",
            "ENTITY_TP",
            "NAME",
            "CITY",
            "STATE",
            "ZIP_CODE",
            "EMPLOYER",
            "OCCUPATION",
            "TRANSACTION_DT",
            "TRANSACTION_AMT",
            "OTHER_ID",
            "TRAN_ID",
            "FILE_NUM",
            "MEMO_CD",
            "MEMO_TEXT",
            "SUB_ID",
        ],
    )
    # The analysis only touches a few columns; selecting them here keeps
    # the parquet output (and later reads) small.
    keep_columns = [
        "CMTE_ID",
        "TRANSACTION_PGI",
        "ENTITY_TP",
        "CITY",
        "STATE",
        "TRANSACTION_DT",
        "TRANSACTION_AMT",
    ]
    table = table[keep_columns]
    # Write the trimmed table out with zstd compression.
    table.to_parquet(parquet_file, compression="zstd")