在 RIAK 上获取 MapReduce 结果(使用 Java 客户端)

2022-01-13 00:00:00 mapreduce java riak

我在 RIAK 上存储 Person POJO(4 个字符串字段 - id、name、lastUpdate、Data),然后尝试使用 MapReduce 获取这些对象.

I am storing Person POJOs (4 string fields - id, name, lastUpdate, Data) on RIAK, then trying to fetch these objects with MapReduce.

我的做法与 Basho 文档非常相似:

I am doing it very similar to Basho documentation:

    BucketMapReduce m = riakClient.mapReduce("person");
    m.addMapPhase(new NamedJSFunction("Riak.mapByFields"), true);
    MapReduceResult result = m.execute();
    Collection<Person> tmp = result.getResult(Person.class);

调用了 Person 的 String 构造函数:

the Person's String constructor is invoked:

public Person(String str){}

(我必须有这个构造函数,否则我会因为它丢失而出现异常)在那里,我将对象作为字符串 - 一个字符串中的对象字段带有一个奇怪的分隔符.

(I must have this constructor, otherwise I get an exception for it is missing) In there I get the object as a String - the Object's fields in one string with a strange delimiter.

为什么我没有将对象自动转换为我的 POJO?我真的需要遍历字符串并反序列化它吗?我做错了什么吗?s

why am I not getting the object automatically converted to my POJO? do I really need to go over the string and deserialize it? am i doing something wrong?s

推荐答案

您使用的 JS 函数并没有按照您的想法执行 :) 它根据您必须提供的特定值的字段来选择对象作为相位的参数.

The JS function you're using doesn't do what you think it does :) It selects objects based on a field with a specific value you have to supply as an argument to the phase.

我认为您正在寻找的是 mapValuesJson 它将执行您似乎想要做的事情.

I think what you're looking for is mapValuesJson which will do what you seem to be wanting to do.

此外,您的 POJO 中根本不需要构造函数.

Also, you don't need a constructor at all in your POJO.

下面的代码应该为您指明正确的方向(显然这是超级简单的,POJO 中的所有公共字段都没有注释):

The code below should point you in the right direction (obviously this is super-simple with all public fields in the POJO and no annotations):

public class App {

    public static void main( String[] args ) throws IOException, RiakException
    {
        IRiakClient client = RiakFactory.httpClient();
        Bucket b = client.fetchBucket("test_mr").execute();

        b.store("myobject", new Person()).execute();
        IRiakObject o = b.fetch("myobject").execute();
        System.out.println(o.getValueAsString());


        BucketMapReduce m = client.mapReduce("test_mr");
        m.addMapPhase(new NamedJSFunction("Riak.mapValuesJson"), true);
        MapReduceResult result = m.execute();
        System.out.println(result.getResultRaw());
        Collection<Person> tmp = result.getResult(Person.class);

        for (Person p : tmp)
        {
            System.out.println(p.data);
        }


        client.shutdown();
    }
}

class Person 
{
    public String id = "12345";
    public String name = "my name";
    public String lastUpdate = "some time";
    public String data = "some data";


}

相关文章