Skip to content

Commit aec2041

Browse files
committed
Django structured ingestion architecture, see #HEA-159
1 parent ea41702 commit aec2041

File tree

18 files changed

+1643
-1
lines changed

18 files changed

+1643
-1
lines changed

apps/baseline/importers.py

Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
import logging
2+
3+
from baseline.models import (
4+
Community,
5+
LivelihoodActivity,
6+
LivelihoodStrategy,
7+
LivelihoodZoneBaseline,
8+
WealthGroup,
9+
)
10+
from ingestion.decorators import register
11+
from ingestion.importers import Importer
12+
from ingestion.models import SpreadsheetLocation
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
@register
class LivelihoodZoneBaselineImporter(Importer):
    """
    Registered importer for ``LivelihoodZoneBaseline``.

    It declares no fields of its own; it only names the dependent model fields
    ("livelihood_strategies" and "communities") that hang off a baseline.
    """

    # Class-level limits. They are not referenced inside this class;
    # presumably they are read by the base Importer - TODO confirm.
    COLUMN = 0
    MAX_ROW = 3000

    class Meta:
        # This importer's ingest is called with a saved, complete LivelihoodZoneBaseline instance passed in,
        # with the Excel spreadsheet file uploaded.
        model = LivelihoodZoneBaseline
        dependent_model_fields = ["livelihood_strategies", "communities"]
