Skip to content

Commit 14fa8a3

Browse files
committed
Django structured ingestion architecture, see #HEA-159
1 parent ea41702 commit 14fa8a3

File tree

18 files changed

+1606
-1
lines changed

18 files changed

+1606
-1
lines changed

apps/baseline/importers.py

Lines changed: 329 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,329 @@
1+
import logging
2+
3+
from baseline.models import (
4+
Community,
5+
LivelihoodActivity,
6+
LivelihoodStrategy,
7+
LivelihoodZoneBaseline,
8+
WealthGroup,
9+
)
10+
from ingestion.decorators import register
11+
from ingestion.importers import Importer
12+
from ingestion.models import SpreadsheetLocation
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
@register
class LivelihoodZoneBaselineImporter(Importer):
    """
    Importer registered against the LivelihoodZoneBaseline model.

    The load_from_bss management command calls this importer's ingest() for a pre-saved
    LivelihoodZoneBaseline instance.
    """

    class Meta:
        model = LivelihoodZoneBaseline
        # NOTE(review): presumably the related accessors whose importers depend on this model - confirm in Importer.
        dependent_model_fields = ["livelihood_strategies", "communities"]
27+
28+
29+
@register
class LivelihoodStrategyImporter(Importer):
    """
    Importer registered against the LivelihoodStrategy model.

    The custom ingest_* methods below locate values in column A of the "Data", "Data2" and "Data3" sheets of
    the BSS belonging to the parent LivelihoodZoneBaseline.
    """

    class Meta:
        model = LivelihoodStrategy
        fields = [
            "product",
            "strategy_type",
            "season",
            "unit_of_measure",
            "currency",
            "additional_identifier",
        ]
        parent_model_fields = [
            "livelihood_zone_baseline",
        ]
        dependent_model_fields = [
            "livelihoodactivity",
        ]

    def ingest_product(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Attempt to load a product from every candidate cell in column A of the three Data sheets.

        Returns the (successful_mappings, failed_mappings, parent_instances) tuple, with the mappings as
        returned by the last call to attempt_load_from_cell.
        """
        # Scan down column A of the three Data sheets looking for a product alias.
        for sheet_name in ("Data", "Data2", "Data3"):
            row_count, _ = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name).shape
            for row in range(7, min(row_count, 3000)):
                _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=False,
                    sheet_name=sheet_name,
                    column=0,  # col A
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )
        return successful_mappings, failed_mappings, parent_instances

    def ingest_strategy_type(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Locate the strategy types in the BSS and save one strategy_type SpreadsheetLocation per LivelihoodStrategy.

        The products must already have been mapped, so we know how many LSes we have and which rows they're on.
        This finds the strategy_types (~12), then generates a strategy_type SpreadsheetLocation per LS (~90).

        Returns the (successful_mappings, failed_mappings, parent_instances) tuple.
        Raises IndexError if a product has no strategy type above it on its sheet.
        """
        # 1. Identify SpreadsheetLocation of each strategy_type found in the BSS (approx 12):
        strategy_type_spreadsheet_locations = []
        for sheet_name in ("Data", "Data2", "Data3"):
            row_count, _ = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name).shape
            for row in range(10, min(row_count, 3000 + 1)):
                new_spreadsheet_location, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=False,
                    sheet_name=sheet_name,
                    column=0,  # all in column A
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )
                if new_spreadsheet_location:
                    strategy_type_spreadsheet_locations.append(new_spreadsheet_location)

        # 2. Generate a strategy_type SpreadsheetLocation per LivelihoodStrategy in the BSS (approx 90):
        # The first row of each LivelihoodStrategy has the product in col A, so we use product mappings to iterate LS.
        sl_per_livelihood_strategy = [
            self.get_strategy_type_for_instance(instance_number, successful_mappings)
            for instance_number in range(len(successful_mappings["product"]))
        ]

        # 3. Clean up working data:
        # Grab the PKs of the SLs not attached to any LS instance for deletion later.
        # This must happen before the loop below, which resets the pk of these same objects.
        sls_to_delete = [o.pk for o in strategy_type_spreadsheet_locations]

        # Generate a new SpreadsheetLocation per LivelihoodStrategy instance.
        # Clearing pk/id before save() makes Django insert a new row, so a strategy type shared by several
        # LivelihoodStrategies is saved once per instance_number.
        for instance_number, sl in enumerate(sl_per_livelihood_strategy):
            sl.pk = None
            sl.id = None
            sl.instance_number = instance_number
            sl.save()

        # Delete the strategy_type SpreadsheetLocations not attached to any LivelihoodStrategy instance
        SpreadsheetLocation.objects.filter(pk__in=sls_to_delete).delete()

        return successful_mappings, failed_mappings, parent_instances

    @staticmethod
    def get_strategy_type_for_instance(instance_number, successful_mappings):
        """
        Return the strategy_type location that applies to the product at index instance_number.

        The strategy type for a given LivelihoodStrategy instance is the one closest above it in the BSS, ie, the
        strategy_type on the same sheet with the greatest row that is not below the product's row.

        Raises IndexError if instance_number is out of range or no strategy type precedes the product on its sheet.
        """
        product = successful_mappings["product"][instance_number]
        candidates = [
            strategy_type
            for strategy_type in successful_mappings["strategy_type"]
            if strategy_type.sheet_name == product.sheet_name and strategy_type.row <= product.row
        ]
        if not candidates:
            raise IndexError(
                f"No strategy_type found above the product on sheet {product.sheet_name!r}, row {product.row}."
            )
        # Choose by row rather than list position, so the result does not depend on the order of the mappings.
        return max(candidates, key=lambda strategy_type: strategy_type.row)
