1717import java .nio .file .Path ;
1818import java .time .Instant ;
1919import java .time .format .DateTimeFormatter ;
20- import java .util .ArrayList ;
21- import java .util .List ;
2220import java .util .Objects ;
21+ import java .util .concurrent .CopyOnWriteArrayList ;
2322
2423import static org .apache .spark .sql .functions .*;
2524
@@ -73,6 +72,7 @@ public void run() {
7372 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
7473 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .researchers .value ())) // Optional, default: "A1"
7574 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
75+ .option ("maxRowsInMemory" , 20 )
7676 .option ("header" , "true" )
7777 .load (input .toString ())
7878 .toDF ("_c0" , "_c1" , "_c2" , "_c3" )
@@ -87,6 +87,7 @@ public void run() {
8787 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
8888 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .departments .value ())) // Optional, default: "A1"
8989 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
90+ .option ("maxRowsInMemory" , 20 )
9091 .option ("header" , "true" )
9192 .load (input .toString ())
9293 .toDF ("_c0" , "_c1" , "_c2" , "_c3" , "_c4" , "_c5" , "_c6" )
@@ -100,7 +101,8 @@ public void run() {
100101 .read ()
101102 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
102103 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .departments_relations .value ())) // Optional, default: "A1"
103- .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
104+ .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
105+ .option ("maxRowsInMemory" , 20 )
104106 .option ("header" , "true" )
105107 .load (input .toString ())
106108 .toDF ("_c0" , "_c1" )
@@ -123,6 +125,7 @@ public void run() {
123125 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
124126 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .research_groups .value ())) // Optional, default: "A1"
125127 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
128+ .option ("maxRowsInMemory" , 20 )
126129 .option ("header" , "true" )
127130 .load (input .toString ())
128131 .toDF ("_c0" , "_c1" , "_c2" , "_c3" , "_c4" , "_c5" , "_c6" )
@@ -137,6 +140,7 @@ public void run() {
137140 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
138141 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .research_groups_relations .value ())) // Optional, default: "A1"
139142 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
143+ .option ("maxRowsInMemory" , 20 )
140144 .option ("header" , "true" )
141145 .load (input .toString ())
142146 .toDF ("_c0" , "_c1" , "_c2" , "_c3" )
@@ -161,6 +165,7 @@ public void run() {
161165 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
162166 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .projects .value ())) // Optional, default: "A1"
163167 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
168+ .option ("maxRowsInMemory" , 20 )
164169 .option ("header" , "true" )
165170 .load (input .toString ())
166171 .toDF ("_c0" , "_c1" , "_c2" , "_c3" , "_c4" , "_c5" , "_c6" )
@@ -175,6 +180,7 @@ public void run() {
175180 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
176181 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .projects_relations .value ())) // Optional, default: "A1"
177182 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
183+ .option ("maxRowsInMemory" , 20 )
178184 .option ("header" , "true" )
179185 .load (input .toString ())
180186 .toDF ("_c0" , "_c1" , "_c2" , "_c3" )
@@ -198,6 +204,7 @@ public void run() {
198204 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
199205 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .publications .value ())) // Optional, default: "A1"
200206 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
207+ .option ("maxRowsInMemory" , 20 )
201208 .option ("header" , "true" )
202209 .load (input .toString ())
203210 .toDF ("_c0" , "_c1" , "_c2" , "_c3" , "_c4" , "_c5" , "_c6" , "_c7" , "_c8" , "_c9" , "_c10" , "_c11" , "_c12" , "_c13" , "_c14" )
@@ -212,6 +219,7 @@ public void run() {
212219 .format ("com.crealytics.spark.excel" ) // Or .format("excel") for V2 implementation
213220 .option ("dataAddress" , String .format ("'%s'!A1" , SHEETS .publication_relations .value ())) // Optional, default: "A1"
214221 .option ("treatEmptyValuesAsNulls" , "false" ) // Optional, default: true
222+ .option ("maxRowsInMemory" , 20 )
215223 .option ("header" , "true" )
216224 .load (input .toString ())
217225 .toDF ("_c0" , "_c1" , "_c2" , "_c3" )
@@ -234,43 +242,49 @@ public void run() {
234242 Dataset <Row > research_groups_join = research_groups .join (research_groups_relations , col ("research_groups._c4" ).equalTo (col ("research_groups_relations._c0" )), "left" ).drop (col ("research_groups_relations._c0" ));
235243 Dataset <Row > publication_join = publications .join (publication_relations , col ("publications._c1" ).equalTo (col ("publication_relations._c0" )), "left" ).drop (col ("publication_relations._c0" ));
236244
237- // projects_join.show(false);
238- // departments_join.show(false);
239- // research_groups_join.show(false);
240- // publication_join.show(false);
241-
242245 //CERIF
243246 Marshaller marshaller = new Marshaller (ruct );
244247
245- List <CfPersType > cfPersTypeList = new ArrayList <>();
246- List <CfOrgUnitType > cfOrgUnitTypeList = new ArrayList <>();
247- List <CfProjType > cfProjTypeList = new ArrayList <>();
248- List <CfResPublType > cfResPublTypeList = new ArrayList <>();
249-
250- researchers .collectAsList ().forEach (row -> {
251- cfPersTypeList .add (new Researcher (row , Semantics .getClassId (ClassId .CHECKED )));
252- });
253-
254- departments_join .collectAsList ().forEach (row -> {
255- cfOrgUnitTypeList .add (new Department (row , Semantics .getClassId (ClassId .DEPARTMENT_OR_INSTITUTE )));
256- });
257-
258- research_groups_join .collectAsList ().forEach (row -> {
259- cfOrgUnitTypeList .add (new ResearchGroup (row , Semantics .getClassId (ClassId .RESEARCH_GROUP ), cfPersTypeList ));
260- });
261-
262- projects_join .collectAsList ().forEach (row -> {
263- cfProjTypeList .add (new Project (row , cfPersTypeList ));
264- });
265-
266- publication_join .collectAsList ().forEach (row -> {
267- cfResPublTypeList .add (new Publication (row , cfPersTypeList ));
268- });
248+ CopyOnWriteArrayList <CfPersType > cfPersTypeList = new CopyOnWriteArrayList <>();
249+ CopyOnWriteArrayList <CfOrgUnitType > cfOrgUnitTypeList = new CopyOnWriteArrayList <>();
250+ CopyOnWriteArrayList <CfOrgUnitType > cfOrgUnitTypeList_2 = new CopyOnWriteArrayList <>();
251+ CopyOnWriteArrayList <CfProjType > cfProjTypeList = new CopyOnWriteArrayList <>();
252+ CopyOnWriteArrayList <CfResPublType > cfResPublTypeList = new CopyOnWriteArrayList <>();
253+
254+ if (researchers .count () > 0 ) {
255+ researchers .collectAsList ().forEach (row -> {
256+ cfPersTypeList .add (new Researcher (row , Semantics .getClassId (ClassId .CHECKED )));
257+ });
258+ }
259+
260+ if (departments_join .count () > 0 ) {
261+ departments_join .collectAsList ().forEach (row -> {
262+ cfOrgUnitTypeList .add (new Department (row , Semantics .getClassId (ClassId .DEPARTMENT_OR_INSTITUTE )));
263+ });
264+ }
265+
266+ if (research_groups_join .count () > 0 ) {
267+ research_groups_join .collectAsList ().forEach (row -> {
268+ cfOrgUnitTypeList_2 .add (new ResearchGroup (row , Semantics .getClassId (ClassId .RESEARCH_GROUP ), cfPersTypeList ));
269+ });
270+ }
271+
272+ if (projects_join .count () > 0 ) {
273+ projects_join .collectAsList ().forEach (row -> {
274+ cfProjTypeList .add (new Project (row , cfPersTypeList ));
275+ });
276+ }
277+
278+ if (publication_join .count () > 0 ) {
279+ publication_join .collectAsList ().forEach (row -> {
280+ cfResPublTypeList .add (new Publication (row , cfPersTypeList ));
281+ });
282+ }
269283
270284 if (Objects .isNull (output ))
271- marshaller .buld (String .format ("/tmp/%s.xml" , ruct ), formatted , cfPersTypeList , cfOrgUnitTypeList , cfProjTypeList , cfResPublTypeList );
285+ marshaller .build (String .format ("/tmp/%s.xml" , ruct ), formatted , cfPersTypeList , cfOrgUnitTypeList , cfProjTypeList , cfResPublTypeList );
272286 else
273- marshaller .buld (output .toString (), formatted , cfPersTypeList , cfOrgUnitTypeList , cfProjTypeList , cfResPublTypeList );
287+ marshaller .build (output .toString (), formatted , cfPersTypeList , cfOrgUnitTypeList , cfProjTypeList , cfResPublTypeList );
274288
275289
276290 sparkSession .log ().info ("Saved output {}" , Objects .isNull (output ) ? String .format ("/tmp/%s.xml" , ruct ) : output );
0 commit comments