当前位置:
文档之家› 基于案例讲解Storm实时流计算
基于案例讲解Storm实时流计算
Storm的应用场景举例
Storm简介 Storm的主要特点 Storm组件 Storm编程模型 Storm安装 Storm实例讲解
电商实时推荐 淘宝双十一实时销售额统计
车辆7*24小时监控
电信行业重大节假日实时保障监控
1. Storm是一个分布式的、容错的实时计算系统,它采用Clojure编写的 2. Storm可被用于“流处理”之中,实时处理消息并更新数据库 3. Storm可以进行连续查询并把结果即时反馈给客户,比如将Twitter上 的热门话题发送到客户端 4. Storm可以用来并行处理密集查询,Storm的拓扑结构是一个等待调 用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并 返回查询结果。
1.简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm 降低了进行实时处理的复杂性。 2.可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默认支 持Clojure、Java、Ruby和Python。
3.水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
4.可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任务失 败时,它会负责从消息源重试消息。 5.快速。系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底 层消息队列。 6.本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟 Storm集群。这让你可以快速进行开发和单元测试。
Bolt
可实现接口IBolt,或BaseRichbolt
主要方法:
IBolt继承了java.io.Serializable,我们在nimbus上提交了topology以后,创建出来的bolt会序列化后发送到具体 执行的worker上去。worker在执行该Bolt时,会先调用prepare方法传入当前执行的上下文execute接受一个tuple 进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果 cleanup 同ISpout的close方法,在关闭前调用。同样不保证其一定执行 execute 最重要的方法,用来处理自己的业务逻辑
<=120 实时监控超速车辆 在高速上,速度 是否>120
>120
存入超 速数据 库
Spout 数据源
是否在高速 上 >80 不在高速上,速 度是否>80 <=80
Spout
简而言之,Spout从来源处读取数据并放入topology。 Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对 tuple(元组,数据 项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中 最主要的方法就是 nextTuple(),该方法会发射一个新的tuple到topology,如果没有新 tuple发射则会简单的返回一个Topology是由Spout组件(数据源)和Bolt组件(数据操作)通过 Stream
mq
Spout从mq中 接受订单信息
MergeBolt汇 总,写入 Mysql
前台界面 展示,每 30秒查一 次数据库
仿淘宝双十一实时监控销售额
1.从activemq中接受订单信息(数据格式:用户id|时间|金额|商品id|商家id)
仿淘宝双十一实时监控销售额
2.计算订单金额,一分钟输出一次
仿淘宝双十一实时监控销售额 3.合并计算结果,并写入mysql
仿淘宝双十一实时监控销售额 本地测试与部署
集群部署 storm jar storm_taobao.jar com.wxj.taobao.TaoBaoTopology taobao_double11_bill_topology
Q& Grouping
Stream Grouping定义了一个流在Bolt任务间该如何被切分。 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同 “user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个 task。 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终, Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务 接收。 当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。
数据处理完就结束 一旦运行,根本停 不下来。。。
Nimbus:负责资源分配和任务调度。 Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。 Worker:运行具体处理组件逻辑的进程。 Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线 程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
首先我们通过一个 storm 和hadoop的对比来了解storm中的基本概念。
hadoop Jobtracker 系统角色 Tasktracker Child 应用名称 组件接口 运行状态 Job M/R storm Nimbus Supervisor Worker Topology Spout/Bolt
Bolt
Topology中所有的处理都由Bolt完成。
Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。 Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个 Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是 Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。
Spout
可实现接口Ispout,或继承BaseSpout
主要方法:
open方法是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。 close方法在该spout关闭前执行,但是并不能得到保证其一定被执行。spout是作为task运行在 worker内,在 cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。 activate和deactivate :一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。 nextTuple 用来发射数据。 ack(Object) 传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。 fail(Object) 同ack,只不过是tuple处理失败时执行。 declareOutputFields 申明要发射的字段
下面这个图描述了以上几个角色之间的关系。
Supervisor zookeeper
主控节点,用于提 交任务,分配集群 任务,监控集群状 态
Supervisor Worker
Supervisor
zookeeper Supervisor Supervisor Supervisor
协调,存放集群的公共数据 (心跳,集群状态,配置信 息),Nimbus分配给Supervisor 的任务
仿淘宝双十一实时监控销售额
1.从activemq中接受订单信息(数据格式:用户id|时间|金额|商品id|商家id) 2.计算订单金额,一分钟输出一次
3.合并计算结果,并写入mysql CountBolt计算 金额
CountBolt计算 金额 CountBolt计算 金额 CountBolt计算 金额
Worker Worker
具体的处理 逻辑组件
Nimbus
zookeeper
接受Nimbus分配的任务,管 理自己的Worker进程
Topology
在storm中,应用程序的实现实时处理的逻辑被封装在Topology中
一个Topology是由Spout组件(数据源)和Bolt组件(数据操作)通过Stream Groupings进行连接的
declareOutputFields 申明要发射的字段
搭建Zookeeper集群 安装Storm依赖库(ZeromQ,JZMQ,JDK,Python)
下载并解压Storm发布版本
修改storm.yaml配置文件 启动Storm各个后台进程
Nimbus:bin/storm nimbus Supervisor:bin/storm supervisor UI:storm ui