Developing, testing and debugging Hadoop map/reduce jobs with Eclipse
What are my options for developing Java MapReduce jobs in Eclipse? My final goal is to run my map/reduce logic on my Amazon Hadoop cluster, but I would like to test the logic on my local machine first, with breakpoints set in it, before deploying it to a larger cluster.
I see there is a Hadoop plug-in for Eclipse, which looks old (correct me if I am wrong), and a company called Karmasphere had something for Eclipse and Hadoop, but I am not sure whether that is still available.
How do you go about developing, testing and debugging your map/reduce jobs with Eclipse?
Recommended answer
I develop Cassandra/Hadoop applications in Eclipse by:
Using Maven (m2e) to gather and configure the dependencies (Hadoop, Cassandra, Pig, etc.) for my Eclipse projects.
Creating test cases (classes in src/test/java) to test my mappers and reducers. The trick is to build a context object on the fly using inner classes that extend RecordWriter and StatusReporter. If you do this, then after you invoke setup/map/cleanup or setup/reduce/cleanup you can assert that the mapper or reducer wrote the correct key/value pairs and context info. The constructors for contexts in both mapred and mapreduce look ugly, but you'll find the classes are pretty easy to instantiate.
Once you write these tests, Maven will invoke them automatically every time you build.
You can invoke the tests manually by selecting the project and doing a Run --> Maven Test. This turns out to be really handy because the tests are invoked in debug mode, so you can set breakpoints in your mappers and reducers and do all the cool things Eclipse lets you do in debug.
Once you're happy with the quality of your code, use Maven to build a jar-with-dependencies, the all-in-one jar that Hadoop likes so much.
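The test-double trick from the steps above can be sketched without any Hadoop classes on the classpath. This is only an illustration under stated assumptions: Emitter, RecordingEmitter and WordSplitter are hypothetical stand-ins invented for this sketch (they are not Hadoop types), and the recording class plays the role that a RecordWriter subclass would play in a real test.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the "write" side of a context; NOT a Hadoop type.
interface Emitter<K, V> {
    void write(K key, V value);
}

// Test double that records every pair it is given, so a test can inspect them.
final class RecordingEmitter<K, V> implements Emitter<K, V> {
    private final List<K> keys = new ArrayList<>();
    private final List<V> values = new ArrayList<>();

    @Override
    public void write(K key, V value) {
        keys.add(key);
        values.add(value);
    }

    List<K> getKeys() { return keys; }
    List<V> getValues() { return values; }
}

// Hypothetical word-splitting logic of the kind a mapper's map() might contain.
final class WordSplitter {
    static void map(long offset, String line, Emitter<String, Integer> out) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.write(word, 1); // one record per word
            }
        }
    }
}

final class RecordingEmitterDemo {
    public static void main(String[] args) {
        RecordingEmitter<String, Integer> out = new RecordingEmitter<>();
        WordSplitter.map(30L, "abc def ghi", out);
        // prints "[abc, def, ghi] [1, 1, 1]"
        System.out.println(out.getKeys() + " " + out.getValues());
    }
}
```

Because the logic under test only sees the Emitter interface, a breakpoint inside WordSplitter.map is hit in an ordinary local debug session, which is the same benefit the answer describes for real mappers.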
Just as a side note, I've built a number of code generation tools based on the M2T JET project in Eclipse. They generate the infrastructure for everything I've mentioned above, and I just write the logic for my mappers, reducers and test cases. If you gave it some thought, you could probably come up with a set of reusable classes that you could extend to do pretty much the same thing.
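One possible shape for such a reusable class is sketched below. It is an assumption about how the idea could look, not the generated code the answer refers to: MapLogicHarness and LineLengthHarness are hypothetical names, and the harness uses plain Java collections rather than Hadoop types. The base class owns the setup/map/cleanup sequencing and the output capture, so a subclass supplies only the logic under test.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical reusable base class; none of these names come from Hadoop.
abstract class MapLogicHarness<KI, VI, KO, VO> {
    private final List<Map.Entry<KO, VO>> written = new ArrayList<>();

    // Lifecycle hooks named after the mapper lifecycle; only map() is mandatory.
    protected void setup() { }
    protected abstract void map(KI key, VI value);
    protected void cleanup() { }

    // Subclasses call this where real code would write to its context.
    protected final void write(KO key, VO value) {
        written.add(new SimpleEntry<>(key, value));
    }

    // Drives setup -> map -> cleanup for one record and returns a copy of what was written.
    final List<Map.Entry<KO, VO>> run(KI key, VI value) {
        written.clear();
        setup();
        map(key, value);
        cleanup();
        return new ArrayList<>(written);
    }
}

// Example subclass: emits each input line together with its length.
final class LineLengthHarness extends MapLogicHarness<Long, String, String, Integer> {
    @Override
    protected void map(Long key, String value) {
        write(value, value.length());
    }
}

final class HarnessDemo {
    public static void main(String[] args) {
        // prints "[hadoop=6]"
        System.out.println(new LineLengthHarness().run(0L, "hadoop"));
    }
}
```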
Here's a sample test case class:
/*
*
* This source code and information are provided "AS-IS" without
* warranty of any kind, either expressed or implied, including
* but not limited to the implied warranties of merchantability
* and/or fitness for a particular purpose.
*
* This source code was generated using an evaluation copy
* of the Cassandra/Hadoop Accelerator and may not be used for
* production purposes.
*
*/
package com.creditco.countwords.ReadDocs;
// Begin imports
import java.io.IOException;
import java.util.ArrayList;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.junit.Test;
// End imports
public class ParseDocsMapperTest extends TestCase {

    @Test
    public void testCount() {
        TestRecordWriter recordWriter = new TestRecordWriter();
        TestRecordReader recordReader = new TestRecordReader();
        TestOutputCommitter outputCommitter = new TestOutputCommitter();
        TestStatusReporter statusReporter = new TestStatusReporter();
        TestInputSplit inputSplit = new TestInputSplit();
        try {
            // Begin test logic

            // Get an instance of the mapper to be tested and a context instance.
            // Note: testContext(...) is not part of Hadoop's Mapper API; it is a
            // helper on ParseDocsMapper (not shown here) that builds the Context.
            ParseDocsMapper mapper = new ParseDocsMapper();
            Mapper<LongWritable, Text, Text, IntWritable>.Context context =
                    mapper.testContext(new Configuration(), new TaskAttemptID(), recordReader,
                            recordWriter, outputCommitter, statusReporter, inputSplit);

            // Invoke the setup, map and cleanup methods
            mapper.setup(context);
            LongWritable key = new LongWritable(30);
            Text value = new Text("abc def ghi");
            mapper.map(key, value, context);
            if (recordWriter.getKeys().length != 3) {
                fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Wrong number of records written ");
            }
            mapper.cleanup(context);

            // Validation:
            //
            // recordWriter.getKeys() returns the keys written to the context by the mapper
            // recordWriter.getValues() returns the values written to the context by the mapper
            // statusReporter returns the most recent status and any counters set by the mapper
            //

            // End test logic
        } catch (Exception e) {
            fail("com.creditco.countwords:ParseDocsMapperTest.testCount() - Exception thrown: " + e.getMessage());
        }
    }

    // Records every key/value pair the mapper writes so the test can inspect them.
    final class TestRecordWriter extends RecordWriter<Text, IntWritable> {
        ArrayList<Text> keys = new ArrayList<Text>();
        ArrayList<IntWritable> values = new ArrayList<IntWritable>();

        public void close(TaskAttemptContext arg0) throws IOException, InterruptedException { }

        public void write(Text key, IntWritable value) throws IOException, InterruptedException {
            keys.add(key);
            values.add(value);
        }

        public Text[] getKeys() {
            Text[] result = new Text[keys.size()];
            keys.toArray(result);
            return result;
        }

        public IntWritable[] getValues() {
            IntWritable[] result = new IntWritable[values.size()];
            values.toArray(result);
            return result;
        }
    }

    // The test passes keys and values to map() directly, so the reader should never be consulted.
    final class TestRecordReader extends RecordReader<LongWritable, Text> {
        public void close() throws IOException { }

        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getCurrentKey()");
        }

        public Text getCurrentValue() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getCurrentValue()");
        }

        public float getProgress() throws IOException, InterruptedException {
            throw new RuntimeException("Tried to call RecordReader:getProgress()");
        }

        public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { }

        public boolean nextKeyValue() throws IOException, InterruptedException {
            return false;
        }
    }

    // Keeps the most recent status string and any counters the mapper touches.
    final class TestStatusReporter extends StatusReporter {
        private Counters counters = new Counters();
        private String status = null;

        public void setStatus(String arg0) {
            status = arg0;
        }

        public String getStatus() {
            return status;
        }

        public void progress() { }

        public Counter getCounter(String arg0, String arg1) {
            return counters.getGroup(arg0).findCounter(arg1);
        }

        public Counter getCounter(Enum<?> arg0) {
            // The original returned null here, which would fail with a
            // NullPointerException as soon as a mapper incremented an enum counter.
            return counters.findCounter(arg0);
        }
    }

    // Empty split: no locations, zero length.
    final class TestInputSplit extends InputSplit {
        public String[] getLocations() throws IOException, InterruptedException {
            return null;
        }

        public long getLength() throws IOException, InterruptedException {
            return 0;
        }
    }

    // No-op committer: nothing is written to a file system during the test.
    final class TestOutputCommitter extends OutputCommitter {
        public void setupTask(TaskAttemptContext arg0) throws IOException { }

        public void setupJob(JobContext arg0) throws IOException { }

        public boolean needsTaskCommit(TaskAttemptContext arg0) throws IOException {
            return false;
        }

        public void commitTask(TaskAttemptContext arg0) throws IOException { }

        public void cleanupJob(JobContext arg0) throws IOException { }

        public void abortTask(TaskAttemptContext arg0) throws IOException { }
    }
}
And here's a sample Maven pom. Note that the referenced versions are a bit out of date, but as long as those versions are kept in a Maven repository somewhere, you'll be able to build this project.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.creditco</groupId>
  <artifactId>wordcount.example</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.cassandra</groupId>
      <artifactId>cassandra-all</artifactId>
      <version>1.0.6</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.cassandraunit</groupId>
      <artifactId>cassandra-unit</artifactId>
      <version>1.0.1.1</version>
      <type>jar</type>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>hamcrest-all</artifactId>
          <groupId>org.hamcrest</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.pig</groupId>
      <artifactId>pig</artifactId>
      <version>0.9.1</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.json</groupId>
      <artifactId>json</artifactId>
      <version>20090211</version>
      <type>jar</type>
      <scope>compile</scope>
    </dependency>
  </dependencies>
</project>