
Commit

PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1896929 13f79535-47bb-0310-9956-ffa450edef68
Koji Noguchi committed Jan 11, 2022
1 parent d39288c commit a3ba5e2
Showing 2 changed files with 11 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
@@ -104,6 +104,8 @@ OPTIMIZATIONS

BUG FIXES

PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)

PIG-5415: [spark] TestScriptLanguage conflict between multiple SparkContext (after spark2.4 upgrade) (knoguchi)

PIG-5412: testSkewedJoinOuter spark unit-test failing with ClassNotFoundException (knoguchi)
11 changes: 9 additions & 2 deletions SparkLauncher.java
@@ -165,6 +165,9 @@ public class SparkLauncher extends Launcher {
private SparkEngineConf sparkEngineConf = new SparkEngineConf();
private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();

// this set is unnecessary once PIG-5241 is fixed
private static Set<String> allCachedFiles = null;

@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
PigContext pigContext) throws Exception {
@@ -418,8 +421,11 @@ private void cacheFiles(String cacheFiles) throws IOException {
fs.copyToLocalFile(src, tmpFilePath);
tmpFile.deleteOnExit();
LOG.info(String.format("CacheFile:%s", fileName));
addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
ResourceType.FILE);
if(!allCachedFiles.contains(file.trim())) {
allCachedFiles.add(file.trim());
addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
ResourceType.FILE);
}
}
}
}
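
For readers skimming the diff: the change above wraps the existing addResourceToSparkJobWorkingDirectory(...) call in a duplicate check, so a cache-file spec that appears more than once is shipped to the Spark job only once. A minimal, self-contained sketch of that guard pattern follows; the class and method names are illustrative only and are not part of the Pig source.

import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only (hypothetical names): mirrors the dedup guard
// added to cacheFiles() above and the reset added to startSparkIfNeeded()
// in the next hunk.
public class CacheFileGuard {
    // Cache-file specs already registered with the current Spark job.
    private static Set<String> registered = new HashSet<String>();

    // Returns true only the first time a given spec is seen, so the caller
    // ships/registers the file at most once per SparkContext.
    public static synchronized boolean shouldRegister(String cacheFileSpec) {
        // Set.add returns false when the element is already present.
        return registered.add(cacheFileSpec.trim());
    }

    // Called when a new SparkContext is created: a fresh context has no
    // registered files, so the registry starts empty again.
    public static synchronized void reset() {
        registered = new HashSet<String>();
    }
}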
@@ -641,6 +647,7 @@ private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext
sparkContext = new JavaSparkContext(sparkConf);
SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener());
SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener());
allCachedFiles = new HashSet<String>();
}
jobConf.set(SPARK_VERSION, sparkContext.version());
}
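
The one-line addition above ties the registry's lifetime to the SparkContext: the set is recreated exactly when a new context is started, so files can be registered again for a fresh context while duplicates within the same context are skipped. A usage trace for the CacheFileGuard sketch shown earlier (hypothetical, for illustration only):

// Hypothetical usage of the CacheFileGuard sketch above.
public class CacheFileGuardExample {
    public static void main(String[] args) {
        CacheFileGuard.reset();                                           // new SparkContext started
        System.out.println(CacheFileGuard.shouldRegister("script1.pl"));  // true  - register/ship it
        System.out.println(CacheFileGuard.shouldRegister("script1.pl"));  // false - duplicate, skipped
        CacheFileGuard.reset();                                           // another SparkContext
        System.out.println(CacheFileGuard.shouldRegister("script1.pl"));  // true  - register again
    }
}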
