为了正常的体验网站,请在浏览器设置里面开启Javascript功能!

yahoo_S4

2013-06-22 46页 doc 1MB 28阅读

用户头像

is_507569

暂无简介

举报
yahoo_S4Yahoo! S4:分布式流计算平台 一、概述 S4(Simple Scalable Streaming System)最初是Yahoo!为提高搜索广告有效点击率的问题而开发的一个平台,通过统计分析用户对广告的点击率,排除相关度低的广告,提升点击率。目前该项目刚启动不久,所以也可以理解为是他们提出的一个分布式流计算(Distributed Stream Computing)的模型。 S4的设计目标是: ·提供一种简单的编程接口来处理数据流 ·设计一个可以在普通硬件之上可扩展的高可用集群。 ·通过在每个处理节点使用本地...
yahoo_S4
Yahoo! S4:分布式流计算平台 一、概述 S4(Simple Scalable Streaming System)最初是Yahoo!为提高搜索广告有效点击率的问题而开发的一个平台,通过统计分析用户对广告的点击率,排除相关度低的广告,提升点击率。目前该项目刚启动不久,所以也可以理解为是他们提出的一个分布式流计算(Distributed Stream Computing)的模型。 S4的目标是: ·提供一种简单的编程接口来处理数据流 ·设计一个可以在普通硬件之上可扩展的高可用集群。 ·通过在每个处理节点使用本地内存,避免磁盘I/O瓶颈达到最小化延迟 ·使用一个去中心的,对等架构;所有节点提供相同的功能和职责。没有担负特殊责任的中心节点。这大大简化了部署和维护。 ·使用可插拔的架构,使设计尽可能的即通用又可定制化。 ·友好的设计理念,易于编程,具有灵活的弹性 系统基于如下假设: ·一旦一个节点失败,会failover到另一个standby节点,但是会丢失原节点的内存状态,这也就是为什么说S4是一个部分容错的系统。 ·节点不能动态增加和减少。 二、与MapReduce的区别 通常对于大规模分布式数据的处理会首先想到MapReduce,Yahoo!也维护了Hadoop项目,但是他们最终放弃了扩展Hadoop的想法,因为相比之下,流计算面对的场景和需求是完全不同的。 流计算强调的是数据流的形式和实时性。MapReduce系统主要解决的是对静态数据的批量处理,当MapReduce任务启动时,一般数据已经到位了(比如保存到了分布式文件系统上)。而流式计算系统在启动时,一般数据并没有完全到位,而是经由外部数据流源源不断地流入,并且不像批处理系统重视的是总数据处理的吞吐,而是对数据处理的低延迟,希望进入的数据越快处理越好,这里的思想是数据的价值随着时间的递增而递减,所以数据越快被处理,结果就越有价值,这也是实时处理的价值所在。 MapReduce采用的是一种比较静态的模型,如果用它做数据流的处理,首先需要将数据流缓存并分块,然后放入集群运算。如果数据块分得太小,可以获得一定的低延迟以保障实时性,但是包括集群启动之类的额外开销将会占很大比重;如果数据块分得太大,将无法满足低延迟的需求,达不到实时性的要求。 流计算的数据本身就是数据流,不需要数据准备的时间,有数据即可流入同时计算,同时解决了数据准备和延迟的两个问题,所以流计算是一种有别于MapReduce的分布式计算模型。同时高速数据流的速率也不是MapReduce的设计可以承担的,流计算系统没有批处理的概念,而是自启动开始,每时每刻都在处理实时流入的数据。这也是为什么S4的开发小组一直在强调S4 is not Real-Time MapReduce。 三、实现逻辑 1、Event 数据流是事件(Event)的序列流。每个Event是一个(K,A)元素,通过EventType来标示其类型。K、A分别表示这种类型的 Event的若干个key和若干个attribute。key和attribute都是tuple-valued,即key=value这种元组值。例如: EventType(EV): TradeEvent KEY: product=”T-shirt” type=”buy it now” Attribute: customerId=”1234” time=”2011-4-19 01:21:31” 2、Processing Elements HYPERLINK "http://blog.hesey.net/wp-content/uploads/2011/04/PE.png" Processing Elements(PE)是S4中的基本计算单元,一个PE通过下面四个组件来表示: functionality:由实现PE的Java类和相关配置来定义。 types of events:处理的Event Type。 key:关心哪种key。 key的值:关心(匹配)的key值是多少。 每个PE只负责处理自己所关心的event type,并且只处理自己所对应的key值的event,也就是说,只有当event type, key, key的值都匹配时,才会交由该PE进行计算处理。 这里要注意的是,如果有匹配的PE,则交由该PE处理,如果没有,则会创建一个新的PE。所以一个PN中的PE可能有许多个,这就需要在前期对事件的key及其取值范围进行很好的划分,否则可能因为过多的PE导致系统效率降低,同时也应该定期对使用率较低的PE进行清除,但是由于数据存放在节点内存中,所以清除前应该对有必要的数据进行持久化处理,否则会永久性丢失。后期应该会在PE上添加优先级等属性,可以提升清除PE工作的准确率。 有一种PE没有属性key和attribute,它们可以处理指定event type的所有事件,通常这些事件是原始数据,这类PE一般放置在S4集群的输入层,在这里原始事件会被赋予一个key以便于分发给后面的PE处理。PE处理后可能输出一个或多个event,输出频率在配置文件中定义,可以配置指定时间间隔输出一次或是指定事件数发生后输出一次,特例是可以配置为每次事件触发都输出一次。 PE是直接面向业务方的组件,由业务方定义PE中对事件的处理和处理以后的输出,剩下的事情全部交由平台去做。 3、Processing Node Processing Node(PN)对应着集群中的每一个逻辑结点,主要工作是监听事件,当事件到达时调用适合的PE处理事件,如果PE有输出,则还需和通讯层合作进行事件的分发和输出。需要注意的是,集群中所有的PN都是对等的,使得集群的部署和维护相对简单,没有中心节点。其结构如下图: Event Listener负责监听事件并转交给PE容器(Processing Element Container, PEC),由PEC交由合适的PE处理业务逻辑。配置文件中还会配置PE原型(PE prototype),定义其功能、event type、key。PN对其监听到的每一个唯一属性值会触发一次操作,如果当前没有合适的PE处理该事件,则会根据该PE原型创建新的对应了该唯一属性值的PE来对事件进行处理。 配置文件中定义了S4集群所关心的key的集合,这里会通过对事件的event type, key, key的值计算哈希函数,以路由到指定的PN,此外单个事件可能会被路由到多个PN中,这里应该是为将来做负载均衡做的准备,也可以作为广播事件的途径。 PEC中有若干个PE分别对应不同的情况,这里的思想即是Actor模型。PE处理完逻辑后根据其定义的输出方法可以输出事件,事件交由Dispatcher与通讯层(Communication Layer)进行交互并由Emiter输出至逻辑节点。 结果就是,所有包含特性属性值的事件在理论上都能通过哈希函数到达相应的PN,并被路由到PN内的PE上处理。 4、Communication Layer 通讯层提供集群管理、故障恢复(failover)到备用节点、逻辑节点到物理节点的映射。当监测到硬件故障时,会自动更新映射。通讯层隐藏的映射使得PN发送消息时只需要关心逻辑节点而不用关心物理节点。 通讯层采用插件式的架构来选择网络协议。它会对事件本身采用不同的协议发送,例如对于控制消息,会采用可靠方式(例如TCP)发送,而为了优化集群的吞吐率,可能会采用不可靠的方式(例如UDP)发送数据消息,这在S4中是可以容忍的。 5、Configuration Management System 配置管理系统主要用于对集群的操作,包括为S4任务创建和销毁集群,分配新的物理节点到S4任务集群中,空闲的集群可以作为冷备。这里的一致性保证交由ZooKeeper来做。 S4系统的数据流整体上看起来类似下图: 四、编程模型 面向业务方的PE编程十分简单,只需要编写处理事件的processEvent函数和输出的output函数即可,PE本身可以定义一些变量进行本地的数据,同时S4本身提供了一些持久化的辅助,主要类都在io.ts4.persist包下面。值得注意的是在API文档中可以看到,持久化的类通常都包含一个Clock对象,可以在初始化时定义持久时间,到时间以后会被抛弃。 五、性能 开发小组运用S4系统分别进行了在线实验和离线实验(压力测试)。 在线实验集群有16台服务器,每台4个32位CPU,2GB内存。每天大约有25万用户发起共100万次搜索,实验两周内观察到的峰值是每秒1600个事件。实验结果表明S4增加了3%的广告点击,主要通过快速检测低质量广告并把它们过滤出去达成。 离线实验集群有8台服务器,每台服务器4个64位CPU,16GB内存。集群中跑了16个PN,每台2个,事件由300万服务和点击组成。这次实验主要用于评估系统在远高于期望事件流量下的性能。 压力测试结果如下: 可见当事件流越来越大时,S4系统的错误越来越多,当数据流速达到9.7Mbps时,开始出现错误,这主要是由于S4系统无法及时处理过多的事件而导致了丢失数据的情况。 六、系统分析 S4是面向流式数据和实时处理的,所以针对实时性较高的业务,可以很好地对数据做出高效的分析处理,而且系统一旦上线,很少需要人工干预,源源不断的数据流会被自动路由并分析。对于海量数据,它和MapReduce类似都可以应对,但它能比后者更快地处理数据。 S4目前的缺点在于它的数据传输可靠性还不够,可能丢失数据,同时由于数据存放在内存中,一旦节点出现故障,就会丢失该节点的所有数据,这一点可以通过定期持久化来弥补(但是真的有必要吗?)。同时我认为这和它面向的场景也有关系,实时数据分析通常都是针对一些非常离散、细小的数据,从统计的角度来看,损失掉一部分数据对最后的统计结果并没有很大影响,而这部分牺牲却可以换来吞吐率的大幅提升。所以就目前来看,S4还是更适合对那些不一定非得对每条数据都仔细分析的场景,只求最后一个统计的结果来对业务做出相应的预计和调整。此外S4系统要求输入的是事件流,这就涉及到事件的生成,所以在数据流入S4以前,必须有能将数据转化为事件的系统进行中间处理。 从集群的扩展性来看,理论上可以通过增加节点应对更大的数据流,但是目前还无法在S4工作时动态增加或减少节点。所以对节点进行调整时很可能必须停下当前的工作,做不到无缝调整。而且由于S4由ZooKeeper进行集群管理,所以当集群增加到一定规模时,ZooKeeper的管理能力也有待考验。此外,仍然是因为S4无法保证数据100%的可靠传输,所以集群规模增长时,数据错误也会增长得很快。目前没有相关资料显示S4集群的规模究竟可以做到多大,但是相信未来随着数据传输可靠性的提升,会发挥很可观的作用。 在业务耦合度方面,S4完全隔离了平台和业务逻辑,业务方只需要编写PE逻辑即可,这一点类似于MapReduce中只需编写map和reduce函数,业务和平台的耦合度是非常低的。 七、前景 从目前来看,流式计算的前景非常广阔,目前已经有一些基于流的服务出台。例如Twitter的Streaming API,可以实时推送(Push)消息,Google也利用该API实现了实时搜索。未来无论从数据的产生还是流动上都会有数量级的飞跃,如何对海量数据做出及时、实时的处理,相信不论是实际业务还是数据挖掘领域都会十分关注。也可以独立开发自己的或者基于S4的流式计算平台,以契合自身的业务需求。新的信息不仅对数据分析有用,也能给予用户更好的体验(Yahoo!做S4的初衷就是提升用广告的用户体验以增加点击率)。相信S4会成为继Hadoop之后的又一分布式计算领域的干将。未来的数据将不再是一个一个包的概念,而成为一道道数据流源源不断地流入系统,产出效益。 S4(Simple Scalable Streaming System)是一个分布式流处理引擎,开发者可以在这个引擎基础上开发面向无界的,不间断的流数据处理应用。 什么是流数据处理应用?例如,为了个性化搜索广告,系统需要实时处理来自几百万唯一用户每秒成千上万次的查询,并即时分析用户的会话特征来提高广告相关性 预测模型的准确度。流数据处理应用要求我们的系统可以接受大量的,不间断的数据(称为流式数据),并可以迅速做出数据处理,S4正是完成这样一个需求。 S4的设计思想 S4将一个流抽象为由(K,A)形式的元素组成的序列,这里K和A分别是键和属性。在这种抽象的基础上S4设计了能够消费和发出这些(K,A)元素的组 件,也就是Process Element。Process Element在S4中是最小的数据处理单元,每个PE实例只消费事件类型,属性key,属性value都匹配的事件,并最终输出结果或者输出新的 (K,A)元素。下面这张S4内的图很好的诠释了上面说的内容。 图中节点表示PE,有向边表示一个(K,A)元素及其流向。流初始化为一个Key为null,事件类型为Quote的(K,A)元素,按照上文的说 法,S4中需要定义一个能消费相应事件类型,属性key,属性value事件的PE,也就是图中的PE1。当PE1完成处理后,它发出新的(K,A)元 素,即图中的事件类型为WordEvent,key为word=”x”,属性为count=x的边,最后经过多个PE处理和(K,A)事件转换得到最终结 果(这里不介绍上图的处理过程,详细看论文介绍)。 实际上,我认为S4最核心的部分就是上面的设计,它将流的处理分为多个流事件,抽象为处理图中的有向边,并且每个流事件都用(K,A)的形式表示,这种表 示方式使得事件间的转换传递很方便,这是一种借鉴了MapReduce的(key,value)的设计。同时因为流被分为多个流事件,那么就需要对应多个 处理单元,每个PE唯一处理一种事件,并且PE间独立,这也大大降低了概念复杂性和系统复杂性。我们开发者要做的就是定制个性化的PE。 PE间有事件传递的依赖性,很自然的,我们希望可以有一个PE集群,S4使用Processing Element Container的概念,将多个PE包含到同一个容器中,PEC接收源event,并最终发送结果event。PEC加上通信处理模块就形成了PE的逻 辑主机Processing Node。如下图所示: PN负责监听事件,在到达事件上执行操作(PE完成),然后通过通信层Communication Layer的协助分发事件,也可以发出输出事件。 S4通过一个hash函数,将事件路由到目标PN上,这个hash函数作用于事件的所有已知属性值上(需要配置),所以一个事件可能被路由到多个PN上。 然后PN中的事件监听器会将到来的事件传递给PEC,PEC以适当的顺序调用适当的PE(每个编键keyed的PE都会被映射到一个确定的PN上,即图中 的PE并不是物理存在一个PN相关,而是逻辑相关)。处理完成后,PN可能发出输出事件,也可以向Communication Layer请求协助向指定逻辑节点发送消息。 最后是通信层的介绍,它提供了“集群管理”,“故障恢复到备用节点”,“逻辑节点到物理节点映射”的作用。同时通信层还使用一个插件式的架构来选择网络协议,使用zookeeper在S4集群节点之间做一致性协作! S4的其他特点总结(摘自网络) S4 S4是一个通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统。基于S4框架,开发者可以轻松开发面向持续流数据处理的应用。 S4的设计特点有以下几个方面。 Actor Model 为了能在普通机型构成的集群上进行分布式处理,并 且集群内部不使用共享内存,S4架构采用了Actor模式,这种模式提供了封装和地址透明语义,因此在允许应用大规模并发的同时,也提供了简单的编程接 口。S4系统通过处理单元(Processing Elements,PEs)进行计算,消息在处理单元间以数据事件的形式传送,PE消费事件,发出一个或多个可能被其他PE处理的事件,或者直接发布结 果。每个PE的状态对于其他PE不可见,PE之间唯一的交互模式就是发出事件和消费事件。框架提供了路由事件到合适的PE和创建新PE实例的功能。S4的 设计模式符合封装和地址透明的特性。 Decentralized and Symmetric Architecture 除了遵循Actor模式,S4也参照了 MapReduce模式。为了简化部署和运维,从而达到更好地稳定性和扩展性,S4采用了对等架构,集群中的所有处理节点都是等同的,没有中心控制。这种 架构将使得集群的扩展性很好,处理节点的总数理论上无上限;同时,S4将没有单点容错的问题。 Pluggable Architecture S4系统使用Java开发,采用了极富层次的模块化编程,每个通用功能点都尽量抽象出来作为通用模块,而且尽可能让各模块实现可定制化。 Partial Fault-Tolerance 基于Zookeeper服务的集群管理层将会自动路由事件从失效节点到其他节点。除非显式保存到持久性存储,否则节点故障时,节点上处理事件的状态会丢失。 Object Oriented 节点间通信采用“Plain Old Java Objects”(POJOs)模式,应用开发者不需要写Schemas 或用哈希表来在节点间发送Tuples。 S4的功能组件分3大类,Clients、Adapters和PNode Cluster,图2显示了S4系统框架。 图2 Yahoo! S4流式系统框架结构图 S4提供Client Adapter,允许第三方客户端向S4集群发送事件和接收事件。Adapter实现了基于JSON的API,支持多语言实现的客户端驱动。 Client通过Driver组件与 Adapter进行交互,Adapter也是一个Cluster,其中有多个Adapter结点,Client可以通过多个Driver与多个 Adapter进行通信,这样可以保证单个Client在分发大数据量时Adapter不会成为瓶颈,也可以确保系统支持多个Client应用并发执行的 快速、高效和可靠性。 在Adapter中,真正与Client交互的是 其Stub组件,该组件实现了管理Client与Adapter之间通过TCP/IP协议进行通信的功能。GenericJsonClientStub这 个类支持将事件在Client与Adapter之间以JSON的形式转换,从而支持更多种类型的Client应用。不同的Client可以配置不同的 Stub来与Adapter进行通信,用户可以定义自己的Stub来实现自己想要的业务逻辑,这样也使得Client的行为更加多样性、个性化。 Yahoo! S4:分布式流计算平台 一、概述 S4(Simple Scalable Streaming System)最初是Yahoo!为提高搜索广告有效点击率的问题而开发的一个平台,通过统计分析用户对广告的点击率,排除相关度低的广告,提升点击率。目前该项目刚启动不久,所以也可以理解为是他们提出的一个分布式流计算(Distributed Stream Computing)的模型。 S4的设计目标是: ·提供一种简单的编程接口来处理数据流 ·设计一个可以在普通硬件之上可扩展的高可用集群。 ·通过在每个处理节点使用本地内存,避免磁盘I/O瓶颈达到最小化延迟 ·使用一个去中心的,对等架构;所有节点提供相同的功能和职责。没有担负特殊责任的中心节点。这大大简化了部署和维护。 ·使用可插拔的架构,使设计尽可能的即通用又可定制化。 ·友好的设计理念,易于编程,具有灵活的弹性 系统基于如下假设: ·一旦一个节点失败,会failover到另一个standby节点,但是会丢失原节点的内存状态,这也就是为什么说S4是一个部分容错的系统。 ·节点不能动态增加和减少。 二、与MapReduce的区别 通常对于大规模分布式数据的处理会首先想到MapReduce,Yahoo!也维护了Hadoop项目,但是他们最终放弃了扩展Hadoop的想法,因为相比之下,流计算面对的场景和需求是完全不同的。 流计算强调的是数据流的形式和实时性。MapReduce系统主要解决的是对静态数据的批量处理,当MapReduce任务启动时,一般数据已经到位了(比如保存到了分布式文件系统上)。而流式计算系统在启动时,一般数据并没有完全到位,而是经由外部数据流源源不断地流入,并且不像批处理系统重视的是总数据处理的吞吐,而是对数据处理的低延迟,希望进入的数据越快处理越好,这里的思想是数据的价值随着时间的递增而递减,所以数据越快被处理,结果就越有价值,这也是实时处理的价值所在。 MapReduce采用的是一种比较静态的模型,如果用它做数据流的处理,首先需要将数据流缓存并分块,然后放入集群运算。如果数据块分得太小,可以获得一定的低延迟以保障实时性,但是包括集群启动之类的额外开销将会占很大比重;如果数据块分得太大,将无法满足低延迟的需求,达不到实时性的要求。 流计算的数据本身就是数据流,不需要数据准备的时间,有数据即可流入同时计算,同时解决了数据准备和延迟的两个问题,所以流计算是一种有别于MapReduce的分布式计算模型。同时高速数据流的速率也不是MapReduce的设计可以承担的,流计算系统没有批处理的概念,而是自启动开始,每时每刻都在处理实时流入的数据。这也是为什么S4的开发小组一直在强调S4 is not Real-Time MapReduce。 三、实现逻辑 1、Event 数据流是事件(Event)的序列流。每个Event是一个(K,A)元素,通过EventType来标示其类型。K、A分别表示这种类型的 Event的若干个key和若干个attribute。key和attribute都是tuple-valued,即key=value这种元组值。例如: EventType(EV): TradeEvent KEY: product=”T-shirt” type=”buy it now” Attribute: customerId=”1234” time=”2011-4-19 01:21:31” 2、Processing Elements Processing Elements(PE)是S4中的基本计算单元,一个PE通过下面四个组件来表示: functionality:由实现PE的Java类和相关配置来定义。 types of events:处理的Event Type。 key:关心哪种key。 key的值:关心(匹配)的key值是多少。 每个PE只负责处理自己所关心的event type,并且只处理自己所对应的key值的event,也就是说,只有当event type, key, key的值都匹配时,才会交由该PE进行计算处理。 这里要注意的是,如果有匹配的PE,则交由该PE处理,如果没有,则会创建一个新的PE。所以一个PN中的PE可能有许多个,这就需要在前期对事件的key及其取值范围进行很好的划分,否则可能因为过多的PE导致系统效率降低,同时也应该定期对使用率较低的PE进行清除,但是由于数据存放在节点内存中,所以清除前应该对有必要的数据进行持久化处理,否则会永久性丢失。后期应该会在PE上添加优先级等属性,可以提升清除PE工作的准确率。 有一种PE没有属性key和attribute,它们可以处理指定event type的所有事件,通常这些事件是原始数据,这类PE一般放置在S4集群的输入层,在这里原始事件会被赋予一个key以便于分发给后面的PE处理。PE处理后可能输出一个或多个event,输出频率在配置文件中定义,可以配置指定时间间隔输出一次或是指定事件数发生后输出一次,特例是可以配置为每次事件触发都输出一次。 PE是直接面向业务方的组件,由业务方定义PE中对事件的处理和处理以后的输出,剩下的事情全部交由平台去做。 3、Processing Node Processing Node(PN)对应着集群中的每一个逻辑结点,主要工作是监听事件,当事件到达时调用适合的PE处理事件,如果PE有输出,则还需和通讯层合作进行事件的分发和输出。需要注意的是,集群中所有的PN都是对等的,使得集群的部署和维护相对简单,没有中心节点。其结构如下图: Event Listener负责监听事件并转交给PE容器(Processing Element Container, PEC),由PEC交由合适的PE处理业务逻辑。配置文件中还会配置PE原型(PE prototype),定义其功能、event type、key。PN对其监听到的每一个唯一属性值会触发一次操作,如果当前没有合适的PE处理该事件,则会根据该PE原型创建新的对应了该唯一属性值的PE来对事件进行处理。 配置文件中定义了S4集群所关心的key的集合,这里会通过对事件的event type, key, key的值计算哈希函数,以路由到指定的PN,此外单个事件可能会被路由到多个PN中,这里应该是为将来做负载均衡做的准备,也可以作为广播事件的途径。 PEC中有若干个PE分别对应不同的情况,这里的思想即是Actor模型。PE处理完逻辑后根据其定义的输出方法可以输出事件,事件交由Dispatcher与通讯层(Communication Layer)进行交互并由Emiter输出至逻辑节点。 结果就是,所有包含特性属性值的事件在理论上都能通过哈希函数到达相应的PN,并被路由到PN内的PE上处理。 4、Communication Layer 通讯层提供集群管理、故障恢复(failover)到备用节点、逻辑节点到物理节点的映射。当监测到硬件故障时,会自动更新映射。通讯层隐藏的映射使得PN发送消息时只需要关心逻辑节点而不用关心物理节点。 通讯层采用插件式的架构来选择网络协议。它会对事件本身采用不同的协议发送,例如对于控制消息,会采用可靠方式(例如TCP)发送,而为了优化集群的吞吐率,可能会采用不可靠的方式(例如UDP)发送数据消息,这在S4中是可以容忍的。 5、Configuration Management System 配置管理系统主要用于对集群的操作,包括为S4任务创建和销毁集群,分配新的物理节点到S4任务集群中,空闲的集群可以作为冷备。这里的一致性保证交由ZooKeeper来做。 S4系统的数据流整体上看起来类似下图: 四、编程模型 面向业务方的PE编程十分简单,只需要编写处理事件的processEvent函数和输出的output函数即可,PE本身可以定义一些变量进行本地的数据记录,同时S4本身提供了一些持久化的辅助,主要类都在io.ts4.persist包下面。值得注意的是在API文档中可以看到,持久化的类通常都包含一个Clock对象,可以在初始化时定义持久时间,到时间以后会被抛弃。 五、性能 开发小组运用S4系统分别进行了在线实验和离线实验(压力测试)。 在线实验集群有16台服务器,每台4个32位CPU,2GB内存。每天大约有25万用户发起共100万次搜索,实验两周内观察到的峰值是每秒1600个事件。实验结果表明S4增加了3%的广告点击,主要通过快速检测低质量广告并把它们过滤出去达成。 离线实验集群有8台服务器,每台服务器4个64位CPU,16GB内存。集群中跑了16个PN,每台2个,事件由300万服务和点击组成。这次实验主要用于评估系统在远高于期望事件流量下的性能。 压力测试结果如下: 可见当事件流越来越大时,S4系统的错误越来越多,当数据流速达到9.7Mbps时,开始出现错误,这主要是由于S4系统无法及时处理过多的事件而导致了丢失数据的情况。 六、系统分析 S4是面向流式数据和实时处理的,所以针对实时性较高的业务,可以很好地对数据做出高效的分析处理,而且系统一旦上线,很少需要人工干预,源源不断的数据流会被自动路由并分析。对于海量数据,它和MapReduce类似都可以应对,但它能比后者更快地处理数据。 S4目前的缺点在于它的数据传输可靠性还不够,可能丢失数据,同时由于数据存放在内存中,一旦节点出现故障,就会丢失该节点的所有数据,这一点可以通过定期持久化来弥补(但是真的有必要吗?)。同时我认为这和它面向的场景也有关系,实时数据分析通常都是针对一些非常离散、细小的数据,从统计的角度来看,损失掉一部分数据对最后的统计结果并没有很大影响,而这部分牺牲却可以换来吞吐率的大幅提升。所以就目前来看,S4还是更适合对那些不一定非得对每条数据都仔细分析的场景,只求最后一个统计的结果来对业务做出相应的预计和调整。此外S4系统要求输入的是事件流,这就涉及到事件的生成,所以在数据流入S4以前,必须有能将数据转化为事件的系统进行中间处理。 从集群的扩展性来看,理论上可以通过增加节点应对更大的数据流,但是目前还无法在S4工作时动态增加或减少节点。所以对节点进行调整时很可能必须停下当前的工作,做不到无缝调整。而且由于S4由ZooKeeper进行集群管理,所以当集群增加到一定规模时,ZooKeeper的管理能力也有待考验。此外,仍然是因为S4无法保证数据100%的可靠传输,所以集群规模增长时,数据错误也会增长得很快。目前没有相关资料显示S4集群的规模究竟可以做到多大,但是相信未来随着数据传输可靠性的提升,会发挥很可观的作用。 在业务耦合度方面,S4完全隔离了平台和业务逻辑,业务方只需要编写PE逻辑即可,这一点类似于MapReduce中只需编写map和reduce函数,业务和平台的耦合度是非常低的。 七、前景 从目前来看,流式计算的前景非常广阔,目前已经有一些基于流的服务出台。例如Twitter的Streaming API,可以实时推送(Push)消息,Google也利用该API实现了实时搜索。未来无论从数据的产生还是流动上都会有数量级的飞跃,如何对海量数据做出及时、实时的处理,相信不论是实际业务还是数据挖掘领域都会十分关注。也可以独立开发自己的或者基于S4的流式计算平台,以契合自身的业务需求。新的信息不仅对数据分析有用,也能给予用户更好的体验(Yahoo!做S4的初衷就是提升用广告的用户体验以增加点击率)。相信S4会成为继Hadoop之后的又一分布式计算领域的干将。未来的数据将不再是一个一个包的概念,而成为一道道数据流源源不断地流入系统,产出效益。 Yahoo!S4分布式流处理引擎分析总结 S4(Simple Scalable Streaming System)是一个分布式流处理引擎,开发者可以在这个引擎基础上开发面向无界的,不间断的流数据处理应用。 什么是流数据处理应用?例如,为了个性化搜索广告,系统需要实时处理来自几百万唯一用户每秒成千上万次的查询,并即时分析用户的会话特征来提高广告相关性 预测模型的准确度。流数据处理应用要求我们的系统可以接受大量的,不间断的数据(称为流式数据),并可以迅速做出数据处理,S4正是完成这样一个需求。 S4的设计思想 S4将一个流抽象为由(K,A)形式的元素组成的序列,这里K和A分别是键和属性。在这种抽象的基础上S4设计了能够消费和发出这些(K,A)元素的组 件,也就是Process Element。Process Element在S4中是最小的数据处理单元,每个PE实例只消费事件类型,属性key,属性value都匹配的事件,并最终输出结果或者输出新的 (K,A)元素。下面这张S4论文内的图很好的诠释了上面说的内容。 图中节点表示PE,有向边表示一个(K,A)元素及其流向。流初始化为一个Key为null,事件类型为Quote的(K,A)元素,按照上文的说 法,S4中需要定义一个能消费相应事件类型,属性key,属性value事件的PE,也就是图中的PE1。当PE1完成处理后,它发出新的(K,A)元 素,即图中的事件类型为WordEvent,key为word=”x”,属性为count=x的边,最后经过多个PE处理和(K,A)事件转换得到最终结 果(这里不介绍上图的处理过程,详细看论文介绍)。 实际上,我认为S4最核心的部分就是上面的设计,它将流的处理分为多个流事件,抽象为处理图中的有向边,并且每个流事件都用(K,A)的形式表示,这种表 示方式使得事件间的转换传递很方便,这是一种借鉴了MapReduce的(key,value)的设计。同时因为流被分为多个流事件,那么就需要对应多个 处理单元,每个PE唯一处理一种事件,并且PE间独立,这也大大降低了概念复杂性和系统复杂性。我们开发者要做的就是定制个性化的PE。 PE间有事件传递的依赖性,很自然的,我们希望可以有一个PE集群,S4使用Processing Element Container的概念,将多个PE包含到同一个容器中,PEC接收源event,并最终发送结果event。PEC加上通信处理模块就形成了PE的逻 辑主机Processing Node。如下图所示: PN负责监听事件,在到达事件上执行操作(PE完成),然后通过通信层Communication Layer的协助分发事件,也可以发出输出事件。 S4通过一个hash函数,将事件路由到目标PN上,这个hash函数作用于事件的所有已知属性值上(需要配置),所以一个事件可能被路由到多个PN上。 然后PN中的事件监听器会将到来的事件传递给PEC,PEC以适当的顺序调用适当的PE(每个编键keyed的PE都会被映射到一个确定的PN上,即图中 的PE并不是物理存在一个PN相关,而是逻辑相关)。处理完成后,PN可能发出输出事件,也可以向Communication Layer请求协助向指定逻辑节点发送消息。 最后是通信层的介绍,它提供了“集群管理”,“故障恢复到备用节点”,“逻辑节点到物理节点映射”的作用。同时通信层还使用一个插件式的架构来选择网络协议,使用zookeeper在S4集群节点之间做一致性协作! S4的其他特点总结(摘自网络) S4 S4是一个通用的、分布式的、可扩展的、分区容错的、可插拔的流式系统。基于S4框架,开发者可以轻松开发面向持续流数据处理的应用。 S4的设计特点有以下几个方面。 Actor Model 为了能在普通机型构成的集群上进行分布式处理,并 且集群内部不使用共享内存,S4架构采用了Actor模式,这种模式提供了封装和地址透明语义,因此在允许应用大规模并发的同时,也提供了简单的编程接 口。S4系统通过处理单元(Processing Elements,PEs)进行计算,消息在处理单元间以数据事件的形式传送,PE消费事件,发出一个或多个可能被其他PE处理的事件,或者直接发布结 果。每个PE的状态对于其他PE不可见,PE之间唯一的交互模式就是发出事件和消费事件。框架提供了路由事件到合适的PE和创建新PE实例的功能。S4的 设计模式符合封装和地址透明的特性。 Decentralized and Symmetric Architecture 除了遵循Actor模式,S4也参照了 MapReduce模式。为了简化部署和运维,从而达到更好地稳定性和扩展性,S4采用了对等架构,集群中的所有处理节点都是等同的,没有中心控制。这种 架构将使得集群的扩展性很好,处理节点的总数理论上无上限;同时,S4将没有单点容错的问题。 Pluggable Architecture S4系统使用Java开发,采用了极富层次的模块化编程,每个通用功能点都尽量抽象出来作为通用模块,而且尽可能让各模块实现可定制化。 Partial Fault-Tolerance 基于Zookeeper服务的集群管理层将会自动路由事件从失效节点到其他节点。除非显式保存到持久性存储,否则节点故障时,节点上处理事件的状态会丢失。 Object Oriented 节点间通信采用“Plain Old Java Objects”(POJOs)模式,应用开发者不需要写Schemas 或用哈希表来在节点间发送Tuples。 S4的功能组件分3大类,Clients、Adapters和PNode Cluster,图2显示了S4系统框架。 图2 Yahoo! S4流式系统框架结构图 S4提供Client Adapter,允许第三方客户端向S4集群发送事件和接收事件。Adapter实现了基于JSON的API,支持多语言实现的客户端驱动。 Client通过Driver组件与 Adapter进行交互,Adapter也是一个Cluster,其中有多个Adapter结点,Client可以通过多个Driver与多个 Adapter进行通信,这样可以保证单个Client在分发大数据量时Adapter不会成为瓶颈,也可以确保系统支持多个Client应用并发执行的 快速、高效和可靠性。 在Adapter中,真正与Client交互的是 其Stub组件,该组件实现了管理Client与Adapter之间通过TCP/IP协议进行通信的功能。GenericJsonClientStub这 个类支持将事件在Client与Adapter之间以JSON的形式转换,从而支持更多种类型的Client应用。不同的Client可以配置不同的 Stub来与Adapter进行通信,用户可以定义自己的Stub来实现自己想要的业务逻辑,这样也使得Client的行为更加多样性、个性化。 S4实现机理介绍 S4是yahoo开源的一套流式数据处理系统,什么是流式处理系统,顾名思义,我们把实时处理模块作为一个黑盒(这个黑盒的具体实现我们在下文中讨论),这个黑盒有很多接入管道,待处理数据可以从这些管道实时接入,并实时从出口产生分析后的结果数据。构建的整套系统可以称为流式数据处理系统。 该系统主要的应用场景可以在:系统状态实时监控,用户行为实时监控,营销&广告效果实时统计。 在这里先举个例子:用户行为实时监控: 用户浏览页面行为的log被实时接入到流式数据处理系统,经过实时分析后,系统可以实时输出到每天到当前时间为止,访问网站各个页面的浏览量和用户数(用户数需要针对用户进行去重)。而当我们遇到的计算任务和分析数据都非常庞大时,分布式解决便顺理成章的成为救命稻草。 接下来我们介绍S4系统的实现: S4的主要特点:1.decentralize设计,2.同质化设计 Decentralize:系统并没有任何master机器,可以近似认为没有任何主控角色,这样的设计使得系统扩展性有非常好的表现,系统可以非常方便的按照处理的数据量压力进行横向扩容。 同质化设计:系统中所有模块执行的程序都一样,在系统设计时也是一视同仁,并没有任何不同,简单的设计原则,便于实现和验证,而且并不失系统功能性。 我们先看一下系统设计架构,非常简单,系统只分为两层网络层和业务逻辑层 接下来我们看系统是如果运作的,其实设计理念非常简单,用户逻辑部分主要是Event和EventProcess以及dispatcher的封装。 不同的数据接入进来被抽象成不同的Event,而每个Event会有对应的EventProcess进行处理,EventProcess处理Event的过程中也会产生很多中间结果的Event类型,而这些Event又被PUSH的网络层,网络层通过配置的dispatcher进行转发。 为了便于理解我们再举一个稍微复杂的一个统计需求的具体例子: 假如我们有银行用户实时取款记录如下: 类型1:用户|地区|取款金额|其他冗余信息 类型2:用户|地区|存款金额|其他冗余信息 银行需要监控资金流动性,需要统计每个自然月到当天为止,每个地区的存款金额总数和每个地区的取款总数。 于是乎:数据接入的类型就有两种SaveEvent 和 TakeEvent 1.SaveEvent处理 SaveEvent被网络层接收到后,调用用户配置的dispatcher接口的dispatch方法,将该Event发送到某台机器,这台机器反序列化数据后得到该Event为SaveEvent便调用SaveEventProcess进行处理,去除冗余信息生成SaveTakeEvent并调用该Event对用的dispatcher接口,将该Event发送到某台机器。 2. TakeEvent处理 TakeEvent被网络层接收到后,调用用户配置的dispatcher接口的dispatch方法,将该Event发送到某台机器,这台机器反序列化数据后得到该Event为TakeEvent便调用TakeEventProcess进行处理,去除冗余信息也生成SaveTakeEvent并调用该Event对用的dispatcher接口,将该Event发送到某台机器。 3. SaveTakeEvent处理 同一地区的SaveTakeEvent被发送到一起,系统对Event反序列化知道该Event为SaveTakeEvent类型,便调用TakeAndSaveEventProcess进行处理,将Save操作和Take操作的钱数进行对应的累加。 这里面有几个关键点: 1. dispatcher接口,这个接口起到的重要职责有两个:1.负载均衡。2:将关联数据发布到同一台机器。 2. 不同类型的Event有对应的EvenProcess进行处理。 如上例,distpatcher接口完全可以按照地区进行散列,既保证同一地区的数据落在一起,又保证不同地区数据分散均匀。 上例可以用一个如下一个有向无环图进行表示: 接下来我们就看S4的流程控制如何实现,其实很简单,上述过程基本上也已经描述清楚,S4充分利用了java的反射结合通过数据序列化和凡序列化实现了上述框架。一个Event类型的数据被disaptch不同机器上,成功反序列化出来以后,可以通过反射机制得到Event类型,然后再通过反射机制生成处理这个Event的EventProcess,然后缓存EventProcess实例,用以不断接受此类Event进行处理,产生的中间结果Event也会通过网络层传送出去,并再次经历上述过程,直到DAG任务的结束。 通过Event和EventProcess的对应可以非常方便的构建出一个DAG 还有一个关键就是刚才的dispatcher,这个不再详细赘述了,至于系统通过zookeeper进行名字映射等细节问题,这里也不做过多介绍了。 这样的设计可以看到所有的机器上的程序都一样,任何一台机器都需要承载Event和EventProcess,而且毫无二致,dispatcher也通过反射机制得到Event的某些关键属性值,并通过这个属性值进行dispatch。 上述只是S4的大致实现思路,详细细节分析以后补充。 分布式流式计算平台——S4 本文是作者在充分阅读和理解Yahoo!最新发布的技术论文《S4:Distributed Stream Computing Platform》的基础上,所做出的知识分享。 S4是Yahoo!在2010年10月开源的一套通用、分布式、可扩展、部分容错、具备可插拔功能的平台。这套平台主要是为了方便开发者开发处理流式数据(continuous unbounded streams of data)的应用。项目官方网站为:http://s4.io/。同时,S4的开发者也发表了一篇技术论文《S4:Distributed Stream Computing Platform》来介绍S4的设计。下面我们就来学习这篇论文。 开发动机 “We designed this engine to solve real-world problems in the context of search applications that use data mining and machine learning algorithms.” … “To process user feedback, we developed S4, a low latency, scalable stream processing engine.” Yahoo!之所以开发S4系统,主要是为了解决它现实的问题:搜索广告的展现。搜索广告是当前各大搜索引擎的主要收入来源,用户发出查询请求,搜索引擎在返回正常结果的同时也会返回相关广告,而广告是按照点击付费。为了在最好的位置,放置最相关(也就是用户最有可能点击)的广告,各大搜索引擎使用了大量的数据挖掘和机器学习算法来进行相关性计算,以便提高收入,满足用户需求。其中很重要的一点就是要不断分析用户的点击反馈,以便捕获用户的行为。S4最初主要还只是用来处理用户的点击反馈。 “The streaming paradigm dictates a very different architecture than the one used in batch processing. Attempting to build a general- purpose platform for both batch and stream computing would result in a highly complex system that may end up not being optimal for either task.” 那么Yahoo!为什么没有选择Hadoop来处理呢? MapReduce系统主要解决的是对静态数据的批量处理,即当前的MapReduce系统实现启动计算时,一般数据已经到位了(比如保存到了分布式文件系统上)。 而流式计算系统在启动时,一般数据并没有完全到位,而是源源不断地流入,并且不像批处理系统重视的是总数据处理的吞吐,而是对数据处理的latency,即希望进入的数据越快处理越好。 当然,现在也有很多基于Hadoop系统来处理流式数据。一般有以下几种方式。 Micro-batchinMapReduce:就是把流式的数据按照时间或者大小形成小的静态数据,然后定期启动MapReduce来计算。 Continuous MapReduce:Hadoop Online(http://www.eecs.berkeley.edu/Pubs/TechRpts/2009/EECS-2009-136.html)通过实现作业内的数据传输Pipeline和作业间的数据传输Pipeline,可以实现online aggregation和continuous queries。当前MapReduce模型中,只有Map中间结果完全产生后,Reduce才会过来拖数据,等所有Map数据都拖成功后,才能计算。Hadoop Online实现了Map到Reduce间的数据Pipeline,使得可以在Map产生部分数据后,就可以送到Reduce端,以便Reduce可以提前或者定期计算。 Dynamic add input:百度的一种实现,用来解决计算时数据还没有到位的问题。作业可以在数据还没有完全到位的情况下启动,当新数据累积到一定量时,通过一个命令行接口,向运行中的作业动态增加新的输入。通过这种方式,大大减少了处理大数据作业时等待数据到位的时间,在依次执行多个作业时,也会有时间收益。 在论文中,对类似于第一种的方式,分析了它的缺点。如果将数据流切成较小的data segment,就会增加启动作业的overhead,同时使得维护segment之间的依赖关系变得更加复杂;但如果切得较大,那么处理的latency就会比较长。 随着大量实时应用的发展,比如实时搜索、实时交易系统、实时欺
/
本文档为【yahoo_S4】,请使用软件OFFICE或WPS软件打开。作品中的文字与图均可以修改和编辑, 图片更改请在作品中右键图片并更换,文字修改请直接点击文字进行修改,也可以新增和删除文档中的内容。
[版权声明] 本站所有资料为用户分享产生,若发现您的权利被侵害,请联系客服邮件isharekefu@iask.cn,我们尽快处理。 本作品所展示的图片、画像、字体、音乐的版权可能需版权方额外授权,请谨慎使用。 网站提供的党政主题相关内容(国旗、国徽、党徽..)目的在于配合国家政策宣传,仅限个人学习分享使用,禁止用于任何广告和商用目的。

历史搜索

    清空历史搜索