SparkStreaming简单例子(oldAPI)
2021-02-07 07:14
标签:splay 引入 object nec etl ldap 外部程序 模式 计算 ◆ Spark Streaming 程序最好以使用Maven或者sbt编译出来的独立应用的形式运行。 ◆ 准备工作: //创建一个本地StreamingContext两个工作线程和批间隔1秒。 //创建一个连接到主机名的DStream,像localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) //将每一行接收到的数据通过空格分割成单词 val words = lines.flatMap(_.split(" “)) // 对每一批次的单词进行转化求和 val pairs = words.map(word => (word, 1)) ssc.start// 开始计算 ssc.awaitTermination() // 等待计算终止 ssc.stop() //结束应用 ◆ 1.启动 Spark Streaming 之前所作的所有步骤只是创建了执行流程, 程序没有真正 SparkStreaming简单例子(oldAPI) 标签:splay 引入 object nec etl ldap 外部程序 模式 计算 原文地址:https://www.cnblogs.com/Diyo/p/11392059.htmlSparkStreaming简单例子
◆ 构建第一个Streaming程序: (wordCount)
1.引入Spark Streaming的jar
2.scala流计算import声明
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds1.初始化StreamingContext对象
val conf = new SparkConf()
conf.setMaster(“local[2]")
conf.setAppName(“ NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))2.获取DStream对象
3.操作DStream对象
//导入StreamingContext中的隐式转换
import org.apache.spark.streaming.StreamingContext._
val wordCounts = pairs.reduceByKey(_ + _)
// 每个批次中默认打印前十个元素到控制台
wordCounts.print()
4.启动流处理程序
启动网络端口,模拟发送数据
1.借助于nc命令,手动输入数据
Linux/Mac :nc
Windows:cat
nc -lk 9999
2.借助于代码,编写一个模拟数据发生器
package com.briup.streaming
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source
object MassageServer {
// 定义随机获取整数的方法
def index(length: Int) = {
import java.util.Random
val rdm = new Random
rdm.nextInt(length)
}
def main(args: Array[String]) {
println("模拟数据器启动!!!")
// 获取指定文件总的行数
val filename ="Spark/ihaveadream.txt";
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length
// 指定监听某端口,当外部程序请求时建立连接
val serversocket = new ServerSocket(9999);
while (true) {
//监听9999端口,获取socket对象
val socket = serversocket.accept()
// println(socket)
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(1000)
// 当该端口接受请求时,随机获取某行数据发送给对方
val content = lines(index(filerow))
println (content)
out.write(content + ‘\n‘)
out.flush()
}
socket.close()
}
}.start()
}
}
}
注意事项:
连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划
◆ 2.当 ssc.start()启动后程序才真正进行所有预期的操作
◆ 3.执行会在另一个线程中进行,所以需要调用awaitTermination来等待流计算完成
◆ 4.一个Streaming context只能启动一次
◆ 5.如果模式是本地模式,那么请务必设置local[n] ,n>=2 1个用于接收,1个用于处理package com.briup.streaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, StreamingContext}
object MyTestOldAPI {
def main(args: Array[String]): Unit = {
//设置日志级别
Logger.getLogger("org").setLevel(Level.WARN)
//1 获取DS
val conf = new SparkConf().setAppName("MyTestOldAPI").setMaster("local[*]")
val dss = new StreamingContext(conf, Duration(1000))
val ds = dss.socketTextStream("localhost", 9999)
//2 逻辑处理 //统计
val res = ds.filter(_ != "").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
res.print()
//3 开启实时处理任务
dss.start()
dss.awaitTermination()
dss.stop()
}
}