分布式实时处理系统架构设计与机器学习

2017-01-11 22:21 出处:360java.com 作者:卢誉声  阅读()
卢誉声,Autodesk软件研发工程师,从事平台架构方面的研发工作。在此之前,他曾在思科系统(中国)研发中心云产品研发部工作,并参与了大规模分布式系统的服务器后端、前端以及

卢誉声,Autodesk软件研发工程师,从事平台架构方面的研发工作。在此之前,他曾在思科系统(中国)研发中心云产品研发部工作,并参与了大规模分布式系统的服务器后端、前端以及SDK的设计与研发工作,在分布式系统设计与实现、性能调优、高可用性和自动化等方面积累了丰富的敏捷实践与开发经验。他主要从事C/C++开发工作,致力于高性能平台架构的研究与开发。此外,对JavaScript、Lua以及移动开发平台等也有一定研究。著有《分布式实时处理系统:原理、架构和实现》,并译有《Storm实时数据处理》《高级C/C++编译技术》《JavaScript编程精解(原书第2版)》。

 

今天主要跟大家分享的内容包含以下几个部分:

1. 机器学习与实时处理系统应用

2. 分布式计算拓扑搭建

3. 消息算法调优

4. Hurricane分布式计算框架与未来展望

 

现在我们先来看看第一部分:机器学习与实时处理系统应用。我们首先简单了解下机器学习,然后引入分布式实时处理系统的概念以及实时处理系统与机器学的关系。

当然了,少不了架构细节和设计思想。

 

机器学习在现实世界中的作用越来越重要。

机器学习的方法非常多,比如传统的知识库方法,类比方法,归纳方法,演绎方法等各种方法。

目前在大多数领域中应用最多的当属归纳学习方法。

 

在通常的归纳型机器学习中,我们的目标是让计算机学习到一个“模型”(这种模型是人类预先组织好的,有固定的数据结构和算法等等),然后我们就可以用这个“模型”来进行“预测”。 预测就是从现实中输入一些数据,通过学习到的模型进行计算,得到的输出。我们希望这个模型可以在很高的概率下输出一个和真实结果差距不大的结果。

一旦我们得到了这个模型,我们可以使用该模型处理输入数据,得到输出数据(即预测结果),而归纳性机器学习的任务就是学习中间的这个模型。

 

  

如果我们将这个模型看成一个函数,那么我们可以认为归纳性机器学习的目的就是学习得到一个函数F,如果该函数的参数为x,输出为y。那么我们希望学到的东西就是 y = F(x) 中的F。

我们先用一个最简单的例子来讲一下:  

假设我们现在不知道一个物体自由落体速度的计算公式 

需要学习如何预测一个物体的自由落体速度 

机器学习的第一步就是收集数据 

假设我们可以测量出物体下坠的任何时间点的速度 

那么我们需要收集的数据就是某个物体的下坠时间和那个时间点的速度:

 

现在我们收集到一系列数据: 

_____________

时间    物体速度 

1       9.7 

2       20.0 

3       29.0 

4       39.9 

5       49.4 

6       58.5 

7       69.0 

8       78.8 

9       89.0

我们这里给出2个假设:

1) 第1个假设是:一个物体自由落体的速度只和时间有关系;

2) 第2个假设是:我们可以使用一个简单的“模型”:一元一次函数得到物体的速度。(即 F(x) = ax + b)

可以预见的是,这个问题中a、b 这就是这个模型待学习的“参数”。

现在的问题就是——我们需要用什么策略来学习这些参数?

 

因为我们可以遍历的数值空间是无穷大的,因此我们必须采用某种策略指导我们进行学习。

我们就用非常朴素的思想来将解决这个问题吧

在正式学习前,我们先将收集的数据分成两组:

 一组是“训练数据”, 一组是“测试数据”

假设训练数据是: 

__________

时间    物体速度 

1       9.7 

2       20.0 

3       29.0 

4       39.9 

5       49.4 

6       58.5 

 

测试数据是: 

___________

时间    物体速度 

1       69.0 

2       78.8 

3       89.0

我们需要根据训练数据计算出我们的参数a和b。 