137+
138+
139+
@register
class CommunityImporter(Importer):
    """
    Importer registered against the Community model.

    The custom ingest_* methods below read the community header rows of the "Data" sheet of the BSS belonging
    to the parent LivelihoodZoneBaseline.
    """

    class Meta:
        model = Community
        fields = [
            "name",
            "full_name",
            "code",
            "interview_number",
        ]
        dependent_model_fields = [
            "wealth_groups",
        ]

    def ingest_name(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Attempt to load a community name from each column of the first wealth category on the Data sheet.

        Returns the (successful_mappings, failed_mappings, parent_instances) tuple.
        """
        # The community/village names are on row 4, repeated for each wealth category (on row 2).
        # So scan across the first wealth category accumulating village names.
        sheet_name = "Data"
        sheet = parent_instances[LivelihoodZoneBaseline][0].load_sheet(sheet_name)
        row_count, column_count = sheet.shape
        row = 4
        column = 1

        # Nothing to scan if the sheet is too small to contain the wealth category cell.
        if row_count <= 2 or column_count <= column:
            return successful_mappings, failed_mappings, parent_instances

        first_wc = sheet.iloc[2, column]
        # Stop at the edge of the sheet: indexing past the last column with iloc raises IndexError, which would
        # otherwise happen whenever the first wealth category extends to the final column.
        while column < column_count and first_wc == sheet.iloc[2, column]:
            _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                parent_instances=parent_instances,
                field_def=field_def,
                find_field=False,
                sheet_name=sheet_name,
                column=column,
                row=row,
                bss_value_extractors=bss_value_extractors,
                successful_mappings=successful_mappings,
                failed_mappings=failed_mappings,
            )
            column += 1
        return successful_mappings, failed_mappings, parent_instances

    def ingest_full_name(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Build each full name as "<village name>, <district name>".

        The name field must already have been mapped, because its locations determine which columns are read.

        Returns the (successful_mappings, failed_mappings, parent_instances) tuple.
        """
        # 1. Scan across Data sheet row 3 loading district names
        sheet_name = "Data"
        row = 3
        for name_loc in successful_mappings["name"]:
            _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                parent_instances=parent_instances,
                field_def=field_def,
                find_field=False,
                sheet_name=sheet_name,
                column=name_loc.column,
                row=row,
                bss_value_extractors=bss_value_extractors,
                successful_mappings=successful_mappings,
                failed_mappings=failed_mappings,
            )

        # 2. Prefix the village names
        # Pair by column rather than by list position: if any district cell failed to map, the two lists are
        # different lengths and positional pairing would attach districts to the wrong villages.
        villages_by_column = {name_loc.column: name_loc for name_loc in successful_mappings["name"]}
        for full_name_loc in successful_mappings[field_def.name]:
            village_loc = villages_by_column.get(full_name_loc.column)
            if village_loc is None:
                logger.warning(
                    "No community name mapped for column %s of sheet %s, leaving %s unprefixed.",
                    full_name_loc.column,
                    sheet_name,
                    field_def.name,
                )
                continue
            full_name_loc.mapped_value = ", ".join((village_loc.mapped_value, full_name_loc.mapped_value))
        return successful_mappings, failed_mappings, parent_instances

    def ingest_code(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Load the code from the same cells as the name field, converted to lower case.

        Returns the (successful_mappings, failed_mappings, parent_instances) tuple.
        """
        # 1. Populate in the same way as the name field
        successful_mappings, failed_mappings, parent_instances = self.ingest_name(
            field_def,
            successful_mappings,
            failed_mappings,
            parent_instances,
            bss_value_extractors,
        )
        # 2. Convert to lower case
        for loc in successful_mappings[field_def.name]:
            loc.mapped_value = loc.mapped_value.lower()
        return successful_mappings, failed_mappings, parent_instances
232+
233+
234+
@register
class LivelihoodActivityImporter(Importer):
    """
    Importer registered against the LivelihoodActivity model.

    ingest_quantity_produced uses the already-mapped LivelihoodStrategy product locations and WealthGroup
    wealth_category locations held in parent_instances to find its values.
    """

    class Meta:
        model = LivelihoodActivity
        fields = [
            "scenario",
            "wealth_group",
            "quantity_produced",
            "quantity_purchased",
            "quantity_sold",
            "quantity_other_uses",
            "quantity_consumed",
            "price",
            "income",
            "expenditure",
            "kcals_consumed",
            "percentage_kcals",
            "household_labor_provider",
            "strategy_type",
        ]
        parent_model_fields = [
            "livelihood_strategy",
            "livelihood_zone_baseline",
        ]
        dependent_model_fields = [
            "milkproduction",
            "butterproduction",
            "meatproduction",
            "livestocksale",
            "cropproduction",
            "foodpurchase",
            "paymentinkind",
            "reliefgiftother",
            "fishing",
            "wildfoodgathering",
            "othercashincome",
            "otherpurchase",
        ]

    def ingest_quantity_produced(
        self,
        field_def,
        successful_mappings,
        failed_mappings,
        parent_instances,
        bss_value_extractors,
    ):
        """
        Load quantity_produced for every WealthGroup column of every LivelihoodStrategy that has the field.

        For each LivelihoodStrategy, column A is scanned from the product row up to (but excluding) the next
        LivelihoodStrategy on the same sheet, looking for a quantity_produced field alias. If one is found, the
        value on that row is loaded for each wealth group column. Strategies without the field are skipped.

        Returns the (successful_mappings, failed_mappings, parent_instances) tuple, as the other ingest_*
        methods do.
        """
        # The product is specified on the first row of each LS.
        # Use them to iterate over each LS's rows, looking for quantity_produced field name aliases
        product_locs = parent_instances[LivelihoodStrategy]["product"]
        baseline = parent_instances[LivelihoodZoneBaseline][0]

        for strategy_i, strategy_loc in enumerate(product_locs):
            row_count, _ = baseline.load_sheet(strategy_loc.sheet_name).shape

            # If there's a subsequent LS on the same sheet, scan col A until that row, otherwise scan to bottom.
            # end_row is exclusive in both cases, so the last row belonging to this LS is included in the scan.
            next_i = strategy_i + 1
            if next_i < len(product_locs) and product_locs[next_i].sheet_name == strategy_loc.sheet_name:
                end_row = product_locs[next_i].row
            else:
                end_row = min(row_count, 3000)

            # locate the field in col A
            for row in range(strategy_loc.row, end_row):
                field_location, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=True,
                    sheet_name=strategy_loc.sheet_name,
                    column=0,
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )
                # When we locate a quantity_produced field alias in col A, stop looking and load the values
                if field_location:
                    break
            else:
                # No alias anywhere in this LS, so there is no row to read values from. Loading from the row
                # after the scan would pick up cells belonging to the next LS, or lie outside the scanned range.
                logger.debug(
                    "No %s field found for the livelihood strategy on sheet %s, row %s.",
                    field_def.name,
                    strategy_loc.sheet_name,
                    strategy_loc.row,
                )
                continue

            # get the value on row `row` for each LA.
            # There is 1 WealthGroup per WealthCategory per Community, plus 1 WG per WealthCategory with no Community
            for wg_loc in parent_instances[WealthGroup]["wealth_category"]:
                _, successful_mappings, failed_mappings = self.attempt_load_from_cell(
                    parent_instances=parent_instances,
                    field_def=field_def,
                    find_field=False,
                    sheet_name=strategy_loc.sheet_name,
                    column=wg_loc.column,
                    row=row,
                    bss_value_extractors=bss_value_extractors,
                    successful_mappings=successful_mappings,
                    failed_mappings=failed_mappings,
                )

        return successful_mappings, failed_mappings, parent_instances

apps/baseline/models.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
"""
44

55
import numbers
6+
from collections import defaultdict
67

8+
import pandas as pd
79
from django.conf import settings
810
from django.contrib.gis.db import models
911
from django.core.exceptions import ValidationError
@@ -247,6 +249,14 @@ class Meta:
247249
)
248250
]
249251

