Skip to content

Commit 0f912c1

Browse files
committed
Support a deep path when sampling geo_shape fields
1 parent 7f14857 commit 0f912c1

File tree

5 files changed

+171
-2
lines changed

5 files changed

+171
-2
lines changed

mr/src/main/java/org/elasticsearch/hadoop/rest/RestRepository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public Map<String, GeoField> sampleGeoFields(Mapping mapping) {
300300
Map<String, GeoField> geoInfo = new LinkedHashMap<String, GeoField>();
301301
for (Entry<String, GeoType> geoEntry : fields.entrySet()) {
302302
String fieldName = geoEntry.getKey();
303-
geoInfo.put(fieldName, MappingUtils.parseGeoInfo(geoEntry.getValue(), geoMapping.get(fieldName)));
303+
geoInfo.put(fieldName, MappingUtils.parseGeoInfo(geoEntry.getValue(), MappingUtils.getGeoMapping(geoMapping, fieldName)));
304304
}
305305

306306
return geoInfo;

mr/src/main/java/org/elasticsearch/hadoop/serialization/dto/mapping/MappingUtils.java

+19
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,25 @@ public abstract class MappingUtils {
4747
"_parent", "_routing", "_index", "_size", "_timestamp", "_ttl", "_field_names", "_meta"));
4848
}
4949