然后使用我们计算出来的a和b预测测试数据,比较F(x)和实际数据的差距。

就是这么简单。

如果误差小到一定程度,说明我们学习到的参数是正确的 

比如和实际数据的差距都小于5% ;如果满足条件说明参数正确,否则说明参数不够精确,需要进一步学习 。

 

-----> 这个差距,我们称之为误差(Loss)

现在我们来看一下在这个模型(简单的一元一次线性函数)下如何学习这两个参数。

 

比如我们可以采用这种学习策略 :

1) 首先a和b都假定为整数,假定a的范围是[-10, 10]这个区间,b的范围是[-100, 100]这个区间

2) 遍历所有的a和b的组合,使用a和b计算ax + b,x取每个训练数据的输入数据,评估计算结果精确性的方法是计算结果和训练数据结果的差的绝对值除以训练数据结果,也就是 Loss = |F(x) - Y| / Y

3) 计算每个组合的Loss的平均值,取平均Loss最小的为我们假定的“学习结果” 。

现在我们就得到了a和b,并且这个a和b是在我们给定范围里精度最高的参数。

 

-----> 我们用这个a和b去训练数据里面计算平均的 |F(x) - Y| / Y: 

1) 如果平均Loss小于 5%,说明这个a和b是符合我们精度的 。

2) 否则我们需要优化我们的学习策略

好了,我们简单的说明了基于归纳学习的机器学习方法的大概思路。这种朴素的基于归纳学习的机器学习方法可以分为以下几步:

1) 预先定义一个模型 

2) 根据模型制定学习策略 

3) 使用学习策略使用模型来学习(拟合)训练数据,得到该模型中的所有参数 

4) 使用测试数据评估模型是否精确。如果不够精确则根据学习策略继续学习。如果足够精确,我们就认为机器学习结束了。 

5) 最后我们可以得到模型和参数,这就是我们学到的结果,也就是那个用来预测的函数。 

 

这里我们也要注意,上述步骤的前提是我们的模型是可以收敛的,如果模型本身就是发散的,那么我们就永远得不到我们的结果了。

这一过程,我们仍然可以使用开头贴的那张图来表述,我再贴一次:

扯了十来分钟机器学习,那么我们的主题--分布式实时处理系统,跟机器学习之间的关系又是怎么样的呢?现在我们切入主主主题( ),一起来了解一下机器学习与实时处理系统之间的关系,并从一个实际的例子来看整个分布式实时处理系统的架构设计。

“传统的机器学习”是一种“批处理式”的方法,在这种方法下,我们需要预先准备好所有的训练数据,对训练数据进行精心组织和筛选,很多情况下还需要对数据进行标记(监督式学习),而训练数据的组织会对最后的训练结果产生相当大的影响。

在这种算法中我们要处理完所有数据后才能更新权重和模型。

   

但是现在出现了许多在线学习算法,这种算法可以对实时输入的数据进行计算,马上完成权重和模型更新。

一方面我们可以用于监督式学习(完成数据标记后马上加入训练),也可以用于大量数据的非监督式学习。

而在这种情况下 ,“分布式实时处理系统”就可以大展身手了。在线系统和实时处理系统可以确保实时完成对数据的学习,利用实时新系统。

 

因此我们引入“分布式实时处理系统”的概念,本人主导开发了基于C/C++开发的高性能分布式实时处理系统-- Hurricane real-time processing,我们会在接下来深入了解其设计。


这是开源官网对该系统的介绍性文字:

那么在这种新的机器学习思潮下,我们可以简单概括其实现的基本思路:


我想需要说明一些东西:

1)这里我们可以看到,系统接收来自其他系统的实时输入,然后实时处理系统中使用在线算法快速处理数据,实时地更新模型权重信息。

2)纯粹的在线算法可能并不适合许多情景,但是如果将部分在线算法和传统的批处理式算法结合,将会起到非常好的效果。而且许多数据分析工作确实可以通过这种方式完成一部分处理,至少是预处理。

BUT。。。

