import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

/**
 * Receiver-based Kafka word-count example using Spark Streaming.
 *
 * Consumes messages from the "wordsender" topic via ZooKeeper, splits each
 * message into words, and maintains a sliding windowed count
 * (2-minute window, 10-second slide) printed every batch.
 *
 * NOTE(review): `KafkaUtils.createStream` is the old receiver-based API
 * (spark-streaming-kafka 0.8); newer code should use the direct stream API.
 */
object DStream_Kafa_Consumer {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkConf().setAppName("kafaCountWord").setMaster("local[2]")
    val ssc = new StreamingContext(sc, Seconds(10))
    // Checkpointing is mandatory here: reduceByKeyAndWindow with an inverse
    // function keeps incremental state that must survive failures.
    ssc.checkpoint("file:///usr/local2/spark/mycode/kafa/checkpoint")

    val zookeeperServer = "localhost:2181" // ZooKeeper address (Kafka default)
    val group  = "soyo_kafa_group"         // Kafka consumer group id
    val topics = "wordsender"              // comma-separated topic list
    val num    = 1                         // receiver threads per topic

    // Map each topic name to its thread count (num is already an Int;
    // the original redundant .toInt has been dropped).
    val topicMap = topics.split(",").map((_, num)).toMap

    // createStream yields (key, message) pairs; keep only the message value.
    val lines = KafkaUtils.createStream(ssc, zookeeperServer, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))

    // Incremental windowed count: _+_ adds batches entering the window,
    // _-_ subtracts batches leaving it; 2-minute window, 10-second slide,
    // 2 partitions for the shuffle.
    val wordCounts =
      words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    // Non-windowed alternative kept for reference:
    // val wordCounts_2 = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print(200)
    // wordCounts_2.print(200)

    ssc.start()
    // Fix: awaitTermination is side-effecting, so call it with parentheses.
    ssc.awaitTermination()
  }
}
结果:需要先运行 Kafka-Producer 程序向 "wordsender" 主题输入数据 (Result: a Kafka producer must first feed data into the "wordsender" topic)
-------------------------------------------
Time: 1508230980000 ms-------------------------------------------(4,61)(8,69)(6,66)(0,70)(2,61)(7,69)(5,61)(9,74)(3,62)(1,72)-------------------------------------------Time: 1508230990000 ms-------------------------------------------(4,61)(8,69)(6,66)(0,70)(2,61)(7,69)(5,61)(9,74)(3,62)(1,72)-------------------------------------------