当前位置: 首页 > >

从指定端口获取数据到spark进行统计

1. 环境

spark2.2.0
scala2.11.0
centos7
IntelliJ IDEA 2019.3.2

2. 代码

package scala.spark

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkWordCount {
def main(args: Array[String]): Unit = {

if (args.length != 2){
System.err.println("Usage:SparkWordCount")
System.exit(1)
}

/*创建SparkConf对象,在这我指定master为local[2],
本地模式方便测试,另外需要注意,本地模式下local的必须大于等于2,否则就无法正确运行
因为接收数据和处理数据需要两个线程。*/
val sparkWordCount = new SparkConf().setMaster("local[2]").setAppName("SparkWordCount")
//批处理间隔,每10s,创建Streaming

val sc = new StreamingContext(sparkWordCount,Seconds(5))

//构建数据源为socket,第一个参数为IP,第二个参数为端口号
val lines = sc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK)

//单词计数
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x,1)).reduceByKey(_+_).saveAsTextFiles("hdfs://master:9000/sparkWordCount/data")

//output操作,streaming中必须至少有一个output 操作
// wordCounts.print()

sc.start()//开启线程
sc.awaitTermination()//等待程序结束
}

}

设置传入的参数


3. 安装nc(netcat)

Spark Streaming编写wordCount程序时,在Linux集群中需要安装nc,来对程序中使用到的端口进行开放。


yum install -y nc

4. 监听端口,并发送数据


5. 运行程序并查看hdfs上是否有数据



友情链接: year2525网 工作范文网 QS-ISP 138资料网 528200 工作范文网 baothai 表格模版