目前机器学习的趋势就是对精度和速度的要求越来越高,方法越来越复杂,而数据越来越多,计算量越来越大,如果没有足够的计算资源,不一定能够在有限时间内完成足够的学习,因此现在类似于Tensorflow之类的机器学习解决方案都会提供针对分布式的支持。而大数据场景下的机器学习也变得越来越重要,这也对我们的分布式计算与存储方案提出了严峻的挑战。

到这里,我们可以看到,分布式实时处理系统不仅能够应用于海量大数据处理、分析,还可很好的应用于机器学习领域,而且基于C/C++的分布式实时处理系统,还为高性能GPU计算,带来了天然优势。(我们稍后就会细说)

当然了,跟今天主题不太相关的一部分就是Hurricane real-time processing 还可应用于 SDN 和下一代 5G 网络通信控制,当然了,对于这个主题,有机会咱们再表 。

好,继续

现在,我们一起来看一个我写的sample,跟随基本业务逻辑,来看看hurricane分布式实时处理系统是怎么组织起来的。

现在我们来看一个现实工程中常常会遇到的问题。

我们在开发实际系统时常常会收集大量的用户体验信息,而我们常常需要对这些体验信息进行筛选、处理和分析。那么我们应该如何搭建一个用于实时处理体验信息的分布式系统呢?

 

我们先来看一下整体流程:

这是基于 hurricane 分布式实时处理系统构建的一个简单的分布式计算拓扑(topology)。

整个计算拓扑十分简单啦。~

λ 收集体验信息

业务系统调用体验信息接口,将体验信息信息异步写入到特定的文件当中。使用永不停息的体验信息检测程序不断将新生成的体验信息发送到数据处理服务器。

λ 处理体验信息

首先数据处理服务器的体验信息接收负责将体验信息写入本地的Redis数据库中。然后我们使用消息源从Redis中读取数据,再将数据发送到之后的消息处理单元,由不同的数据处理单元对体验信息进行不同处理。

λ 存储结果

消息处理单元完成体验信息处理之后,将体验信息处理结果写入到Cassandra数据库中,并将体验信息数据写入到Elasticsearch数据库中。

 

其中关键的部分就是图中用长方形框出来的部分,该部分的作用是完成对数据的筛选、处理和基本分析。这部分我们将其称作计算拓扑,也就是用于完成实际计算的部分。

我们接下来阐述一下每一步具体如何做。

 

一、收集体验信息:收集体验信息分为以下几步:

 1) 程序通过体验信息接口将体验信息写入体验信息文件中。我们假设程序会使用非阻塞的异步写入接口,体验信息接口的调用方只是将体验信息送入某个队列中,然后继续向下执行。

 2) 接着体验信息写入线程从消息队列中读取数据,并将体验信息数据写入到真正的体验信息文件中。

 3) 写入后,某一个体验信息代理程序会不断监视体验信息文件的改动,并将用户新写入的体验信息信息发送到体验信息处理服务器的体验信息收集服务接口上。

 

 4) 体验信息收集服务接口是整个服务的对外接口,负责将其他节点发送的体验信息信息送入集群内部的Redis节点,并将体验信息数据写入到Redis的列表中。至此为止,体验信息收集过程就完成了。

二、处理信息:接下来是处理体验信息,处理体验信息主要在计算拓扑中完成。分为四步:

 

1) 体验信息处理消息源:负责监视Redis列表的改变,从Redis列表中读取体验信息规则,并将体验信息规则文本转换成计算拓扑的内部数据格式,传送到下一个体验信息处理单元。

 

2) 体验信息规则引擎:使用体验信息规则引擎对体验信息进行处理和过滤。这一步是可选的,也就是用户可以加入自己的消息处理单元对收集的体验信息进行处理。这将会影响到发送到后续的消息处理单元(索引器和计数器)中的体验信息消息。这一步我们就不做处理了,如果读者感兴趣可以自己加入一个或者多个消息处理单元对体验信息进行处理。

 

3) 索引:这一步必不可少,用于将体验信息规则引擎输出的体验信息写入到ElasticSearch中,并便于用户日后检索这些体验信息。这里涉及到一步——将体验信息规则元组转换成JSON,并将JSON写入ElasticSearch。

 

