当前位置:
文档之家› Storm分布式实时计算在物联网系统中的应用
Storm分布式实时计算在物联网系统中的应用
事务处理过程:
程序部署:
1、生成JAR包 cd /opt/storm/jar/iot-flow-stat/source jar cfm ../iot-flow-stat.jar META-INF/MANIFEST.MF *
2、 topology发布 args[0] : topology名称 args[1] : NumWorkers args[2] : 1 表示最新,0表示最旧。 /opt/storm/apache-storm-1.0.3/bin/storm jar /opt/storm/jar/iot-flow-stat/iot-flow-
实时:数据不写磁盘,延迟低(毫秒级) 流式:不断有数据流入、处理、流出 开源:twitter开源,社区很活跃
1、简单的编程模型。类似于MapReduce降低了并行批处理复杂性, Storm降低了进行实时处理的复杂性。 2、可以使用各种编程语言。你可以在Storm之上使用各种编程语言。默 认支持Clojure、Java、Ruby和Python。 3、水平扩展。计算是在多个线程、进程和服务器之间并行进行的。
数据之间有关系(聚合等):如日志pv/uv统计、访问热点统计
N行日志
Client
N行日志
N行日志
ip
MQ
Spout
Bolt1
pv/uv
Bolt2
Storage
received
Ø 16年物联网卡流量数据通过日增量文件方式每天提供一次,流量数据当时使用 hadoop+spark(独立部署的集群,不是基于YARN)进行入库。内存消耗大、服务器 间数据交互不稳定,导致每天需要几个小时才能把全省流量数据清洗处理完入库, 流量数据整体延时在36小时到48小时左右。 Ø 17年为了缩短批量数据的时延,物联网平台与BOSS数据接口实现重大优化, BOSS提供海量物联网数据的实时订购关系、用户状态变化、流量数据(误差1M) 的实时推送。目前,归属广东移动的物联网卡活跃号码每天产生的流量话单数据大 概是48G左右。
– Bolt:接收Spout/Bolt输出的Tuple,处理,输出新Tuple
– Grouping:Tuple从上游到某个下游多个并发task的分组方式 shuffleGrouping:随机发给某个下游task fieldsGrouping:按照某几个字段做hash取模,发给对应task allGrouping:发给下游全部task
Ø 采用Storm的主要原因:
1. 常驻运行 2. 数据在内存中不写磁盘 3. 扩展简单:加机器,提高并发,重新提交 4. 自动容错:进程、机器、网络异常,消息可重发 5. 流式处理:数据来一批处理一批
Ø 搭建Zookeeper集群 Ø 安装Storm依赖库(JDK,Python) Ø 下载并解压Storm发布版本 Ø 修改storm.yaml配置文件 Ø 启动Storm各个后台进程
declareOutputFields 申明要发射的字段
请求应答(同步) – DRPC 实时图片处理 实时网页分析
Client
图片X 图片Y
DRPC Server
图片X
Spout
图片X Bolt
图片Y
Return
流式处理(异步)
图片Y
– 逐条处理
数据之间无关系:如实时日志格式标准化入库
– 分析统计
的任务
Supervisor Supervisor
Worker Worker Worker
具体的处理 逻辑组件
接受Nimbus分配的任务,管 理自己的Worker进程
DAG计算模型 – Tuple:数据处理单元,一个Tuple由多个字段组成 – Stream:持续的Tuple流
– Spout:从外部获取数据,输出原始Tuple
Ø Storm简介 Ø Storm的主要特点 Ø Storm组件 Ø Storm编程模型 Ø Storm典型应用场景 Ø Storm在物联网系统中的应用背景 Ø Storm安装 Ø Storm实例讲解
Storm是一个分布式实时流式计算平台
分布式:
水平扩展:通过加机器、提高并发数就提高处理能力 自动容错:自动处理进程、机器、网络异常
Spout
可实现接口Ispout,或继承BaseSpout 主要方法:
open方法是初始化动作。允许你在该spout初始化时做一些动作,传入了上下文,方便取上下文的一些数据。 close方法在该spout关闭前执行,但是并不能得到保证其一定被执行。spout是作为task运行在 worker内,在 cluster模式下,supervisor会直接kill -9 woker的进程,这样它就无法执行了。而在本地模式下,只要不是kill -9, 如果是发送停止命令,是可以保证close的执行的。 activate和deactivate :一个spout可以被暂时激活和关闭,这两个方法分别在对应的时刻被调用。 nextTuple 用来发射数据。 ack(Object) 传入的Object其实是一个id,唯一表示一个tuple。该方法是这个id所对应的tuple被成功处理后执行。 fail(Object) 同ack,只不过是tuple处理失败时执行。 declareOutputFields 申明要发射的字段
Nimbus:bin/storm nimbus Supervisor:bin/storm supervisor UI:stoቤተ መጻሕፍቲ ባይዱm ui
物联云实时流量计算 1、文件采集,300s/10000条话单记录一个流量文件,平台按时间间隔对文件记录进行合并处理 2、从kafka中流量话单信息(数据格式:地市(GZ、ZH...)|手机号码|主体产品ID|当前账期|通用总使 用流量(字节)|专用使用总流量(字节)|最新使用时间(YYYYMMDD HH24MISS|当前话单总流 量(字节)|套餐产品ID1,套内使用流量(字节),套内剩余流量总量(字节);套餐产品ID2,套内使用流 量(字节),套内剩余流量总量(字节);...; |当前话单总流量对应APN|) 3、关联套餐数据、订购关系数据流量计算 4、保存或者更新流量数据,写入redis
stat.jar com.hvgroup.storm.flow.stat.app.App flowStat 42 1
不稳定因素: 1、FTP异常(包括服务异常、网络异常),如何重跑数据 2、网络问题(存在数据丢失)
性能瓶颈: 1、处理时,每个号码需要读取套餐订购关系,需要读取套餐平台相应套餐信息 2、结合蓝色两部分信息,按套餐的优先级和已使用量,确定此话单流量用于哪个(或者多个)套餐,并 扣除相关套餐流量 3、使用Trident方式(Exactly Once,也就是所谓的事务机制)批量处理话单信息----- 整个运算瓶颈在这 里
下面这个图描述了以上几个角色之间的关系。
主控节点,用于提 交任务,分配集群 任务,监控集群状
态
Nimbus
zookeeper zookeeper
Supervisor Supervisor Supervisor Supervisor
zookeeper
协调,存放集群的公共数据 (心跳,集群状态,配置信 息),Nimbus分配给Supervisor
Stream Grouping
Stream Grouping定义了一个流在Bolt任务间该如何被切分。
Ø 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。 Ø 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同 “user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。 Ø 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。 Ø 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个 task。 Ø 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终, Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。 Ø 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务 接收。 Ø 当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。
4、可靠的消息处理。Storm保证每个消息至少能得到一次完整处理。任 务失败时,它会负责从消息源重试消息。 5、本地模式。Storm有一个“本地模式”,可以在处理过程中完全模拟 Storm集群。这让你可以快速进行开发和单元测试。
首先我们通过一个 storm 和hadoop的对比来了解storm中的基本概念。
– Topology:在storm中,应用程序的实现实时处理的逻辑被封装在Topology中 一个Topology是由Spout组件(数据源)和Bolt组件(数据操作)通过Stream Groupings进行连接的
Spout
Ø 简而言之,Spout从来源处读取数据并放入topology。 Ø Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对 tuple(元组,数据 项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中 最主要的方法就是 nextTuple(),该方法会发射一个新的tuple到topology,如果没有新 tuple发射则会简单的返回一个Topology是由Spout组件(数据源)和Bolt组件(数据操作)通过 Stream
系统角色
应用名称 组件接口 运行状态
hadoop
storm
Jobtracker
Nimbus
Tasktracker