Aerospike-Java

2022-06-21 00:00:00 操作 记录 默认 副本 超时

这篇主要讲aerospike的客户端java使用优化

参考1,在example里有很多的使用实例,这里就不介绍了。

读写策略
public class Policy {

/**
* 优先级
*/
public Priority priority = Priority.DEFAULT;

/**
* 读取操作都一致性
* 默认读取一个副本
*/
public ConsistencyLevel consistencyLevel = ConsistencyLevel.CONSISTENCY_ONE;

/**
* 用于确定单个记录命令的目标节点的副本算法--->record udf
* 批处理,扫描和查询不受副本算法的影响
*/
public Replica replica = Replica.SEQUENCE;

/**
* 处理数据库命令时的超时
* 0:不限制
* 默认: 0 (不限制)
*/
public int socketTimeout;

/**
* 总事务超时
* totalTimeout = socketTimeout * maxRetries
* 如果 totalTimeout < socketTimeout,那么socketTimeout = totalTimeout
* 默认: 0 (不限制)
*/
public int totalTimeout;

/**
* 如果请求超时,那么该连接延迟关闭时间.如果在延迟时间内,下个请求用该连接请求成功了,那么放回连接池
* ,默认: 0 (连接超时,立刻回收)
*/
public int timeoutDelay;

/**
* 重试次数
* 警告:非幂等操作,不应该重试
* 默认: 2 (initial attempt + 2 retries = 3 attempts)
*/
public int maxRetries = 2;

/**
* 重试之间的休眠时间为毫秒。输入零以跳过睡眠
*/
public int sleepBetweenRetries;

/**
* 用户定义都key 是否保存在记录中
* 默认: false (不保存)
*/
public boolean sendKey;
}

public final class WritePolicy extends Policy {
/**
* 当记录已经存在时,的策略
* @see RecordExistsAction
*/
public RecordExistsAction recordExistsAction = RecordExistsAction.UPDATE;

/**
* 类似于乐观锁
*/
public GenerationPolicy generationPolicy = GenerationPolicy.NONE;

/**
* 在服务器上提交事务时所需的一致性保证
* 默认:主节点等所有都副本都保存成功才算成功
*/
public CommitLevel commitLevel = CommitLevel.COMMIT_ALL;

/**
* version
*/
public int generation;

/**
* TTL,过期时间
* -2:当更新时不修改ttl
* -1:从不过期
* 0:根据namespace的"default-ttl"配置
* >0:单位是秒、单位是秒、单位是秒重要都说三遍
* </ul>
*/
public int expiration;

/**
* 对于客户端操作(),返回每个操作的结果。
* 有些操作默认不返回结果(例如ListOperation.clear())
* 这会使得难以确定返回的期望的结果偏移量
* 将respondAllOps设置为true可以更容易地识别所需的结果偏移量
*(结果偏移等于箱的操作顺序)
* <p>
* 默认: false
*/
public boolean respondAllOps;

/**
* 如果交易导致记录删除,请为记录留下一个墓碑。
* 这样可以防止节点故障后删除的记录再次出现。
*/
public boolean durableDelete;
}

public class QueryPolicy extends Policy {
/**
* Maximum number of concurrent requests to server nodes at any point in time.
* If there are 16 nodes in the cluster and maxConcurrentNodes is 8, then queries
* will be made to 8 nodes in parallel. When a query completes, a new query will
* be issued until all 16 nodes have been queried.
* Default (0) is to issue requests to all server nodes in parallel.
* 这个没看懂????知道的说下
*/
public int maxConcurrentNodes;

/**
* 阻塞队列的大小,在使用 queryAggregate 会用到
* 没用过。。。
*/
public int recordQueueSize = 5000;

/**
* Indicates if bin data is retrieved. If false, only record digests (and user keys
* if stored on the server) are retrieved.
* 这个也不懂
*/
public boolean includeBinData = true;
}

看了以上的api可以确定以下几点
1、以CAS的方式支持行事务
2、在非幂等更新时,需要将重试机制关闭
3、数据是否强一致性 commitLevel
4、TTL机制

Map api
设置排序 UNORDERED KEY_ORDERED KEY_VALUE_ORDERED
add(), add_items(), increment(), decrement(), clear()
remove_by_key(), remove_by_index(), remove_by_rank()
remove_by_key_interval(), remove_by_index_range()
remove_by_value_interval(), remove_by_rank_range(), remove_all_by_value()
remove_all_by_key_list(), remove_all_by_value_list()
size()
get_by_key(), get_by_index(), get_by_rank()
get_by_key_interval(), get_by_index_range()
get_by_value_interval(), get_by_rank_range(), get_all_by_value()
get_all_by_key_list(), get_all_by_value_list()
List api
排序 UNORDERED ORDERED
append(), insert(), insert_items(), add_items()
set()
increment(), sort()
clear(), size()
remove_by_index(), remove_by_index_range()
remove_by_rank(), remove_by_rank_range()
remove_by_value(), remove_by_value_interval(), remove_all_by_value(), remove_all_by_value_list()
get_by_index(), get_by_index_range()
get_by_rank(), get_by_rank_range()
get_by_value(), get_by_value_interval(), get_all_by_value(), get_all_by_value_list()
以上的复杂数据类型操作也是比较灵活的

UDF
是使用运行在Aerospike数据库服务器上的Lua编程语言编写的。可以使用UDF来扩展Aerospike数据库引擎的功能和性能。 这里就是aerospike的实时计算所在

注册UDF
RegisterTask task = client.register(null, MyClass.class.getClassLoader(), "udf/example.lua", "example.lua", Language.LUA);
// Poll cluster for completion every second for a maximum of 10 seconds.
task.waitTillComplete(1000, 10000);

调用UDF
String result = (String) client.execute(
null, key, "examples", "readBin", Value.get("name")
);

examples.lua

function readBin(r, name)
return r[name]
end

分类
Record-UDF
Record-UDF 是针对单odiao条记录的操作

list尾部追加.lua

function appendListBin(r, binname, value)
local l = r[binname]

if l == nil then
l = list()
end

list.append(l, value)
r[binname] = l
aerospike:update(r)
end


Stream-UDF
Stream-UDF是针对批量的数据进行 map-reduce的计算

求平均.lua

function average(s,binName)

local function mapper(out, rec)
out['sum'] = (out['sum'] or 0) + (rec[binName] or 0)
out['count'] = (out['count'] or 0) + 1
return out
end

local function reducer(a, b)
local out = map()
out['sum'] = a['sum'] + b['sum']
out['count'] = a['count'] + b['count']
return out
end

return s : aggregate(map{sum = 0, count = 0}, mapper) : reduce(reducer)
end
————————————————
版权声明:本文为CSDN博主「飞天的猪猪」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u012092620/article/details/79883652

相关文章