4) 统计:这一步也非常重要,用于对体验信息进行计数,这一步会将体验信息计数结果写入Cassandra的对应表中。便于用户获取统计信息。

三、存储结果:最后就是对计算结果的存储,我们需要使用存储模块将数据写入到不同的数据库中:

 

1) ElasticSearch:该数据库用于存储被转换成JSON的原始体验信息信息。用户可以在ElasticSerach中检索体验信息。

 

2) Cassandra:该数据库用于存储体验信息的统计计算结果。因为Cassandra支持原子计数列,因此可以非常胜任这个工作。

嗯,这就是大概的整个过程了!

我们可以发现,在上面几步中,其他都可以使用现成的系统来完成任务,最关键的部分就是分布式计算拓扑,计算拓扑需要“高实时性”地完成体验信息处理分析任务,这样才能应付大型系统中以极快速度产生的大量体验信息。

这里我们可以使用一个独立的计算集群来完成这个事情。每个计算节点负责完成一个计算任务,完成之后将数据传送给下一个计算节点完成后续的计算任务。每个计算节点都有一个消息队列用于接收来自上一级的消息,然后处理消息并继续将结果发送给下一级的计算节点。
 

这里我们通常关心三个问题:

1. 如何确保所有数据都得到了处理。

2. 如何组织消息(数据)的传递,为整个集群高效计算提供一个良好的I/O支持。

3. 如何搭建这个计算拓扑并尽量高效地进行完成计算。
 

我们先来看一下如何解决解决数据的完全处理问题。

 

我们这里讲每一个需要处理的数据(一条体验信息记录)组织成一个Tuple,也就是元组。每个计算节点都以Tuple为单位进行数据处理。每个元组都会有一个ack方法,用于告知上一级计算节点该Tuple已经处理完成。

我们以下面的方式处理Tuple,保证所有数据都会被完全处理:

1. 首先给每个Tuple一个id(伪随机的64位id)。

2. 由消息源发出的Tuple会有一个Acker,构造Tuple的时候会把新的Tuple加入这个Acker(就是包含这个Acker)。

3. 每个节点处理完一个元组调用元组的ack方法,改变Acker内部的记录值,表示当前Tuple已经完成处理。

4. 如果某个Acker中的所有Tuple都已经处理完成,那么这个Spout Tuple就已经处理完成。表明该消息源发出的Tuple被完全处理。

5. 由于我们无法在Acker中记录下Tuple树,因此比较好的方式是实现一个基于异或的优化算法,该算法在Storm中得到了应用。其具体实现是:在Acker中设置一个ack id,每创建一个Tuple,将id与其异或,每ack一个Tuple时,将其与id做异或运算。这样当所有Tuple处理完成后,ack id为0,就可以知道所有元组处理完成。

6. 如果消息源检测到某个其发出的Tuple没有在特定时间内得到处理,就会重发该元组。后续的计算节点重新开始处理。为了实现一个同时符合CAP的分布式系统,我们这里后续的计算节点并不会缓存计算结果,而是会重新开始计算上一级节点重发的元组,具体为什么这样做请参见How to beat the CAP theorem,地址是:http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html

______________________________

 

第二个需要解决的问题就是数据流量控制问题。

我们可以设想一下,如果网络状况不好,在特定时间内有许多元组都没有得到处理,那么数据源节点就会重发许多Tuple,然后后续节点继续进行处理,产生更多的Tuple,加上我们需要正常处理的Tuple,使得集群中的Tuple越来越多。而由于网络状况不好,节点计算速度优先,会导致集群中积累的过多数据拖慢整个集群的计算速度,进一步导致更多的Tuple可能计算失败。

为了解决这个问题,我们必须想方设法控制集群中的流量。

这个时候我们就会采用一种流量背压机制。该机制借鉴自Twitter Heron。

这个机制的思想其实很简单,当每个计算节点处理 Tuple过慢,导致消息队列中挤压的Tuple过多时会向其他节点发送消息,那么所有向该节点发送消息的节点都会降低其发送消息的速度。经过逐级传播慢慢将整个集群的流量控制在比较合理的情况下。只不过这个算法具体如何实现有待我们继续研究。