30+
31+
32+
@register
class LivelihoodStrategyImporter(Importer):
    """
    Registered importer for ``LivelihoodStrategy``.

    The custom ``ingest_<field>`` methods scan column A of the "Data", "Data2"
    and "Data3" sheets of the baseline's spreadsheet, delegating the actual cell
    matching to ``attempt_load_from_cell``. Each method returns the tuple
    ``(successful_mappings, failed_mappings, parent_instances)``.
    """

    class Meta:
        model = LivelihoodStrategy
        fields = [
            "product",
            "strategy_type",
            "season",
            "unit_of_measure",
            "currency",
            "additional_identifier",
        ]
        parent_model_fields = [
            "livelihood_zone_baseline",
        ]
        dependent_model_fields = [
            "livelihoodactivity",
        ]

    FIRST_ROW = 10
    # The scans below read self.MAX_ROW; the sibling importers define the same limit on their own classes.
    MAX_ROW = 3000
    MAX_COLUMN = 0  # all are in column A

    def ingest_product(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Scan down column A of the three Data sheets looking for a product alias.

        Every row from 7 up to (but excluding) ``min(row_count, MAX_ROW)`` is offered to
        ``attempt_load_from_cell``; the mappings it returns are threaded through to the next call.
        """
        baseline = parent_instances[LivelihoodZoneBaseline][0]
        for sheet_name in ("Data", "Data2", "Data3"):
            row_count = baseline.load_sheet(sheet_name).shape[0]
            # NOTE(review): this scan starts at row 7 whereas ingest_strategy_type starts at FIRST_ROW and runs
            # to MAX_ROW + 1 - confirm the different bounds are intentional.
            for row in range(7, min(row_count, self.MAX_ROW)):
                _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=False,
                    sheet_name=sheet_name,
                    column=self.MAX_COLUMN,
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )
        return successful_mappings, failed_mappings, parent_instances

    def ingest_strategy_type(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Locate the strategy types in column A, then save one strategy_type location per LivelihoodStrategy.

        The products must already have been mapped, so we know how many LSes we have and which rows they're on.
        This finds the strategy_types (~12), then generates a strategy_type SpreadsheetLocation per LS (~90).
        """
        baseline = parent_instances[LivelihoodZoneBaseline][0]

        # 1. Create one strategy_type SpreadsheetLocation for each strategy type found in the BSS (approx 12):
        strategy_type_spreadsheet_locations = []
        for sheet_name in ("Data", "Data2", "Data3"):
            row_count = baseline.load_sheet(sheet_name).shape[0]
            for row in range(self.FIRST_ROW, min(row_count, self.MAX_ROW + 1)):
                new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=False,
                    sheet_name=sheet_name,
                    column=0,  # all in column A
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )
                if new_spreadsheet_location:
                    strategy_type_spreadsheet_locations.append(new_spreadsheet_location)

        # 2. Back-fill one strategy_type SpreadsheetLocation per LivelihoodStrategy in the BSS (approx 90):
        # The first row of each LivelihoodStrategy has the product in col A, so we use product mappings to iterate LS.
        sl_per_livelihood_strategy = [
            self.get_strategy_type_for_instance(
                instance_number,
                field_def,
                successful_mappings,
                failed_mappings,
                parent_instances,
                bss_value_extractors,
            )
            for instance_number in range(len(successful_mappings["product"]))
        ]

        # 3. Clean up working data:
        # Grab the PKs of the SLs not attached to any LS instance for deletion later.
        # This must happen before the loop below, which overwrites pk on the same objects.
        sls_to_delete = [o.pk for o in strategy_type_spreadsheet_locations]

        # Generate a new SpreadsheetLocation per LivelihoodStrategy instance.
        # Clearing pk/id before save() makes Django insert a new row instead of updating the existing one.
        # The same in-memory object may appear several times in the list; each save() still inserts a new row.
        for instance_number, sl in enumerate(sl_per_livelihood_strategy):
            sl.pk = None
            sl.id = None
            sl.instance_number = instance_number
            sl.save()

        # Delete the strategy_type SpreadsheetLocations not attached to any LivelihoodStrategy instance
        SpreadsheetLocation.objects.filter(pk__in=sls_to_delete).delete()

        return successful_mappings, failed_mappings, parent_instances

    def get_strategy_type_for_instance(
        self,
        instance_number,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Return the strategy_type location that applies to the ``instance_number``-th product location.

        The strategy type for an instance is the one closest above it in the BSS: of the strategy types on the
        same sheet whose row is at or above the product's row, the one with the greatest row.

        Raises:
            IndexError: if no strategy type is found at or above the product on its sheet.
        """
        product = successful_mappings["product"][instance_number]
        candidates = [
            strategy_type
            for strategy_type in successful_mappings["strategy_type"]
            if strategy_type.sheet_name == product.sheet_name and strategy_type.row <= product.row
        ]
        if not candidates:
            raise IndexError(
                f"No strategy type found at or above row {product.row} on sheet {product.sheet_name!r}"
            )
        # Taking the first match would always pick the top-most strategy type on the sheet; we want the nearest one.
        return max(candidates, key=lambda strategy_type: strategy_type.row)
157+
158+
159+
@register
class CommunityImporter(Importer):
    """
    Registered importer for ``Community``.

    The custom ``ingest_<field>`` methods read the header rows of the "Data" sheet of the baseline's
    spreadsheet, delegating the actual cell matching to ``attempt_load_from_cell``. Each method returns
    the tuple ``(successful_mappings, failed_mappings, parent_instances)``.
    """

    class Meta:
        model = Community
        fields = [
            "name",
            "full_name",
            "code",
            "interview_number",
        ]
        dependent_model_fields = [
            "wealth_groups",
        ]

    def ingest_name(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Load one value per column from row 4 of the "Data" sheet, across the first wealth category only.

        The community/village names are on row 4, repeated for each wealth category.
        So scan across the first wealth category accumulating village names.
        """
        sheet_name = "Data"
        sheet = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name)
        column_count = sheet.shape[1]
        row = 4
        column = 1
        # The wealth category is read with zero-based iloc row index 2.
        # NOTE(review): the original comment placed wealth categories "on row 2" - confirm whether the `row`
        # passed to attempt_load_from_cell uses the same zero-based indexing.
        first_wc = sheet.iloc[2, column]
        # Stop at the right-hand edge of the sheet as well as at the first change of wealth category, so a
        # sheet whose first wealth category runs to the last column does not raise IndexError.
        while column < column_count and first_wc == sheet.iloc[2, column]:
            _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                parent_instances=parent_instances,
                field_def=field_def,
                find_field=False,
                sheet_name=sheet_name,
                column=column,
                row=row,
                bss_value_extractors=bss_value_extractors,
                successful_mappings=successful_mappings,
                failed_mappings=failed_mappings,
            )
            column += 1
        return successful_mappings, failed_mappings, parent_instances

    def ingest_full_name(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Load row 3 of the "Data" sheet for every column that yielded a name, then prefix the village name.

        Requires the "name" field to have been mapped already. Each resulting mapped value becomes
        "<name value>, <row 3 value>".
        """
        # Scan across Data sheet row 3 loading district names
        sheet_name = "Data"
        row = 3
        for name_loc in successful_mappings["name"]:
            _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                parent_instances=parent_instances,
                field_def=field_def,
                find_field=False,
                sheet_name=sheet_name,
                column=name_loc.column,
                row=row,
                bss_value_extractors=bss_value_extractors,
                successful_mappings=successful_mappings,
                failed_mappings=failed_mappings,
            )
        # Prefix the village names.
        # Pair by column rather than by list position: if any full-name cell failed to load, the two lists
        # have different lengths and positional pairing would attach the wrong village.
        village_by_column = {name_loc.column: name_loc for name_loc in successful_mappings["name"]}
        for full_name_loc in successful_mappings[field_def.name]:
            village_loc = village_by_column.get(full_name_loc.column)
            if village_loc is None:
                continue
            full_name_loc.mapped_value = ", ".join((village_loc.mapped_value, full_name_loc.mapped_value))
        return successful_mappings, failed_mappings, parent_instances

    def ingest_code(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Populate the field from the same cells as ``ingest_name``, then lower-case each mapped value.

        Possibly more correct to use the BssValueExtractor regex.
        """
        successful_mappings, failed_mappings, parent_instances = self.ingest_name(
            field_def,
            successful_mappings,
            failed_mappings,
            parent_instances,
            bss_value_extractors,
        )
        for loc in successful_mappings[field_def.name]:
            loc.mapped_value = loc.mapped_value.lower()
        return successful_mappings, failed_mappings, parent_instances
252+
253+
254+
@register
class LivelihoodActivityImporter(Importer):
    """
    Registered importer for ``LivelihoodActivity``.

    ``ingest_quantity_produced`` walks the rows belonging to each already-mapped LivelihoodStrategy,
    delegating the actual cell matching to ``attempt_load_from_cell``.
    """

    class Meta:
        model = LivelihoodActivity
        fields = [
            "scenario",
            "wealth_group",
            "quantity_produced",
            "quantity_purchased",
            "quantity_sold",
            "quantity_other_uses",
            "quantity_consumed",
            "price",
            "income",
            "expenditure",
            "kcals_consumed",
            "percentage_kcals",
            "household_labor_provider",
            "strategy_type",
        ]
        parent_model_fields = [
            "livelihood_strategy",
            "livelihood_zone_baseline",
        ]
        dependent_model_fields = [
            "milkproduction",
            "butterproduction",
            "meatproduction",
            "livestocksale",
            "cropproduction",
            "foodpurchase",
            "paymentinkind",
            "reliefgiftother",
            "fishing",
            "wildfoodgathering",
            "othercashincome",
            "otherpurchase",
        ]

    MAX_ROW = 3000
    COLUMN = 0

    def ingest_quantity_produced(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        For each LivelihoodStrategy, find the quantity_produced row in column A and load its values.

        The product is specified on the first row of each LS. The product locations are used to iterate over
        each LS's rows, looking for a quantity_produced field name alias in column A. When one is found, the
        value on that row is loaded for the column of every WealthGroup "wealth_category" location.
        A strategy whose rows contain no alias is skipped.

        Returns:
            The tuple ``(successful_mappings, failed_mappings, parent_instances)``, like the other
            ``ingest_<field>`` methods in this module.
        """
        product_locs = parent_instances[LivelihoodStrategy]["product"]
        for strategy_i, strategy_loc in enumerate(product_locs):
            sheet = parent_instances[LivelihoodZoneBaseline][0].load_sheet(strategy_loc.sheet_name)
            row_count = sheet.shape[0]

            # If there's a subsequent LS on the same sheet, scan col A up to (excluding) its first row,
            # otherwise scan to the bottom. end_row is an exclusive bound in both cases, so the last row of
            # this LS is included in the scan.
            next_loc = product_locs[strategy_i + 1] if strategy_i + 1 < len(product_locs) else None
            if next_loc is not None and next_loc.sheet_name == strategy_loc.sheet_name:
                end_row = next_loc.row
            else:
                end_row = min(row_count, self.MAX_ROW)

            # Locate the field in col A; stop at the first row where an alias is found.
            field_row = None
            for row in range(strategy_loc.row, end_row):
                new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=True,
                    sheet_name=strategy_loc.sheet_name,
                    column=0,
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )
                if new_spreadsheet_location:
                    field_row = row
                    break

            # No alias within this LS: do not load values from whichever row the scan happened to end on.
            if field_row is None:
                continue

            # Get the value on row `field_row` for each LA.
            # There is 1 WealthGroup per WealthCategory per Community, plus 1 WG per WealthCategory with no Community
            for wg_loc in parent_instances[WealthGroup]["wealth_category"]:
                _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=False,
                    sheet_name=strategy_loc.sheet_name,
                    column=wg_loc.column,
                    row=field_row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )

        return successful_mappings, failed_mappings, parent_instances

apps/baseline/models.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
"""
44

55
import numbers
6+
from collections import defaultdict
67

8+
import pandas as pd
79
from django.conf import settings
810
from django.contrib.gis.db import models
911
from django.core.exceptions import ValidationError
@@ -247,6 +249,14 @@ class Meta:
247249
)
248250
]
249251

