import elasticsearch.helpers
- from elasticsearch import Elasticsearch, RequestsHttpConnection, serializer, compat, exceptions
+ from elasticsearch import Elasticsearch, RequestsHttpConnection, serializer, compat, exceptions, helpers

if (not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None)):
    ssl._create_default_https_context = ssl._create_unverified_context
@@ -71,31 +71,39 @@ def load(self):
    def __load_file(self, file):
        doc_count = 0
        data = []
+         chunk_size = 100  # Set chunk size to 100 records

        with open(file) as f:
            print("Starting indexing on " + f.name)
            reader = csv.DictReader(f)

            for row in reader:
                # gracefully handle empty locations
-                 if (row['decimalLatitude'] == '' or row['decimalLongitude'] == ''):
+                 if row['decimalLatitude'] == '' or row['decimalLongitude'] == '':
                    row['location'] = ''
                else:
-                     row['location'] = row['decimalLatitude'] + "," + row['decimalLongitude']
+                     row['location'] = row['decimalLatitude'] + "," + row['decimalLongitude']

-                 # pipeline code identifies null yearCollected values as 'unknown'. es_loader should be empty string
-                 if (row['yearCollected'] == 'unknown'):
-                     row['yearCollected'] = ''
-                 if (row['yearCollected'] == 'Unknown'):
+                 # handle 'unknown' values for yearCollected
+                 if row['yearCollected'].lower() == 'unknown':
                    row['yearCollected'] = ''

-                 data.append({k: v for k, v in row.items() if v})  # remove any empty values
+                 data.append({k: v for k, v in row.items() if v})  # remove empty values
+
+                 # When chunk_size is reached, send bulk data to Elasticsearch
+                 if len(data) == chunk_size:
+                     helpers.bulk(client=self.es, index=self.index_name, actions=data, raise_on_error=True, request_timeout=60)
+                     doc_count += len(data)
+                     print(f"Indexed {len(data)} documents. Total indexed: {doc_count}")
+                     data = []  # Clear the data list for the next chunk

-             elasticsearch.helpers.bulk(client=self.es, index=self.index_name, actions=data,
-                                        raise_on_error=True, chunk_size=10000, request_timeout=60)
-             doc_count += len(data)
-             print("Indexed {} documents in {}".format(doc_count, f.name))
+             # Index remaining data if it's less than chunk_size
+             if data:
+                 helpers.bulk(client=self.es, index=self.index_name, actions=data, raise_on_error=True, request_timeout=60)
+                 doc_count += len(data)
+                 print(f"Indexed {len(data)} remaining documents. Total indexed: {doc_count}")

+             print("Finished indexing in", f.name)
        return doc_count

    def __create_index(self):
@@ -144,7 +152,6 @@ def __create_index(self):
                    "dayCollected": {"type": "text"},
                    "verbatimEventDate": {"type": "text"},
                    "collectorList": {"type": "text"},
-                     "Sample_bcid": {"type": "text"},
                    "occurrenceID": {"type": "text"},
                    "otherCatalogNumbers": {"type": "text"},
                    "fieldNumber": {"type": "text"},
@@ -171,7 +178,8 @@ def __create_index(self):
                    "zeScore": {"type": "text"},
                    "diagnosticLab": {"type": "text"},
                    "projectId": {"type": "text"},
-                     "projectURL": {"type": "text"}
+                     "projectURL": {"type": "text"},
+                     "Sample_bcid": {"type": "text"}
                }
            }
        }
@@ -190,7 +198,7 @@ def get_files(dir, ext='csv'):
index = 'amphibiandisease'
drop_existing = True
alias = 'amphibiandisease'
- host = 'tarly.cyverse.org:80'
+ host = '149.165.170.158:80'
#file_location = 'test.csv'
file_location = 'data/amphibian_disease_data_processed.csv'
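For reference, here is a minimal, self-contained sketch of the chunked bulk-indexing pattern the change above introduces. It is an illustration under assumptions, not code from the repository: the Elasticsearch host URL and the load_csv helper name are made up, while helpers.bulk, csv.DictReader, the index name, and the CSV path mirror the values used in the diff.

# chunked bulk-indexing sketch (illustrative only)
import csv

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(['http://localhost:9200'])  # assumed local cluster, not the project host
index_name = 'amphibiandisease'
chunk_size = 100  # same chunk size as the change above


def load_csv(path):
    """Index a CSV file in fixed-size chunks; return the number of documents sent."""
    doc_count = 0
    data = []
    with open(path) as f:
        for row in csv.DictReader(f):
            data.append({k: v for k, v in row.items() if v})  # drop empty values
            if len(data) == chunk_size:
                # a full chunk is ready: send it and reset the buffer
                helpers.bulk(client=es, index=index_name, actions=data,
                             raise_on_error=True, request_timeout=60)
                doc_count += len(data)
                data = []
    if data:
        # flush whatever is left over after the last full chunk
        helpers.bulk(client=es, index=index_name, actions=data,
                     raise_on_error=True, request_timeout=60)
        doc_count += len(data)
    return doc_count


print(load_csv('data/amphibian_disease_data_processed.csv'))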