______________________________

第三个,也是最后一个:就是如何搭建这个拓扑,并尽量高效地完成计算了。

在分布式实时处理系统领域,目前最为成功的例子就是Apache Storm项目,而Apache Storm采用的就是一种流模型。而我们的Hurricane则借鉴了Storm的结构,并进行了简化(主要在任务和线程模型上)。

!!!前方高能,5个基本概念的堆叠(不喜,可以跳过下一条消息

这种流模型包括以下几个概念:

1. 拓扑结构:一个拓扑结构代表一个打包好的实时应用程序,相当于Hadoop中的一个MapReduce任务。但是和MapReduce最大的不同就是,MapReduce最后会停止,相当于任务处理结束,而拓扑结构则会持续执行,永不停息,除非你手动停止。因此任何时刻流入的数据流都会被拓扑结构迅速处理。

2. 流:一个流是拓扑结构中由元组组成的无限的序列,通常是由一个元组经过不同的处理单元处理之后产生的。每一个流入拓扑结构中的数据都会产生一个流。

3. 元组:元组是在流中传输的数据,数据源会将输入的数据转成元组输入到拓扑结构中,而数据处理单元会处理上一级的元组并产生新的元组传给下一级的数据处理单元。元组中支持存储不同类型的数据。

4. 消息源:消息源是拓扑结构中数据流的源头。通常其任务是读取外部数据源输入,并产生元组输入拓扑结构中。可靠的数据源可以确保消息完全得到处理,并在合适的时候重发元组。

5. 数据处理单元:数据处理单元是拓扑结构中负责处理数据的部分,你可以在其中筛选数据,统计数据,拼接数据等等。

数据处理单元会接收来自上一级的元组,并经过处理得到下一级的元组。每个数据处理单元会向上一级确认其元组有没有得到正确处理,如果数据源发现固定时间内并不是全部元组都被处理完了,就会重发元组。

下面咱们说点儿实在的。。为了支撑这套模型,我们设计了Hurricane的架构,该架构如下图所示:

最上方的是President,这是整个集群的管理者,负责存储集群的所有元数据,所有Manager都需要与之通信并受其控制。

下方的是多个Manager,每个Manager中会包含多个Executor,每个Executor会执行一个任务,可能为Spout(消息源)和Bolt(消息处理单元)。

从任务的抽象角度来讲,每个Executor之间会相互传递数据,只不过都需要通过Manager完成数据的传递,Manager会帮助Executor将数据以元组的形式传递给其他的Executor。

Manager之间可以自己传递数据(如果分组策略是确定的),有些情况下还需要通过President来得知自己应该将数据发送到哪个节点中。

了解整体架构后,我们来具体讲解一下President和Manager的架构。

首先,最顶层和President一样,是一个事件队列,并使用一个基于Meshy的NetListener来完成IO事件的响应(转换成事件放入事件队列)。

接下来有两个模块:

 

一个是Metadata Manager,该线程负责监听EventQueue,接收元数据的同步事件,负责和President同步集群的元数据。

另一个是Tuple Dispatcher,该线程负责响应OnTuple事件,接收其他节点发过来的元组,并将元组分发到响应的Bolt Executor的元组队列中。

(Manager的架构相对来说较为复杂。考虑到性能优化等问题,这个架构修改了几次。)

再下一层就是Executor。Executor分为SpoutExecutor和BoltExecutor,每个Executor都是一个单独的线程,在系统初始化Topology的时候,Managert会初始化Executor,并设置其中的任务。SpoutExecutor负责执行Spout任务,而BoltExecutor负责执行Bolt任务。

其中BoltExecutor需要接受来自其他Executor的Tuple,因此包含一个Tuple Queue。Tuple Dispatcher会将Tuple投送到这个Tuple Queue中,而Bolt则从Tuple Queue中取出数据并执行任务。

Eexecutor在执行完任务后,可能会将Tuple通过OutputCollector投送到OutputQueue中。我们又设计了一个OutputDispatcher,从OutputQueue中获取Tuple并发送到其他节点。

OutputQueue也是一个带锁的阻塞队列,是唯一用于输出的队列。

出于时间原因,我这里不再详细描述 Task Spout Bolt 的细节(有兴趣的同学,我会在稍后把更加完整的信息共享出来)。

 

我们来直接看一下hurricane分布式实时处理系统的高级抽象元语架构设计与原理。

我把高级抽象元语命名为“Squared”。首先我们解释一下Squared是什么?

左侧是Hurricane基本的计算模型,在该计算模型中,系统是一个计算任务组成的网络。我们需要考虑每个节点的琐屑实现。

但如果在日常任务中,使用这种模型相对来说会显得比较复杂,尤其当网络非常复杂的时候。

为了解决这个问题,看一下右边这个计算模型,这是对我们完成计算任务的再次抽象。

第一步是产生语句的数据源

 

然后每条语句需要使用名为SplitSentence的函数处理将句子划分为单词。

接下来根据单词分组,使用CountWord这个统计操作完成单词的计数。

这里其实是将网络映射成了简单的数据操作流程。

 

这样一来,解决问题和讨论问题都会变得更为简单直观。

这就是Squared所做的事情——将基于网络与数据流的模型转换成这种简单的流模型,让开发者更关注于数据的统计分析,脱离部分繁琐的工作。

嘿嘿,是不是很强大,而且将分布式系统内部、底层的复杂细节,对于上层应用开发人员来说,就隐藏起来了,而且使用起来更加方便。

____________________________

最后,我还想唠叨一下“保序”的问题,因为这个对于分布式实时系统来说,在某些特定领域,也显得十分重要,比如银行系统。

在现实的工作中,我们常常需要一个的特性就是保序。

 

比如部分银行交易和部分电商订单处理,希望数据按照顺序进行处理,但是传统的数据处理系统往往不支持这个特性。

所以我们就实现了保序功能。

保序的实现原理很简单,首先每个Tuple会一个一个orderId字段,orderId是依据顺序生成的,然后所有对Tuple的操作都会检验该orderId之前的Tuple是否已经完成。

如果已经完成则处理该Tuple,否则就将Tuple放在一个队列里,等待前面的Tuple处理完毕为止。

Squared的实现原理很简单,首先每个Tuple会一个一个orderId字段,orderId是依据顺序生成的,然后所有对Tuple的操作都会检验该orderId之前的Tuple是否已经完成。

 

如果已经完成则处理该Tuple,否则就将Tuple放在一个队列里,等待前面的Tuple处理完毕为止。

由于时间原因,我这里不过多介绍多语言支持子系统,不过这一部分也是当时实现的时候非常梦幻的一个部分。有兴趣的同学,我们可以今后再表。

(hurricane realtime processing 没有使用任何第三方库,包括分布式消息队列,以及网络通信库,都没有使用。)

_____________________________

最后简单展望一下吧,其实已经在实现的路上了。

由于现在有许多计算任务需要使用基于向量和矩阵的浮点计算,因此我们计划开发一个Hurricane的子项目——SewedBLAS:

这是一个BLAS(BLAS 是指Basic Linear Algebra Subprograms,即基础线性代数子程序库,里面拥有大量已经编写好的关于线性代数运算的程序)库的高层抽象,我们希望整合大量的BLAS库,比如使用CPU的MKL/OpenBLAS,使用GPU的CUDA和ACML,构建一个易于使用、跨平台的高性能线性代数库,并与Hurricane进行深度整合,力求在分布式和科学计算、深度学习找到最好的切合点,并充分吸收整合其他现有的分布式机器学习框架,减少从科研到产品的转换难度。(不要打我,最后这句有点官方化了,哈啊哈)

总之,谢谢大家,今天我要扯的,就这些。 

 
分享到:
本文标签: 分布式, 架构设计, 机器学习

相关文章

发表评论(共条评论)愿您的每句评论,都能给大家的生活添色彩,带来共鸣,带来思索,带来快乐。

Copyright (C) java学习 360java 360java.com, All Rights Reserved.

苏ICP备16022210号