252+
def load_sheet(self, sheet_name):
    """
    Return the named sheet of this instance's ``bss`` spreadsheet as a pandas DataFrame.

    The sheet is read with ``pd.read_excel(..., header=None)`` from ``self.bss.path`` the first time it is
    requested and kept in ``self.cache``, so later calls for the same sheet return the cached DataFrame.
    """
    # TODO: Apply overrides.
    try:
        sheets = self.cache
    except AttributeError:
        # First call on this instance: create the per-instance cache lazily.
        sheets = self.cache = defaultdict(dict)
    if sheet_name in sheets:
        return sheets[sheet_name]
    frame = pd.read_excel(self.bss.path, sheet_name=sheet_name, header=None)
    sheets[sheet_name] = frame
    return frame
259+
250260

251261
class LivelihoodZoneBaselineCorrection(common_models.Model):
252262
"""

apps/ingestion/__init__.py

Whitespace-only changes.

apps/ingestion/admin.py

Whitespace-only changes.

apps/ingestion/apps.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.apps import AppConfig
2+
from django.utils.translation import gettext_lazy as _
3+
4+
5+
class IngestionConfig(AppConfig):
    """Django application configuration for the ``ingestion`` app."""

    name = "ingestion"
    verbose_name = _("ingestion")
    default_auto_field = "django.db.models.BigAutoField"

0 commit comments

Comments
 (0)