Data Algorithms: Recipes for Scaling Up with Hadoop and Spark

This book is an accessible introduction to Hadoop and Spark, the core tools of big data processing, and explains in detail how their algorithms are applied to real-world problems, aiming to help readers master efficient big data processing techniques.

Sample input data for my own Scala implementation of SecondarySort (chapter 1) from the Data Algorithms topics (each line appears to be year, month, day, value):
```
2015,1,1,10
2015,1,2,11
2015,1,3,12
...
```
The corresponding Scala code:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.{Partitioner, SparkConf}

// Partitions composite keys (naturalKey, value) by the natural key alone,
// so every record with the same natural key lands in the same partition.
class SecondarySortPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case (k: String, _: Int) => math.abs(k.hashCode % numPartitions)
    case null => 0
    case _ => math.abs(key.hashCode % numPartitions)
  }
}

object SecondarySort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SecondarySort")
    val context = SparkSession.builder().config(conf).getOrCreate().sparkContext
    val rdd = context.textFile("/path/to/test.txt") // adjust the path as needed
    // Fold the value into the key as (year-month, value) so that sorting
    // the composite key also orders the values.
    val step1 = rdd.map(line => line.split(","))
      .map(fields => ((fields(0) + "-" + fields(1), fields(3).toInt), fields(3).toInt))
    // Repartition by natural key and sort each partition by the composite
    // key (tuple ordering: natural key first, then value ascending).
    val step2 = step1.repartitionAndSortWithinPartitions(new SecondarySortPartitioner(4))
      .map { case ((naturalKey, _), v) => (naturalKey, v.toString) }
      .reduceByKey((x, y) => x + "," + y)
    step2.foreach(println)
  }
}
```
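With the three sample lines shown, every record shares the natural key 2015-1 and the values are already ascending, so the job should print a single line roughly like the one below. The order survives reduceByKey because its map-side combine concatenates values in their already-sorted partition order, and each natural key lives entirely in one partition:
```
(2015-1,10,11,12)
```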
Sample input data for CommonFriends (chapter 8), where each line is a person followed by a space-separated list of that person's friends:
```plaintext
100,200 300 400 500 600
200,100 300 400
...
```
The corresponding Scala code:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object CommonFriends {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("CommonFriends")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val context = spark.sparkContext
    val rdd = context.textFile("/path/to/friends.txt") // adjust the path as needed
    // "100,200 300 400" => ((100,200), {200,300,400}), ((100,300), {...}), ...
    // Each pair is ordered (smaller id first) so both directions of a
    // friendship reduce to the same key.
    val pairs = rdd.flatMap { line =>
      val tokens = line.split(",")
      val person = tokens(0).trim.toLong
      val friends = tokens(1).trim.split(" ").map(_.toLong).toSet
      friends.map(f => (if (person < f) (person, f) else (f, person), friends))
    }
    // A pair's two friend sets (one from each person's line) intersect
    // to give their common friends.
    val commonFriends = pairs.reduceByKey(_ intersect _)
    commonFriends.foreach(println)
  }
}
```
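Given the two sample lines above, 100's friends are {200, 300, 400, 500, 600} and 200's are {100, 300, 400}, so the pair (100, 200) intersects to {300, 400} and the output should contain a line roughly like:
```plaintext
((100,200),Set(300, 400))
```
Note that a pair is only correct once both persons' lines are present in the input, since the intersection needs the friend sets from both sides; pairs whose second member falls in the elided "..." depend on those lines.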