252+
def load_sheet(self, sheet_name):
    """
    Return the named worksheet of this baseline's BSS, read with pandas and no header row.

    Each sheet is read from self.bss.path at most once per instance; later calls return the object stored in
    the instance's ``cache`` attribute, which is created on first use.
    """
    # TODO: Apply overrides.
    if not hasattr(self, "cache"):
        self.cache = defaultdict(dict)
    sheets = self.cache
    # Test membership explicitly: indexing a defaultdict would silently create an empty entry on a miss.
    if sheet_name in sheets:
        return sheets[sheet_name]
    frame = pd.read_excel(self.bss.path, sheet_name=sheet_name, header=None)
    sheets[sheet_name] = frame
    return frame
259+
250260

251261
class LivelihoodZoneBaselineCorrection(common_models.Model):
252262
"""

apps/ingestion/__init__.py

Whitespace-only changes.

apps/ingestion/admin.py

Whitespace-only changes.

apps/ingestion/apps.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.apps import AppConfig
2+
from django.utils.translation import gettext_lazy as _
3+
4+
5+
class IngestionConfig(AppConfig):
    """Django application configuration for the ``ingestion`` app."""

    name = "ingestion"
    verbose_name = _("ingestion")
    default_auto_field = "django.db.models.BigAutoField"

apps/ingestion/decorators.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import logging
2+
from inspect import isclass
3+
4+
from ingestion.exceptions import ImportException
5+
from ingestion.importers import Importer
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
def register(importer_class=None):
    """
    Class decorator that attaches an Importer sub-class to the model named in its ``Meta.model``.

    Supports all three spellings: ``@register``, ``@register()`` and the plain call ``register(SomeImporter)``.

    Args:
        importer_class: The Importer sub-class to register, or None when used as ``@register()``.

    Returns:
        The importer class unchanged when one is passed directly, otherwise a decorator that registers the
        class it is applied to and returns it.

    Raises:
        ImportException: If the object being registered is not a sub-class of Importer.
    """

    def report_invalid_importer(klass):
        # Not everything that can be passed in has a __name__ (eg, an instance or None), so fall back to repr()
        # rather than masking the real problem with an AttributeError.
        klass_name = getattr(klass, "__name__", repr(klass))
        raise ImportException(
            f"Attempted to @register an importer class which is not a sub-class of Importer. Class {klass_name}."
        )

    def attach_importer_to_model(candidate):
        if not (isclass(candidate) and issubclass(candidate, Importer)):
            report_invalid_importer(candidate)
        candidate.Meta.model.importer = candidate
        # Log instead of print: this runs at import time, and writing to stdout would corrupt the output of
        # any command whose stdout is consumed by something else.
        logger.info(
            "Attached importer class to model. %s to %s",
            candidate.__name__,
            candidate.Meta.model.__name__,
        )
        return candidate

    # Usage @register or register(Importer) (ie, without brackets or without @)
    if importer_class is not None:
        return attach_importer_to_model(importer_class)

    # Usage @register() (ie, with brackets and @)
    def inner(importer_class_inner):
        # Register the class being decorated, not the outer argument, which is always None on this path.
        return attach_importer_to_model(importer_class_inner)

    return inner

0 commit comments

Comments
 (0)