Merge pull request #25 from PDOK/fix-oom
fix: OOM
rkettelerij authored Jan 10, 2025
2 parents 4d25d4f + 4d1c0af commit e297505
Showing 6 changed files with 43 additions and 23 deletions.
internal/etl/etl.go (13 changes: 8 additions & 5 deletions)

@@ -25,7 +25,7 @@ type Extract interface {
 type Transform interface {

     // Transform each raw record in one or more search records depending on the given configuration
-    Transform(records []t.RawRecord, collection config.GeoSpatialCollection, substitutionsFile string, synonymsFile string) ([]t.SearchIndexRecord, error)
+    Transform(records []t.RawRecord, collection config.GeoSpatialCollection) ([]t.SearchIndexRecord, error)
 }

 // Load - the 'L' in ETL. Datasource agnostic interface to load data into target database.

@@ -74,7 +74,10 @@ func ImportFile(collection config.GeoSpatialCollection, searchIndex string, file
     }
     defer target.Close()

-    transformer := newTransformer()
+    transformer, err := newTransformer(substitutionsFile, synonymsFile)
+    if err != nil {
+        return err
+    }

     // import records in batches depending on page size
     offset := 0

@@ -90,7 +93,7 @@
             break // no more batches of records to extract
         }
         log.Printf("extracted %d source records, starting transform", sourceRecordCount)
-        targetRecords, err := transformer.Transform(sourceRecords, collection, substitutionsFile, synonymsFile)
+        targetRecords, err := transformer.Transform(sourceRecords, collection)
         if err != nil {
             return fmt.Errorf("failed to transform raw records to search index records: %w", err)
         }

@@ -123,6 +126,6 @@ func newTargetToLoad(dbConn string) (Load, error) {
     return nil, fmt.Errorf("unsupported target database connection: %s", dbConn)
 }

-func newTransformer() Transform {
-    return t.Transformer{}
+func newTransformer(substitutionsFile string, synonymsFile string) (Transform, error) {
+    return t.NewTransformer(substitutionsFile, synonymsFile)
 }
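The shape of this change is a hoist: the substitutions and synonyms CSVs are now parsed once when the transformer is constructed, instead of being handed to (and re-processed by) Transform on every batch. A self-contained sketch of that pattern, for illustration only: the file name, toy record type, and synonym lookup here are made up for the example and are not the PDOK types.

```go
// Illustrative sketch (not the PDOK code): expensive setup, here parsing a
// synonyms CSV, happens once in the constructor; each batch then only calls
// the cheap transform step.
package main

import (
	"encoding/csv"
	"fmt"
	"os"
)

type transformer struct {
	synonyms map[string]string // loaded once, reused for every batch
}

func newTransformer(synonymsFile string) (*transformer, error) {
	f, err := os.Open(synonymsFile)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return nil, err
	}
	synonyms := make(map[string]string, len(rows))
	for _, row := range rows {
		if len(row) < 2 {
			continue // skip malformed rows
		}
		synonyms[row[0]] = row[1]
	}
	return &transformer{synonyms: synonyms}, nil
}

func (t *transformer) transform(batch []string) []string {
	out := make([]string, 0, len(batch))
	for _, v := range batch {
		out = append(out, v)
		if s, ok := t.synonyms[v]; ok {
			out = append(out, s) // emit the synonym variant as an extra record
		}
	}
	return out
}

func main() {
	tr, err := newTransformer("synonyms.csv") // hypothetical test file
	if err != nil {
		panic(err)
	}
	for _, batch := range [][]string{{"eerste"}, {"tweede", "fryslân"}} {
		fmt.Println(tr.transform(batch))
	}
}
```

Nothing about a single batch changes compared to the old signature; the gain is that the CSV parsing cost is paid once per import run rather than once per page of records.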
internal/etl/etl_test.go (2 changes: 1 addition & 1 deletion)

@@ -83,7 +83,7 @@ func TestImportGeoPackage(t *testing.T) {
         {
             name:  "import everything",
             where: "",
-            count: 67230, // 33030*2 + substitution and synonyms combinations
+            count: 72210, // 33030*2 + substitution and synonyms combinations
         },
         {
             name:  "with where clause",
internal/etl/testdata/synonyms.csv (3 changes: 2 additions & 1 deletion)

@@ -1,3 +1,4 @@
 eerste,1ste
 tweede,2de
-fryslân,friesland
+fryslân,friesland
+oud,oude
internal/etl/transform/subst_and_synonyms.go (29 changes: 21 additions & 8 deletions)

@@ -40,7 +40,8 @@ func (s SubstAndSynonyms) generate(fieldValuesByName map[string]string) []map[st
         // Create map with for each key a slice of []values
         fieldValuesByNameWithAllValues[key] = allValues
     }
-    return generateAllCombinations(fieldValuesByNameWithAllValues)
+    combinations := generateAllCombinations(fieldValuesByNameWithAllValues)
+    return combinations
 }

 // Transform a map[string][]string into a []map[string]string using the cartesian product, i.e.

@@ -82,19 +83,31 @@ func generateCombinations(keys []string, values [][]string) []map[string]string

 func extendValues(input []string, mapping map[string]string) []string {
     var results []string
-    results = append(results, input...)

-    for j := range input {
+    for len(input) > 0 {
+        // Pop the first element from the input slice
+        current := input[0]
+        input = input[1:]
+
+        // Add the current string to the results
+        results = append(results, current)
+
+        // Generate new strings based on the mapping
         for oldChar, newChar := range mapping {
-            if strings.Contains(input[j], oldChar) {
-                for i := 0; i < strings.Count(input[j], oldChar); i++ {
-                    extendedInput := replaceNth(input[j], oldChar, newChar, i+1)
-                    subCombinations := extendValues([]string{extendedInput}, mapping)
-                    results = append(results, subCombinations...)
+            if strings.Contains(current, oldChar) {
+                for i := 0; i < strings.Count(current, oldChar); i++ {
+                    if strings.HasPrefix(newChar, oldChar) {
+                        // skip to prevent endless loop for cases such as
+                        // oldChar = "foo", newChar = "foos" and input = "foosball", which would otherwise result in "foosssssssssssssssball"
+                        continue
+                    }
+                    extendedInput := replaceNth(current, oldChar, newChar, i+1)
+                    input = append(input, extendedInput)
                 }
             }
         }
     }

     // Possible performance improvement here by avoiding duplicates in the first place
     return uniqueSlice(results)
 }
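The core of the OOM fix is in extendValues. The old version recursed on every substituted variant, and a synonym whose value starts with its key (such as the oud,oude entry added above) matched its own output again, so the recursion never terminated and memory grew until the process was killed. The new version drains an explicit work queue and skips such prefix mappings. Below is a self-contained sketch of the same worklist idea; replaceNth and uniqueSlice are simplified stand-ins written for this example, not the project's own helpers, and the prefix guard is hoisted out of the inner loop, which is behaviorally equivalent.

```go
// Self-contained sketch of the worklist approach used in the fix above.
package main

import (
	"fmt"
	"strings"
)

// replaceNth replaces the n-th occurrence of old in s with new (stand-in helper).
func replaceNth(s, old, new string, n int) string {
	idx := -1
	for i := 0; i < n; i++ {
		next := strings.Index(s[idx+1:], old)
		if next < 0 {
			return s
		}
		idx += 1 + next
	}
	return s[:idx] + new + s[idx+len(old):]
}

// uniqueSlice removes duplicates while keeping order (stand-in helper).
func uniqueSlice(in []string) []string {
	seen := make(map[string]bool, len(in))
	out := make([]string, 0, len(in))
	for _, v := range in {
		if !seen[v] {
			seen[v] = true
			out = append(out, v)
		}
	}
	return out
}

func extendValues(input []string, mapping map[string]string) []string {
	var results []string
	for len(input) > 0 {
		current := input[0] // pop the next candidate instead of recursing
		input = input[1:]
		results = append(results, current)
		for oldChar, newChar := range mapping {
			if strings.HasPrefix(newChar, oldChar) {
				// e.g. "oud" -> "oude": the replacement would match again forever
				continue
			}
			for i := 0; i < strings.Count(current, oldChar); i++ {
				input = append(input, replaceNth(current, oldChar, newChar, i+1))
			}
		}
	}
	return uniqueSlice(results)
}

func main() {
	mapping := map[string]string{"eerste": "1ste", "oud": "oude"}
	fmt.Println(extendValues([]string{"eerste oude straat"}, mapping))
	// The "oud" -> "oude" entry is skipped, so this terminates instead of
	// producing "oudee straat", "oudeee straat", ... as the recursive version did.
}
```

Because variants are appended to a queue and popped one at a time, memory use is bounded by the number of distinct variants actually produced rather than by an unbounded recursion.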
internal/etl/transform/subst_and_synonyms_test.go (5 changes: 3 additions & 2 deletions)

@@ -7,7 +7,7 @@ import (
     "github.com/stretchr/testify/assert"
 )

-func Test_generateAllFieldValues(t *testing.T) {
+func Test_generate(t *testing.T) {
     type args struct {
         fieldValuesByName map[string]string
         substitutionsFile string

@@ -23,6 +23,7 @@
         {"single synonym record", args{map[string]string{"component_thoroughfarename": "eerste", "component_postaldescriptor": "1234AB", "component_addressareaname": "bar"}, "../testdata/substitutions.csv", "../testdata/synonyms.csv"}, []map[string]string{{"component_thoroughfarename": "eerste", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}, {"component_thoroughfarename": "1ste", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}}, assert.NoError},
         {"single synonym with capital", args{map[string]string{"component_thoroughfarename": "Eerste", "component_postaldescriptor": "1234AB", "component_addressareaname": "bar"}, "../testdata/substitutions.csv", "../testdata/synonyms.csv"}, []map[string]string{{"component_thoroughfarename": "eerste", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}, {"component_thoroughfarename": "1ste", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}}, assert.NoError},
         {"two-way synonym record", args{map[string]string{"component_thoroughfarename": "eerste 2de", "component_postaldescriptor": "1234AB", "component_addressareaname": "bar"}, "../testdata/substitutions.csv", "../testdata/synonyms.csv"}, []map[string]string{{"component_thoroughfarename": "eerste 2de", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}, {"component_thoroughfarename": "1ste 2de", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}, {"component_thoroughfarename": "eerste tweede", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}, {"component_thoroughfarename": "1ste tweede", "component_postaldescriptor": "1234ab", "component_addressareaname": "bar"}}, assert.NoError},
+        {"avoid endless loop for synonyms that contain source value", args{map[string]string{"street": "oude kerkstraat"}, "../testdata/substitutions.csv", "../testdata/synonyms.csv"}, []map[string]string{{"street": "oude kerkstraat"}, {"street": "oud kerkstraat"}}, assert.NoError},
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {

@@ -135,7 +136,7 @@ func Test_readCsvFile(t *testing.T) {
         wantErr assert.ErrorAssertionFunc
     }{
         {"Read substitutions csv", args{"../testdata/substitutions.csv"}, map[string]string{"ae": "a", "à": "a"}, assert.NoError},
-        {"Read synonyms csv", args{"../testdata/synonyms.csv"}, map[string]string{"eerste": "1ste", "fryslân": "friesland", "tweede": "2de"}, assert.NoError},
+        {"Read synonyms csv", args{"../testdata/synonyms.csv"}, map[string]string{"eerste": "1ste", "fryslân": "friesland", "oud": "oude", "tweede": "2de"}, assert.NoError},
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
internal/etl/transform/transform.go (14 changes: 8 additions & 6 deletions)

@@ -33,14 +33,16 @@ type SearchIndexRecord struct {
     Bbox *pggeom.Polygon
 }

-type Transformer struct{}
+type Transformer struct {
+    substAndSynonyms *SubstAndSynonyms
+}

-func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatialCollection, substitutionsFile string, synonymsFile string) ([]SearchIndexRecord, error) {
+func NewTransformer(substitutionsFile string, synonymsFile string) (*Transformer, error) {
     substAndSynonyms, err := NewSubstAndSynonyms(substitutionsFile, synonymsFile)
     if err != nil {
         return nil, err
     }
+    return &Transformer{substAndSynonyms}, err
+}
+
+func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatialCollection) ([]SearchIndexRecord, error) {
     result := make([]SearchIndexRecord, 0, len(records))
     for _, r := range records {
         fieldValuesByName, err := slicesToStringMap(collection.Search.Fields, r.FieldValues)

@@ -51,7 +53,7 @@ func (t Transformer) Transform(records []RawRecord, collection config.GeoSpatial
         if err != nil {
             return nil, err
         }
-        allFieldValuesByName := substAndSynonyms.generate(fieldValuesByName)
+        allFieldValuesByName := t.substAndSynonyms.generate(fieldValuesByName)
         suggestions := make([]string, 0, len(collection.Search.ETL.SuggestTemplates))
         for i := range allFieldValuesByName {
             for _, suggestTemplate := range collection.Search.ETL.SuggestTemplates {
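For context on what Transform iterates over: generate() expands each record's field values into every combination of substitutions and synonyms, i.e. a cartesian product over per-field variant lists, which is also why the expected record count in etl_test.go rises from 67230 to 72210 once oud,oude is in the synonyms file. The sketch below is a minimal, self-contained illustration of that expansion; the project's generateAllCombinations is not shown in this diff, so this is an assumption about the shape of the data, not its exact code.

```go
// Minimal sketch of expanding map[string][]string into all value combinations
// (a cartesian product), the shape of data Transform iterates over.
package main

import "fmt"

func allCombinations(valuesByField map[string][]string) []map[string]string {
	combos := []map[string]string{{}} // start with one empty combination
	for field, values := range valuesByField {
		var next []map[string]string
		for _, combo := range combos {
			for _, v := range values {
				extended := make(map[string]string, len(combo)+1)
				for k, val := range combo {
					extended[k] = val
				}
				extended[field] = v
				next = append(next, extended)
			}
		}
		combos = next
	}
	return combos
}

func main() {
	out := allCombinations(map[string][]string{
		"street":   {"eerste gracht", "1ste gracht"},
		"postcode": {"1234ab"},
	})
	fmt.Println(len(out), out) // 2 combinations: one per street variant
}
```

Every extra variant for a field multiplies the number of combinations, which is why the index grows when a new synonym pair is added.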