50+
public static Object getGeoMapping(Map<String, Object> map, String path) {
51+
String[] keys = path.split("\\.");
52+
Object currentValue = map;
53+
54+
for (String key : keys) {
55+
if (currentValue instanceof ArrayList) {
56+
currentValue = ((ArrayList)currentValue).get(0);
57+
}
58+
59+
if (currentValue instanceof Map) {
60+
currentValue = ((Map<String, Object>) currentValue).get(key);
61+
} else {
62+
return null;
63+
}
64+
}
65+
66+
return currentValue;
67+
}
68+
5069
public static void validateMapping(String fields, Mapping mapping, FieldPresenceValidation validation, Log log) {
5170
if (StringUtils.hasText(fields)) {
5271
validateMapping(StringUtils.tokenize(fields), mapping, validation, log);

spark/sql-13/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

+50
Original file line numberDiff line numberDiff line change
@@ -1819,6 +1819,56 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
18191819
assertThat(array(1), is(0.0d))
18201820
}
18211821

1822+
/** Verifies that a geo_shape field nested under an object field (a "deep"
  * path, location.deep) is sampled correctly and surfaces as a struct in the
  * inferred SQL schema, and that the point data round-trips. */
@Test
def testGeoShapePointDeep() {
  // Mapping with the geo_shape one object level down instead of top-level.
  val mapping = wrapMapping("data", s"""{
    |      "properties": {
    |        "name": {
    |          "type": "$keyword"
    |        },
    |        "location": {
    |          "properties": {
    |            "deep": {
    |              "type": "geo_shape"
    |            }
    |          }
    |        }
    |      }
    |    }
    """.stripMargin)

  val index = wrapIndex("sparksql-test-geoshape-point-geoshape-deep")
  val typed = "data"
  val (target, _) = makeTargets(index, typed)
  RestUtils.touch(index)
  RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))

  // Single point document; field sampling must follow the deep path.
  // (dropped redundant .stripMargin — the literal is single-line)
  val point = """{"name":"point", "location": { "deep":{ "type" : "point", "coordinates": [100.0, 0.0] } }}"""

  sc.makeRDD(Seq(point)).saveJsonToEs(target)
  val df = sqc.read.format("es").load(index)

  // The nested geo_shape must appear as a struct at location.deep.
  val dataType = df.schema("location").dataType.asInstanceOf[StructType]("deep").dataType
  assertEquals("struct", dataType.typeName)

  val struct = dataType.asInstanceOf[StructType]
  assertTrue(struct.fieldNames.contains("type"))
  var coords = struct("coordinates").dataType
  assertEquals("array", coords.typeName)
  coords = coords.asInstanceOf[ArrayType].elementType
  assertEquals("double", coords.typeName)

  val head = df.select("location.*").head()

  val obj = head.getStruct(0)
  assertThat(obj.getString(0), is("point"))
  val array = obj.getSeq[Double](1)
  assertThat(array(0), is(100.0d))
  assertThat(array(1), is(0.0d))
}
1871+
18221872
@Test
18231873
def testGeoShapeLine() {
18241874
val mapping = wrapMapping("data", s"""{

spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

+51-1
Original file line numberDiff line numberDiff line change
@@ -1881,6 +1881,56 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
18811881
assertThat(array(1), is(0.0d))
18821882
}
18831883

1884+
/** Verifies that a geo_shape field nested under an object field (a "deep"
  * path, location.deep) is sampled correctly and surfaces as a struct in the
  * inferred SQL schema, and that the point data round-trips. */
@Test
def testGeoShapePointDeep() {
  // Mapping with the geo_shape one object level down instead of top-level.
  val mapping = wrapMapping("data", s"""{
    |      "properties": {
    |        "name": {
    |          "type": "$keyword"
    |        },
    |        "location": {
    |          "properties": {
    |            "deep": {
    |              "type": "geo_shape"
    |            }
    |          }
    |        }
    |      }
    |    }
    """.stripMargin)

  val index = wrapIndex("sparksql-test-geoshape-point-geoshape-deep")
  val typed = "data"
  val (target, _) = makeTargets(index, typed)
  RestUtils.touch(index)
  RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))

  // Single point document; field sampling must follow the deep path.
  // (dropped redundant .stripMargin — the literal is single-line)
  val point = """{"name":"point", "location": { "deep":{ "type" : "point", "coordinates": [100.0, 0.0] } }}"""

  sc.makeRDD(Seq(point)).saveJsonToEs(target)
  val df = sqc.read.format("es").load(index)

  // The nested geo_shape must appear as a struct at location.deep.
  val dataType = df.schema("location").dataType.asInstanceOf[StructType]("deep").dataType
  assertEquals("struct", dataType.typeName)

  val struct = dataType.asInstanceOf[StructType]
  assertTrue(struct.fieldNames.contains("type"))
  var coords = struct("coordinates").dataType
  assertEquals("array", coords.typeName)
  coords = coords.asInstanceOf[ArrayType].elementType
  assertEquals("double", coords.typeName)

  val head = df.select("location.*").head()

  val obj = head.getStruct(0)
  assertThat(obj.getString(0), is("point"))
  val array = obj.getSeq[Double](1)
  assertThat(array(0), is(100.0d))
  assertThat(array(1), is(0.0d))
}
1933+
18841934
@Test
18851935
def testGeoShapeLine() {
18861936
val mapping = wrapMapping("data", s"""{
@@ -1905,7 +1955,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
19051955

19061956
sc.makeRDD(Seq(line)).saveJsonToEs(target)
19071957
val df = sqc.read.format("es").load(index)
1908-
1958+
19091959
val dataType = df.schema("location").dataType
19101960
assertEquals("struct", dataType.typeName)
19111961

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

+50
Original file line numberDiff line numberDiff line change
@@ -1881,6 +1881,56 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
18811881
assertThat(array(1), is(0.0d))
18821882
}
18831883

1884+
/** Verifies that a geo_shape field nested under an object field (a "deep"
  * path, location.deep) is sampled correctly and surfaces as a struct in the
  * inferred SQL schema, and that the point data round-trips. */
@Test
def testGeoShapePointDeep() {
  // Mapping with the geo_shape one object level down instead of top-level.
  val mapping = wrapMapping("data", s"""{
    |      "properties": {
    |        "name": {
    |          "type": "$keyword"
    |        },
    |        "location": {
    |          "properties": {
    |            "deep": {
    |              "type": "geo_shape"
    |            }
    |          }
    |        }
    |      }
    |    }
    """.stripMargin)

  val index = wrapIndex("sparksql-test-geoshape-point-geoshape-deep")
  val typed = "data"
  val (target, _) = makeTargets(index, typed)
  RestUtils.touch(index)
  RestUtils.putMapping(index, typed, mapping.getBytes(StringUtils.UTF_8))

  // Single point document; field sampling must follow the deep path.
  // (dropped redundant .stripMargin — the literal is single-line)
  val point = """{"name":"point", "location": { "deep":{ "type" : "point", "coordinates": [100.0, 0.0] } }}"""

  sc.makeRDD(Seq(point)).saveJsonToEs(target)
  val df = sqc.read.format("es").load(index)

  // The nested geo_shape must appear as a struct at location.deep.
  val dataType = df.schema("location").dataType.asInstanceOf[StructType]("deep").dataType
  assertEquals("struct", dataType.typeName)

  val struct = dataType.asInstanceOf[StructType]
  assertTrue(struct.fieldNames.contains("type"))
  var coords = struct("coordinates").dataType
  assertEquals("array", coords.typeName)
  coords = coords.asInstanceOf[ArrayType].elementType
  assertEquals("double", coords.typeName)

  val head = df.select("location.*").head()

  val obj = head.getStruct(0)
  assertThat(obj.getString(0), is("point"))
  val array = obj.getSeq[Double](1)
  assertThat(array(0), is(100.0d))
  assertThat(array(1), is(0.0d))
}
1933+
18841934
@Test
18851935
def testGeoShapeLine() {
18861936
val mapping = wrapMapping("data", s"""{

0 commit comments

Comments
 (0)