Hadoop Map Reduce 用于 Google 网络图

2022-01-14 00:00:00 graph hadoop mapreduce java

我们的任务是创建 map reduce 函数,该函数将为 google web 图表中的每个节点 n 输出可以从节点 n 在 3 跳内到达的节点.(实际数据可以在这里找到:http://snap.stanford.edu/data/web-Google.html)这是列表中项目的示例:

we have been given as an assignment the task of creating map reduce functions that will output for each node n in the google web graph list the nodes that you can go from node n in 3 hops. (The actual data can be found here: http://snap.stanford.edu/data/web-Google.html) Here's an example of how the items in the list will be :

1 2 
1 3 
2 4 
3 4 
3 5 
4 1 
4 5 
4 6 
5 6 

从上面的示例图表将是这个

From the above an example graph will be this

在上面的简化示例中,节点 1 的示例路径是α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 ->1],[1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6]因此 map reduce 将为节点 1 输出顶点 1,5,6 ((a) 每个顶点只能计算一次,并且(b) 我们仅在存在长度为 3 的圆形路径时才包含当前顶点,如示例 [1 -> 2 -> 4 -> 1] 和 [1 -> 3 -> 4 -> 1].

In the above simplified example the paths for example of node 1 are α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6] and thus the map reduce will output for node 1 the vertices 1,5,6 ( (a) each vertex must be counted only once, and (b) we include the current vertex only when there is a circular path of length 3, as is the example [1 -> 2 -> 4 -> 1] and [1 -> 3 -> 4 -> 1].

我很迷茫,因为我认为这需要图论和算法的知识,而我们还没有学到任何与此相关的知识.

I am very lost because i believe this requires knowledge of graph theory and algorithms and we haven't been taught anything related to that.

如果有人能给我一个正确的开始方向,我将不胜感激.(我已经研究过最短路径理论等,但我不确定它是否对这个特定的练习有用)

I will greatly appreciate if somebody can give me a right direction to start . ( I've looked into shortest path theory and such but i'm not sure if it's going to be useful for this particular exercise)

提前致谢,祝您假期愉快.

Thanks in advance, and have a good Holidays season.

编辑

我尝试创建附属列表,但是虽然我希望输出是vertexID"node1 node2 node3 node4...",但我看到在输出文件中我的减速器将每个顶点 ID 的列表拆分为一对三.

I try to create the adjucency list , but while i expect the output to be "vertexID" "node1 node2 node3 node4..." i see that in the output file my reducer splits the list for each vertex Id into pairs of three.

例如,如果我有连接到 Z、X、C、V、B、N、M、G、H、J、K、L 的顶点 A,它会将其输出为

for example if i have vertex A that connects to Z,X,C,V,B,N,M,,G,H,J,K,L I it outputs this as

A Z,X,C

A V,B,N

A M,G,H

A J,K,L

下面是我的映射器和reducer

below are my mapper and reducer

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {



        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);    
        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);

    }



}





