Skip to content

Commit 7ca8971

Browse files
committed
add: image process
1 parent 5c3ca9c commit 7ca8971

File tree

6 files changed

+47
-10
lines changed

6 files changed

+47
-10
lines changed

spark-best-practice/spark-image-process/pom.xml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@
3131
</dependency>
3232

3333
<dependency>
34-
<groupId>org.apache.spark</groupId>
35-
<artifactId>spark-sql_2.12</artifactId>
36-
<version>3.1.2</version>
34+
<groupId>com.twelvemonkeys.imageio</groupId>
35+
<artifactId>imageio-core</artifactId>
36+
<version>3.8.2</version>
3737
</dependency>
3838

3939
<dependency>
40-
<groupId>org.apache.spark</groupId>
41-
<artifactId>spark-core_2.12</artifactId>
42-
<version>3.1.2</version>
40+
<groupId>commons-io</groupId>
41+
<artifactId>commons-io</artifactId>
42+
<version>2.11.0</version>
4343
</dependency>
4444
</dependencies>
4545

Loading

spark-best-practice/spark-image-process/src/main/scala/com/wxmimperio/spark/SimpleDemo.scala

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
package com.wxmimperio.spark
22

3+
import java.awt.image.{BufferedImage, WritableRaster}
4+
import java.io.{ByteArrayInputStream, File}
5+
6+
import javax.imageio.ImageIO
7+
import org.apache.commons.io.FilenameUtils
38
import org.apache.spark.SparkConf
49
import org.apache.spark.sql.SparkSession
510

611
object SimpleDemo {
712

813
def main(args: Array[String]): Unit = {
9-
val conf = new SparkConf().setMaster("local[*]")
14+
val conf = new SparkConf().setMaster("local")
1015
val spark = SparkSession.builder()
1116
.config(conf)
1217
.getOrCreate()
1318

1419
val imageDF = spark.read
1520
.format("image")
16-
.load("E:\\coding\\github\\hadoop-code-snippets\\spark-best-practice\\spark-image-process\\src\\main\\resources\\gorilla_PNG18712.png")
21+
.load("E:\\coding\\github\\hadoop-code-snippets\\spark-best-practice\\spark-image-process\\src\\main\\resources\\pngs\\")
1722

1823
imageDF.printSchema()
1924

@@ -26,10 +31,42 @@ object SimpleDemo {
2631
"image.data"
2732
)
2833

29-
row.foreach(row => {
34+
/*row.foreach(row => {
3035
val data = row.getAs[Array[Byte]]("data")
3136
println(data)
37+
})*/
38+
39+
row.repartition(1).rdd.map(row => {
40+
val origin = row.getAs[String]("origin")
41+
println(origin)
42+
val data = row.getAs[Array[Byte]]("data")
43+
val width = row.getAs[Int]("width")
44+
val height = row.getAs[Int]("height")
45+
46+
47+
val image = new BufferedImage(width, height, BufferedImage.TYPE_INT_BGR)
48+
val raster = image.getData.asInstanceOf[WritableRaster]
49+
val pixels = data.map(_.toDouble)
50+
raster.setPixels(0, 0, width, height, pixels)
51+
image.setData(raster)
52+
53+
for (i <- image.getMinX until image.getWidth()) {
54+
for (j <- image.getMinY until image.getHeight()) {
55+
println(image.getRGB(i, j))
56+
}
57+
}
58+
59+
val buffImg = new BufferedImage(width, height, BufferedImage.TYPE_3BYTE_BGR)
60+
val g = buffImg.createGraphics
61+
g.drawImage(image, 0, 0, null)
62+
g.dispose()
63+
64+
(origin, buffImg)
65+
}).coalesce(1).foreach(buffer => {
66+
val output = "E:\\coding\\github\\hadoop-code-snippets\\spark-best-practice\\spark-image-process\\src\\main\\resources\\result\\" + FilenameUtils.getName(buffer._1)
67+
ImageIO.write(buffer._2, "png", new File(output))
3268
})
3369

70+
Thread.sleep(100000)
3471
}
35-
}
72+
}

0 commit comments

Comments
 (0)