Giraph源代码分析(九)—— Aggregators 原理解析
Giraph中Aggregator的基本使用方法请參考官方文档:http://giraph.apache.org/aggregators.html 。本文重点在解析Giraph怎样实现Aggregators,后文用图示的方法描写叙述了Aggregator的运行过程。
基本原理:在每一个超级步中,每一个Worker计算本地的聚集值。
超级步计算完毕后,把本地的聚集值发送给Master汇总。在MasterCompute()运行后,把全局的聚集值回发给全部的Workers。
缺点:当某个应用(或算法)使用了多个聚集器(Aggregators),Master要完毕全部聚集器的计算。由于Master要接受、处理、发送大量的数据,不管是在计算方面还是网络通信层次,都会导致Master成为系统瓶颈。
改进:採用分片聚集 (sharded aggregators) . 在每一个超级步的后。每一个聚集器被派发给一个Worker。该Worker接受和聚集其它Workers发送给该聚集器的值。
然后Workers把自己的全部的聚集器发送给Master。这样Master就无需运行不论什么聚集,仅仅是接收每一个聚集器的终于值。在MasterCompute.compute运行后,Master不是直接把全部的聚集器发送给全部的Workers,而是发送给聚集器所属的Worker。然后每一个Worker再把其上的聚集器发送给全部的Workers.
首先给出Master <-- > Worker间, Worker <--> Worker间通信协议,在每一个类中的doRequest(ServerData serverData)方法中会解析并存储收到的消息。
1). org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 类 . Worker --> Worker Owner
功能:每一个worker把当前超步的局部 aggregated values 发送到该Aggregator的拥有者。
2). org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 类. Worker Owner--> Master
功能:每一个Worker把自己所拥有的Aggregator的终于 aggregated values 发送给 master。
3). org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 类. Master --> Worker Owner.
功能:master把终于的 aggregated values 或aggregators 发送给该Aggregator的拥有者。
4). org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 类。 Worker Owner--> Worker
功能: 发送终于的 aggregated values 到 其它workers。发送者为该Aggregator的拥有者。接受者为除发送者之外的全部workers。
Aggregator分类和 注冊
Giraph中把Aggregator分为两类:regular aggregators和persistent aggregators。
regular aggregators的值在每一个超级步開始会被重置为初始值,然而persistent aggregators的值在整个应用(算法)中一直保持。
举例来说。若LongSumAggregator在每一个顶点的compute()方法中加1。假设使用regular aggregators,在每一个超级步中就能够读取前一个超级步的參与计算的顶点总数;假设使用persistent aggregators,就能够获取前面全部超级步中參与计算的顶点总和。
在使用aggregator之前,必需要在mastes上Registering aggregators。做法:继承org.apache.giraph.master.DefaultMasterCompute类,重写 void initalize() 方法。
在该方法中注冊aggregators。语法例如以下:
registerAggregator(aggregatorName, aggregatorClass)
registerPersistentAggregator(aggregatorName, aggregatorClass)
说明:MasterCompute.initalize()方法仅仅在第 INPUT_SUPERSTEP (-1) 超级步中运行一次。详细在 BSPServiceMaster.runMasterCompute(long superstep)方法中。在MasterCompute.compute()方法中,能够使用下述方法读取或改动聚集器的值。
getAggregatedValue(aggregatorName) //获取前一个超级步的聚集器值
setAggregatedValue(aggregatorName, aggregatedValue) //改动聚集器的值
MasterCompute.compute()总是在Vertex.compute()前运行。 因为第 INPUT_SUPERSTEP ( -1)个超级步进行的是数据的载入和重分布过程,不计算Vertex.compute()。第0个超级步Vertex.compute()又是在MasterCompute.compute()方法后运行。故对第 -1 、 0个超级步MasterCompute.compute()方法中获得的聚集器值均为其初始值。从第1个超级步開始。MasterCompute.compute()方法才获得了全部Vertex.compute()在第0个超级步聚集的值。
1. 从第0个超级步開始。BspServiceMaster调用MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法把聚集器派发给Worker。聚集器的value为上一个超级步的全局聚集值(final aggregated values)。次为初始值。先给出MasterAggregatorHandler的类继承关系。例如以下:
finishSuperStep(MasterClient masterClient) 方法核心内容例如以下:
/** * Finalize aggregators for current superstep and share them with workers */ public void finishSuperstep(MasterClient masterClient) { for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) { if (aggregator.isChanged()) { // if master compute changed the value, use the one he chose aggregator.setPreviousAggregatedValue( aggregator.getCurrentAggregatedValue()); // reset aggregator for the next superstep aggregator.resetCurrentAggregator(); } } /** * 把聚集器发送给所属的Worker。发送内容: * 1). Name of the aggregator * 2). Class of the aggregator * 3). Value of the aggretator */ try { for (Map.Entry<String, AggregatorWrapper<Writable>> entry : aggregatorMap.entrySet()) { masterClient.sendAggregator(entry.getKey(), entry.getValue().getAggregatorClass(), entry.getValue().getPreviousAggregatedValue()); } masterClient.finishSendingAggregatedValues(); } catch (IOException e) { throw new IllegalStateException("finishSuperstep: " + "IOException occurred while sending aggregators", e); } }
问题1:怎样确定aggregator的Worker Owner ?
答:依据aggregator的Name来确定它所属的Worker。计算方法例如以下:
/** * 依据aggregatorName和全部的workers列表来计算aggregator所属的Worker * 參数aggregatorName:Name of the aggregator * 參数workers: Workers的list列表 * 返回值:Worker which owns the aggregator */ public static WorkerInfo getOwner(String aggregatorName,List<WorkerInfo> workers) { //用aggregatorName的HashCode()值模以 Workers的总数目 int index = Math.abs(aggregatorName.hashCode() % workers.size()); return workers.get(index); //返回aggregator所属的Worker }
问题2:Worker 怎样推断自身是否接收完自己所拥有的aggregators?
答:Master给某个Worker发送aggregators时。同一时候发送到该Worker的aggregators数目。使用的 SendAggregatorsToOwnerRequest类对消息进行封装和解析。
2. Worker接受Master发送的Aggregator,Worker把接收到的聚集体值发送给其它全部Workers,然后每一个Workers就会得到上一个超级步的全局聚集值。
由前文知道,每一个Worker都有一个ServerData对象,ServerData类中关于Aggregator的两个成员变量例如以下:
// 保存Worker在当前超步拥有的aggregators private final OwnerAggregatorServerData ownerAggregator; // 保存前一个超步的aggregators private final AllAggregatorServerData allAggregatorData;
能够看到,ownerAggregatorData用来存储在当前超步Master发送给Worker的聚集器,allAggregatorData用来保存上一个超级步全局的聚集值。ownerAggregatorData和allAggregatorData值的初始化在SendAggregatorsToOwnerRequest 类中的doRequest(ServerData serverData)方法中,例如以下:
public void doRequest(ServerData serverData) { DataInput input = getDataInput(); AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData(); try { //收到的Aggregators数目。在CountingOutputStream类中有计数器counter, //每向输出流中加入一个聚集器对象,计数加1. 发送时,在flush方法中把该值插入到输出流前面。int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); String aggregatorClassName = input.readUTF(); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); //Master发送给该Worker的requests总数目. count.readFields(input); aggregatorData.receivedRequestCountFromMaster(count.get(), getSenderTaskId()); } else { Class<Aggregator<Writable>> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, aggregatorClass); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); //把收到的上一次全局聚集的值赋值给allAggregatorData aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue); //ownerAggregatorData仅仅接受聚集器 serverData.getOwnerAggregatorData().registerAggregator( aggregatorName, aggregatorClass); } } } catch (IOException e) { throw new IllegalStateException("doRequest: " + "IOException occurred while processing request", e); } //接受一个 request,计数减1。同一时候把收到的Data加入到allAggregatorServerData的List<byte[]> masterData中 aggregatorData.receivedRequestFromMaster(getData()); }
每一个Worker在開始计算前。会调用BspServiceWorker类的prepareSuperStep()方法来进行聚集器值的派发和接受其它Workers发送的聚集器值。调用关系例如以下:
BspServiceWorker类的prepareSuperStep()方法例如以下:
@Override public void prepareSuperstep() { if (getSuperstep() != INPUT_SUPERSTEP) { /* * aggregatorHandler为WorkerAggregatorHandler类型, * 可參考上文中MasterAggregatorHandler的类继承关系. * workerAggregatorRequestProcessor声明为WorkerAggregatorRequestProcessor(接口) * 类型,实际为NettyWorkerAggregatorRequestProcessor的实例。 * 用于Worker间发送聚集器的值。*/ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); } }
WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法例如以下:
public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) { AllAggregatorServerData allAggregatorData = serviceWorker.getServerData().getAllAggregatorData(); /** * 等待直到Master发送给该Worker的聚集器都已接受完, * 返回值为Master发送给该Worker的全部Data(聚集器) */ Iterable<byte[]> dataToDistribute = allAggregatorData.getDataFromMasterWhenReady( serviceWorker.getMasterInfo()); // 把从Master收到的Data(聚集器)发送给其它全部Workers requestProcessor.distributeAggregators(dataToDistribute); // 等待直到接受完其它Workers发送给该Workers的聚集器 allAggregatorData.fillNextSuperstepMapsWhenReady( getOtherWorkerIdsSet(), previousAggregatedValueMap, currentAggregatorMap); // 仅仅是清空allAggregatorServerData的List<byte[]> masterData对象 // 为下一个超级步接受Master发送的聚集器做准备 allAggregatorData.reset(); }
以下详述Worker怎样判定已接收全然部Master发送的全部Request ? 主要目的在于描写叙述分布式环境下线程间怎样协作。
在AllAggregatorServerData类中定义了TaskIdsPermitBarrier类型的变量masterBarrier,用来推断是否接收完Master发送的Request. TaskIdsPermitBarrier类中主要使用wait()、notifyAll()等方法来控制。当获得的aggregatorName等于AggregatorUtils.SPECIAL_COUNT_AGGREGATOR时,会调用requirePermits(long permits,int taskId)来添加接收的arrivedTaskIds和须要等待的request数目waitingOnPermits. 接受一个Request
/** * Require more permits. This will increase the number of times permits * were required. Doesn't wait for permits to become available. * * @param permits Number of permits to require * @param taskId Task id which required permits */ public synchronized void requirePermits(long permits, int taskId) { arrivedTaskIds.add(taskId); waitingOnPermits += permits; notifyAll(); }
相关文章