Storm简介Storm简介•实时计算需要解决一些什么问题•实现一个实时计算系统•Storm基本概念•Storm使用场景•Storm分组机制Storm简介•实时计算需要解决一些什么问题伴随着信息科技日新月异的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也越来越高。
举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他希望的当然是这个宝贝马上就可以被卖家搜索出来、点击、购买啦,相反,如果这个宝贝要等到第二天或者更久才可以被搜出来,估计这个大哥就要骂娘了。
再举一个推荐的例子,如果用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,但是却发现系统在不遗余力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们心里就会想推荐你妹呀。
其实稍微了解点背景知识的码农们都知道,这是因为后台系统做的是每天一次的全量处理,而且大多是在夜深人静之时做的,那么你今天白天做的事情当然要明天才能反映出来啦。
Storm简介•实现一个实时计算系统全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop 以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。
但是,hadoop不擅长实时计算,因为它天然就是为批处理而生的,这也是业界一致的共识。
否则最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。
先抛开s4,storm,puma这些系统不谈,我们首先来看一下,如果让我们自己设计一个实时计算系统,我们要解决哪些问题。
Storm简介•实现一个实时计算系统低延迟。
都说了是实时计算系统了,延迟是一定要低的。
高性能。
性能不高就是浪费机器,浪费机器是要受批评的哦。
分布式。
系统都是为应用场景而生的,如果你的应用场景、你的数据和计算单机就能搞定,那么不用考虑这些复杂的问题了。
我们所说的是单机搞不定的情况。
可扩展。
伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。
容错。
这是分布式系统中通用问题。
一个节点挂了不能影响我的应用。
Storm简介•实现一个实时计算系统好,如果仅仅需要解决这5个问题,可能会有无数种方案,而且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工作进程就ok啦。
我们再继续往下看。
✓容易在上面开发应用程序。
亲,你设计的系统需要应用程序开发人员考虑各个处理组件的分布、消息的传递吗?如果是,那有点麻烦啊,开发人员可能会用不好,也不会想去用。
✓消息不丢失。
用户发布的一个宝贝消息不能在实时处理的时候给丢了,对吧?更严格一点,如果是一个精确数据统计的应用,那么它处理的消息要不多不少才行。
这个要求有点高哦。
✓消息严格有序。
有些消息之间是有强相关性的,比如同一个宝贝的更新和删除操作消息,如果处理时搞乱顺序完全是不一样的效果了。
Storm简介•Storm基本概念对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。
同Hadoop一样Storm也可以处理大批量的数据,然而 Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。
Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。
他同样还有以下的这些特性:Storm简介•Storm优势✓1. 简单的编程模型。
类似于MapReduce降低了并行批处理复杂性,Storm降低了进行实时处理的复杂性。
✓2. 服务化,一个服务框架,支持热部署,即时上线或下线App.✓3. 可以使用各种编程语言。
你可以在Storm之上使用各种编程语言。
默认支持Clojure、Java、Ruby和Python。
要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
✓4. 容错性。
Storm会管理工作进程和节点的故障。
✓5. 水平扩展。
计算是在多个线程、进程和服务器之间并行进行的。
Storm简介•Storm基本概念✓6. 可靠的消息处理。
Storm保证每个消息至少能得到一次完整处理。
任务失败时,它会负责从消息源重试消息。
✓7. 快速。
系统的设计保证了消息能得到快速的处理,使用ZeroMQ作为其底层消息队列。
✓8. 本地模式。
Storm有一个“本地模式”,可以在处理过程中完全模拟Storm集群。
这让你可以快速进行开发和单元测试。
Storm简介•Storm存在的问题✓1. 目前的开源版本中只是单节点Nimbus,挂掉只能自动重启,可以考虑实现一个双nimbus的布局。
✓2. Clojure是一个在JVM平台运行的动态函数式编程语言,优势在于流程计算, Storm的部分核心内容由Clojure编写,虽然性能上提高不少但同时也提升了维护成本。
Storm简介•Storm的架构•Storm集群由一个主节点和多个工作节点组成。
主节点运行了一个名为“Nimbus”的守护进程,用于分配代码、布置任务及故障检测。
每个工作节点都运行了一个名为“Supervisor”的守护进程,用于监听工作,开始并终止工作进程。
Nimbus和Supervisor都能快速失败,而且是无状态的,这样一来它们就变得十分健壮,两者的协调工作是由Zookeeper来完成的。
ZooKeeper用于管理集群中的不同组件,ZeroMQ是内部消息系统,JZMQ是ZeroMQMQ 的Java Binding。
有个名为storm-deploy的子项目,可以在AWS上一键部署Storm集群.Storm简介•Storm基本概念首先我们通过一个 storm 和hadoop的对比来了解storm中的基本概念。
Storm简介•Storm基本概念✓Nimbus:负责资源分配和任务调度。
✓Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
✓Worker:运行具体处理组件逻辑的进程。
✓Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
Storm简介•Storm基本概念Storm简介•Storm基本概念✓Topology:storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。
✓Spout:在一个topology中产生源数据流的组件。
通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。
Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用此函数,用户只要在其中生成源数据即可。
✓Bolt:在一个topology中接受数据然后执行处理的组件。
Bolt可以执行过滤、函数操作、合并、写数据库等任何操作。
Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。
Storm简介•Storm基本概念✓Tuple:一次消息传递的基本单元。
本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.✓Stream:源源不断传递的tuple就组成了stream。
Storm简介•Storm使用场景1.流聚合流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段。
builder.setBolt(5, new MyJoiner(), parallelism).fieldsGrouping(1, new Fields("joinfield1", "joinfield2")) .fieldsGrouping(2, new Fields("joinfield1", "joinfield2")) .fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))Storm简介•Storm使用场景✓2.批处理有时候为了性能或者一些别的原因,你可能想把一组tuple一起处理,而不是一个个单独处理。
✓3.BasicBolt1. 读一个输入tuple2. 根据这个输入tuple发射一个或者多个tuple3. 在execute的方法的最后ack那个输入tuple遵循这类模式的bolt一般是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口: IbasicBoltStorm简介•Storm使用场景4.内存内缓存+Fields grouping组合在bolt的内存里面缓存一些东西非常常见。
缓存在和fieldsgrouping结合起来之后就更有用了。
比如,你有一个bolt把短链接变成长链接(bit.ly, t.co之类的)。
你可以把短链接到长链接的对应关系利用LRU算法缓存在内存里面以避免重复计算。
比如组件一发射短链接,组件二把短链接转化成长链接并缓存在内存里面。
看一下下面两段代码有什么不一样:builder.setBolt(2, new ExpandUrl(), parallelism).shuffleGrouping(1);builder.setBolt(2, new ExpandUrl(), parallelism).fieldsGrouping(1, new Fields("url"));Storm简介•Storm使用场景• 5.计算top N比如你有一个bolt发射这样的tuple: "value", "count"并且你想一个bolt基于这些信息算出top N的tuple。
最简单的办法是有一个bolt可以做一个全局的grouping的动作并且在内存里面保持这top N的值。
这个方式对于大数据量的流显然是没有扩展性的,因为所有的数据会被发到同一台机器。
一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 然后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N, 代码大概是这样的:Storm简介•Storm使用场景builder.setBolt(2, new RankObjects(), parallellism) .fieldsGrouping(1, new Fields("value"));builder.setBolt(3, new MergeObjects()).globalGrouping(2);这个模式之所以可以成功是因为第一个bolt的fields grouping使得这种并行算法在语义上是正确的。