Skip to content

Commit

Permalink
Merge branch 'apache:trunk' into trunk
Browse files Browse the repository at this point in the history
  • Loading branch information
lujiefsi authored May 15, 2022
2 parents 13054a0 + a3ba5e2 commit 417b9b0
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 7 deletions.
10 changes: 10 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ PIG-5282: Upgrade to Java 8 (satishsaley via rohini)

IMPROVEMENTS

PIG-5398: SparkLauncher does not read SPARK_CONF_DIR/spark-defaults.conf (knoguchi)

PIG-5397: Update spark2.version to 2.4.8 (knoguchi)

PIG-5400: OrcStorage dropping struct(tuple) when it only holds a single field inside a Bag(array) (knoguchi)

PIG-4764: Make Pig work with Hive 3.1 (szita)
Expand Down Expand Up @@ -100,6 +104,12 @@ OPTIMIZATIONS

BUG FIXES

PIG-5413: [spark] TestStreaming.testInputCacheSpecs failing with "File script1.pl was already registered" (knoguchi)

PIG-5415: [spark] TestScriptLanguage conflict between multiple SparkContext (after spark2.4 upgrade) (knoguchi)

PIG-5412: testSkewedJoinOuter spark unit-test failing with ClassNotFoundException (knoguchi)

PIG-5404: FLATTEN infers wrong datatype (knoguchi)

PIG-5243: describe with typecast on as-clause shows the types before the typecasting (knoguchi)
Expand Down
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@
value="${mvnrepo}/org/codehaus/jackson/jackson-core-asl/${jackson-pig-3039-test.version}/jackson-core-asl-${jackson-pig-3039-test.version}.jar"/>
<property name="jackson_mapper_repo_url"
value="${mvnrepo}/org/codehaus/jackson/jackson-mapper-asl/${jackson-pig-3039-test.version}/jackson-mapper-asl-${jackson-pig-3039-test.version}.jar"/>
<property name="test.spark.spark_master" value="yarn-client" />
<property name="test.spark.spark_master" value="yarn" />

<!--this is the naming policy for artifacts we want pulled down-->
<property name="ivy.artifact.retrieve.pattern" value="${ant.project.name}/[artifact]-[revision](-[classifier]).[ext]"/>
Expand Down
4 changes: 2 additions & 2 deletions ivy/libraries.properties
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ antlr.version=3.4
stringtemplate.version=4.0.4
log4j.version=1.2.16
netty.version=3.6.6.Final
netty-all.version=4.1.1.Final
netty-all.version=4.1.17.Final
rats-lib.version=0.5.1
slf4j-api.version=1.6.1
slf4j-log4j12.version=1.6.1
spark1.version=1.6.1
spark2.version=2.1.1
spark2.version=2.4.8
xerces.version=2.10.0
xalan.version=2.7.1
wagon-http.version=1.0-beta-2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.pig.backend.hadoop.executionengine.spark;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand Down Expand Up @@ -162,6 +165,9 @@ public class SparkLauncher extends Launcher {
private SparkEngineConf sparkEngineConf = new SparkEngineConf();
private static final String PIG_WARNING_FQCN = PigWarning.class.getCanonicalName();

// this set is unnecessary once PIG-5241 is fixed
private static Set<String> allCachedFiles = null;

@Override
public PigStats launchPig(PhysicalPlan physicalPlan, String grpName,
PigContext pigContext) throws Exception {
Expand Down Expand Up @@ -415,8 +421,11 @@ private void cacheFiles(String cacheFiles) throws IOException {
fs.copyToLocalFile(src, tmpFilePath);
tmpFile.deleteOnExit();
LOG.info(String.format("CacheFile:%s", fileName));
addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
ResourceType.FILE);
if(!allCachedFiles.contains(file.trim())) {
allCachedFiles.add(file.trim());
addResourceToSparkJobWorkingDirectory(tmpFile, fileName,
ResourceType.FILE);
}
}
}
}
Expand Down Expand Up @@ -569,7 +578,7 @@ private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext
}
if (sparkContext == null) {
String sparkHome = System.getenv("SPARK_HOME");
if (!master.startsWith("local") && !master.equals("yarn-client")) {
if (!master.startsWith("local") && !master.equals("yarn")) {
// Check that we have the Mesos native library and Spark home
// are set
if (sparkHome == null) {
Expand All @@ -596,6 +605,30 @@ private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext
LOG.warn("SPARK_HOME is not set");
}

String sparkConfEnv = System.getenv("SPARK_CONF_DIR");
if( sparkConfEnv == null && sparkHome != null) {
sparkConfEnv = sparkHome + "/conf";
}
if( sparkConfEnv != null ) {
try {
Properties props = new Properties();
File propsFile = new File (sparkConfEnv,"spark-defaults.conf");
if (propsFile.isFile()) {
try (InputStreamReader isr = new InputStreamReader(
new FileInputStream(propsFile), StandardCharsets.UTF_8)) {
props.load(isr);
for (Map.Entry<Object, Object> e : props.entrySet()) {
pigCtxtProperties.setProperty(e.getKey().toString(),
e.getValue().toString().trim());
}
}
}
} catch (IOException ex) {
LOG.warn("Reading $SPARK_CONF_DIR/spark-defaults.conf failed");
}
}


//Copy all spark.* properties to SparkConf
for (String key : pigCtxtProperties.stringPropertyNames()) {
if (key.startsWith("spark.")) {
Expand All @@ -612,10 +645,11 @@ private static synchronized void startSparkIfNeeded(JobConf jobConf, PigContext
checkAndConfigureDynamicAllocation(master, sparkConf);

sparkContext = new JavaSparkContext(sparkConf);
jobConf.set(SPARK_VERSION, sparkContext.version());
SparkShims.getInstance().addSparkListener(sparkContext.sc(), jobStatisticCollector.getSparkListener());
SparkShims.getInstance().addSparkListener(sparkContext.sc(), new StatsReportListener());
allCachedFiles = new HashSet<String>();
}
jobConf.set(SPARK_VERSION, sparkContext.version());
}

private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf) {
Expand Down

0 comments on commit 417b9b0

Please sign in to comment.