infinispan5的MapReduce使用

2022-06-27 00:00:00 专区 订阅 付费 链接 原文

参考

https://docs.jboss.org/author/display/ISPN/Infinispan+Distributed+Execution+Framework#InfinispanDistributedExecutionFramework-MapReducemodel





首先下载我们的infinispan5的运行环境

https://github.com/infinispan/infinispan-quickstart.git





中org.infinispan.quickstart.clusteredcache.distribution.NodeClient类为





package org.infinispan.quickstart.clusteredcache.distribution;

import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.Map.Entry;

import org.infinispan.Cache;
import org.infinispan.config.Configuration;
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.config.Configuration.CacheMode;
import org.infinispan.distexec.mapreduce.Collator;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.MapReduceTask;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;

/**
* @author :xiaofancn
* @version :2011-11-30 下午10:15:44
*
*/

public class NodeClient {

public static EmbeddedCacheManager createCacheManagerProgramatically() {

return new DefaultCacheManager(GlobalConfiguration
.getClusteredDefault().fluent().transport()
.addProperty("configurationFile", "jgroups.xml").build(),
new Configuration().fluent().mode(CacheMode.DIST_ASYNC).hash()
.numOwners(2).build());
}

static class WordCountMapper implements
Mapper<String, String, String, Integer> {
private static final long serialVersionUID = 1L;

@Override
public void map(String key, String value, Collector<String, Integer> c) {
StringTokenizer tokens = new StringTokenizer(value);
while (tokens.hasMoreElements()) {
System.out.println(key + "=================" + value);
String s = (String) tokens.nextElement();
c.emit(s, 1);
}
}
}

static class WordCountReducer implements Reducer<String, Integer> {
/** The serialVersionUID */
private static final long serialVersionUID = 1901016598354633256L;

@Override
public Integer reduce(String key, Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
Integer i = (Integer) iter.next();
sum += i;
}
System.out.println(key+":"+sum);
return sum;

}
}

public static void main(String[] args) {

Cache<String, String> cache = createCacheManagerProgramatically()
.getCache("Demo");


int count = 1000000000;
for (int i = 0; i < count; i++) {
cache.put("" + i, "" + i);
}
cache.put("" + count, "" + 4);

System.out.println(new Date());
Long start = System.currentTimeMillis();

MapReduceTask<String, String, String, Integer> task = new MapReduceTask<String, String, String, Integer>(
cache);
task.mappedWith(new WordCountMapper()).reducedWith(
new WordCountReducer());
String mostFrequentWord = task
.execute(new Collator<String, Integer, String>() {
@Override
public String collate(Map<String, Integer> reducedResults) {
String mostFrequent = "";
int maxCount = 0;
for (Entry<String, Integer> e : reducedResults
.entrySet()) {
Integer count = e.getValue();
if (count > maxCount) {
maxCount = count;
mostFrequent = e.getKey();
}
}
return mostFrequent;
}

});
System.out.println(new Date());
System.out.println(System.currentTimeMillis() - start);
System.out.println("The most frequent word is " + mostFrequentWord);

}

}

分别运行

org.infinispan.quickstart.clusteredcache.distribution.Node0

org.infinispan.quickstart.clusteredcache.distribution.Node1

org.infinispan.quickstart.clusteredcache.distribution.Node2

org.infinispan.quickstart.clusteredcache.distribution.NodeClient
————————————————
版权声明:本文为CSDN博主「xiaofancn」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xiaofancn/article/details/84085204

相关文章