/**
 * @author George
 * Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
 *
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    //private LongWritable vertice= new LongWritable(0);
    private Text vertice=new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line,"
");
        StringTokenizer itrInside;

        //vertice=new LongWritable(Long.valueOf(value.toString()).longValue());


        while (itr.hasMoreTokens()) {
            if(itr.countTokens() > 2){

            }//ignore first line ??
            else{
                itrInside=new StringTokenizer(itr.toString());
                vertexID.set(itr.nextToken());

                while(itrInside.hasMoreTokens()){               
                    vertice.set(itrInside.nextToken());
                    context.write(vertexID, value);
                }           
            }
        }

    }

}

@override
public class ListReducer extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        String vertices="";

        for (Text value : values) {
            if(vertices=="")
                vertices+=value.toString();
            else
                vertices=vertices+","+value.toString();         
        }

        Text value=new Text();
        value.set(vertices);
        context.write(key, value);

    }

}

推荐答案

因为这是你的(家庭作业)作业,我不会包含 Java/Hadoop 解决方案,但我会尝试用 MapReduce 来做图计算的概念对你来说更清楚一点,这样你就可以自己实现了.

Since it is your (homework) assignment, I will not include a Java/Hadoop solution, but I will attempt to make the concept of graph computation with MapReduce a bit clearer for you so you can implement it yourself.

对于每个顶点,您想要 恰好 n 跳的顶点.在查看最短路径算法时,您走在了正确的道路上,但是通过简单的广度优先搜索可以更轻松地解决它.

You want, for each vertex, the vertices that are exactly n-hops away. You were on the right path while looking at the shortest path algorithm, but it can be solved easier with a simple breadth-first search.

但是,当使用 MapReduce 处理图形时,您需要更深入地研究顶点之间的消息传递.图算法通常用多个作业来表示,其中 map 和 reduce 阶段具有以下分配:

When crunching graphs with MapReduce however, you will need to dive a bit deeper into message passing between vertices. Graph algorithms are usually expressed with multiple jobs where the map and reduce stage have the following assignment:

  • Mapper:向另一个顶点发送消息(通常针对顶点的每个邻居)
  • Reducer:对传入消息进行分组,加入核心图并减少它们,有时会发送更多消息.
  • Mapper: Emit a message to another vertex (usually for every neighbour of a vertex)
  • Reducer: Group incoming messages, joining the core graph and reduce them, sometimes sending more messages.

每个作业都将始终对前一个作业的输出进行操作,直到您达到结果或放弃为止.

Each job will always operate on the output of the previous job until you have reached either your result, or give up.

数据准备

在您真正想要运行图形算法之前,请确保您的数据采用邻接列表的形式.它将使以下迭代算法更容易实现.

Before you actually want to run your graph algorithm, make sure that your data is in the form of an adjacency list. It will make the following iteration algorithms much easier to implement.

因此,您需要按顶点 ID 对它们进行分组,而不是您的邻接元组.这是一些伪代码:

So instead of your adjacency tuples, you will need to group them by the vertex id. Here is some pseudo code:

map input tuple (X, Y): 
   emit (X, Y)

reduce input (X, Y[]) :
  emit (X, Y[])

基本上,您只是按顶点 ID 进行分组,因此您的输入数据是其邻居的键(顶点 ID)(可以从该特定键顶点 ID 访问的顶点 ID 列表).如果想节省资源,可以使用reducer作为combiner.

Basically you are just grouping by the vertex id, so your input data is a key (vertex id) to its neighbours (list of vertex ids that can be reached from that particular key vertex id). If you want to save resources, you can use the reducer as a combiner.

算法

就像我已经提到的,您只需要广度优先搜索算法.您将为图中的每个顶点执行广度优先搜索算法,当遇到邻居时,您将只增加一个跳跃计数器,告诉我们距离起始顶点有多远(这是最短路径算法的最简单情况,即当边缘权重为 1).

Like I already mentioned you will only need a breadth first search algorithm. You will execute the breadth first search algorithm for every vertex in your graph, when hitting a neighbour you will just increment a hop-counter that tells how far we are away from our start vertex (which is the simplest case of a shortest path algorithm, namely when the edge weight is 1).

让我给你看一张简单的图片,用一个简单的图表来描述它.橙色表示已访问,蓝色表示未访问,绿色是我们的结果.括号中是跳数计数器.

Let me show you a simple picture describing it with a simplistic graph. Orange means visited, blue unvisited and green is our result. In the parenthesis is the hop counter.

你看,在每次迭代中,我们都会为每个顶点设置一个 hopcounter.如果我们碰到一个新的顶点,我们只会将它加一.如果我们命中第 n 个顶点,我们会以某种方式标记它以供以后查找.

You see, at every iteration we are setting a hopcounter for every vertex. If we hit a new vertex, we will just increment it by one. If we hit the n-th vertex, we will somehow mark it for later lookup.

使用 MapReduce 分发

虽然对每个顶点运行广度优先搜索确实很浪费,但我们可以通过并行化来做得更好.消息传递到这里.就像上图一样,我们在映射步骤中获得的每个顶点最初都会向其邻居发送一条消息,其中包含以下有效负载:

While it seems really wasteful to run a breadth first search for every vertex, we can do better by parallelizing it. Here comes the message passing into play. Just like in that picture above, every vertex we get in our mapping step will initially send a message to its neighbours containing following payload:

HopMessage: Origin (VertexID) | HopCounter(Integer)

在第一次迭代中,我们将尝试将消息发送给邻居以开始计算.否则,我们将只代理图表或传入消息.

At the first iteration we will attempt to send the message to the neighbours to kick off the computation. Otherwise we will just proxy the graph or incoming messages.

因此,在您准备好数据后的第一份工作中,您的 map 和 reduce 看起来像这样:

So in your very first job after the data preparation your map and reduce looks like that:

map input (VertexID key, either HopMessage or List<VertexID> adjacents):
  if(iterationNumber == 1): // only in the first iteration to kick off
     for neighbour in adjacents:
        emit (neighbour, new HopMessage(key, 0))
  emit (key, adjacents or HopMessage) // for joining in the reducer

reducer 现在在图和消息之间进行简单的连接,主要是为了获取顶点的邻居,从而导致输入(以我的简单图为例):

The reducer now does a simple join between the graph and the messages, majorly to get the neighbours of a vertex, leading to that input (taking my simplistic graph):

1 2 // graph 
2 1 // hop message
2 3 // graph
3 1 // hop message
3 4 // graph
4 1 // hop message
4 - // graph

在 reducer 步骤中,我们将再次将消息转发给邻居,并检查增加后的 hop counter 是否已经达到 3.

In the reducer step we will just forward the messages to the neighbours again and check if the hop counter has reached 3 already after incrementing.

reducer input(VertexID key, List<either HopMessage or List<VertexID> neighbours> values):
 for hopMessage in values:
    hopMessage.counter += 1
    if (hopMessage.counter == 3) 
       emit to some external resource (HopMessage.origin, key)
    else 
       for neighbour in neighbours of key:
          emit (neighbour, hopMessage)
    emit (key, neighbours)

正如您所看到的,这里可能会变得非常混乱:您需要管理两种不同类型的消息,然后还要写入一些外部资源,以跟踪正好 3 跳外的顶点​​.

As you can see, it can get really messy here: You need to manage two different kinds of messages and then also write to some external resource that will keep track of the vertices that are exactly 3 hops away.

只要有要发送的 HopMessage,您就可以安排迭代的作业.这很容易出现图表中的循环问题,因为在这种情况下您将无限增加 hopcounter.所以我建议要么用每条消息发送到目前为止的完整遍历路径(相当浪费),要么简单地限制要进行的迭代次数.在 n=3 的情况下,不需要超过 3 次作业迭代.

You schedule jobs that iterate as long as there are HopMessages to be sent. This is prone to problems with cycles in the graph, because you will infinitely increment the hopcounter in this case. So I suggest to either send the complete traversed path so far with every message (quite wasteful) or simply cap the number of iterations to take. In the n=3 case, more than 3 job iterations are not needed.

有很多博客和项目将为您提供有关如何处理 Hadoop 中的每个问题的示例.至少我在我的博客中写过关于 MapReduce 中的图形处理的文章,你可以在我的 github 上找到一些示例.

There are plenty of blogs and projects that will give you examples on how to deal with each of those problems in Hadoop. At least I have written about graph crunching in MapReduce in my blog and you can find some examples on my github.

清理输出数据

最后你会得到一堆包含顶点 -> 顶点映射的文件.您可以像在准备中那样减少它们.

In the end you will have a bunch of files that contain a vertex -> vertex mapping. You can reduce them the same way you have done in the preparation.

使用 Pregel 的更好方法

处理图的一种不那么繁琐的方法是使用 Pregel 方式来表达图计算.Pregel 正在使用以顶点为中心的模型,并且更容易表达这种广度优先的计算.

A less cumbersome way of dealing with graphs is to use the Pregel-way of expressing a graph computation. Pregel is using a vertex-centric model and makes it more easier to express such breadth-first computations.

这是使用 Apache Hama 的上述算法的简单示例:

Here is a simple example of the above algorithm using Apache Hama:

  public static class NthHopVertex extends Vertex<Text, NullWritable, HopMessage> {

    @Override
    public void compute(Iterable<HopMessage> messages) throws IOException {
      if (getSuperstepCount() == 0L) {
        HopMessage msg = new HopMessage();
        msg.origin = getVertexID().toString();
        msg.hopCounter = 0;
        sendMessageToNeighbors(msg);
      } else {
        for (HopMessage message : messages) {
          message.hopCounter += 1;
          if (message.hopCounter == 3) {
            getPeer().write(new Text(message.origin), getVertexID());
            voteToHalt();
          } else {
            sendMessageToNeighbors(message);
          }

        }
      }
    }
  }

顺便说一句,在您的示例中创建的新图表如下所示:

BTW The new graph that was created in your example looks like the following:

1=[1, 5, 6]
2=[2, 3, 6]
3=[2, 3, 6]
4=[4, 5]

这是完整的 Hama Graph 实现:

Here is the full Hama Graph implementation:

https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java

相关文章