如何通过 Java 中的 Executor Framework 在 DynamoDb 中获得最佳批量插入率?
我正在使用 DynamoDB SDK for Java 在本地 Dynamo DB 中进行批量写入(大约 5.5k 项)的 POC.我知道每个批量写入不能超过 25 个写入操作,因此我将整个数据集划分为每个 25 个项目的块.然后我将这些块作为 Executor 框架中的可调用操作传递.尽管如此,我还是没有得到令人满意的结果,因为 5.5k 记录在 100 多秒内被插入.
I'm doing a POC on Bulk write (around 5.5k items) in local Dynamo DB using DynamoDB SDK for Java. I'm aware that each bulk write cannot have more than 25 write operations, so I am dividing the whole dataset into chunks of 25 items each. Then I'm passing these chunks as callable actions in Executor framework. Still, I'm not having a satisfactory result as the 5.5k records are getting inserted in more than 100 seconds.
我不确定我还能如何优化它.在创建表时,我将 WriteCapacityUnit 设置为 400(不确定我可以给出的最大值是多少)并对其进行了一些试验,但它从未有任何区别.我也尝试过更改执行器中的线程数.
I'm not sure how else can I optimize this. While creating the table I provisioned the WriteCapacityUnit as 400(not sure what's the maximum value I can give) and experimented with it a bit, but it never made any difference. I have also tried changing the number of threads in executor.
这是执行批量写入操作的主要代码:
This is the main code to perform the bulk write operation:
public static void main(String[] args) throws Exception {
AmazonDynamoDBClient client = new AmazonDynamoDBClient().withEndpoint("http://localhost:8000");
final AmazonDynamoDB aws = new AmazonDynamoDBClient(new BasicAWSCredentials("x", "y"));
aws.setEndpoint("http://localhost:8000");
JSONArray employees = readFromFile();
Iterator<JSONObject> iterator = employees.iterator();
List<WriteRequest> batchList = new ArrayList<WriteRequest>();
ExecutorService service = Executors.newFixedThreadPool(20);
List<BatchWriteItemRequest> listOfBatchItemsRequest = new ArrayList<>();
while(iterator.hasNext()) {
if (batchList.size() == 25) {
Map<String, List<WriteRequest>> batchTableRequests = new HashMap<String, List<WriteRequest>>();
batchTableRequests.put("Employee", batchList);
BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
batchWriteItemRequest.setRequestItems(batchTableRequests);
listOfBatchItemsRequest.add(batchWriteItemRequest);
batchList = new ArrayList<WriteRequest>();
}
PutRequest putRequest = new PutRequest();
putRequest.setItem(ItemUtils.fromSimpleMap((Map) iterator.next()));
WriteRequest writeRequest = new WriteRequest();
writeRequest.setPutRequest(putRequest);
batchList.add(writeRequest);
}
StopWatch watch = new StopWatch();
watch.start();
List<Future<BatchWriteItemResult>> futureListOfResults = listOfBatchItemsRequest.stream().
map(batchItemsRequest -> service.submit(() -> aws.batchWriteItem(batchItemsRequest))).collect(Collectors.toList());
service.shutdown();
while(!service.isTerminated());
watch.stop();
System.out.println("Total time taken : " + watch.getTotalTimeSeconds());
}
}
这是用于创建 dynamoDB 表的代码:
This is the code used to create the dynamoDB table:
public static void main(String[] args) throws Exception {
AmazonDynamoDBClient client = new AmazonDynamoDBClient().withEndpoint("http://localhost:8000");
DynamoDB dynamoDB = new DynamoDB(client);
String tableName = "Employee";
try {
System.out.println("Creating the table, wait...");
Table table = dynamoDB.createTable(tableName, Arrays.asList(new KeySchemaElement("ID", KeyType.HASH)
), Arrays.asList(new AttributeDefinition("ID", ScalarAttributeType.S)),
new ProvisionedThroughput(1000L, 1000L));
table.waitForActive();
System.out.println("Table created successfully. Status: " + table.getDescription().getTableStatus());
} catch (Exception e) {
System.err.println("Cannot create the table: ");
System.err.println(e.getMessage());
}
}
推荐答案
DynamoDB Local 是作为一种工具提供给需要为 DynamoDB 进行离线开发的开发人员,并且不是为扩展或性能而设计的.因此,它不适用于规模测试,如果您需要测试批量负载或其他高速工作负载,最好使用真实表.在实时表上进行开发测试所产生的实际成本通常非常小,因为这些表只需要在测试运行期间提供高容量.
DynamoDB Local is provided as a tool for developers who need to develop offline for DynamoDB and is not designed for scale or performance. As such it is not intended for scale testing, and if you need to test bulk loads or other high velocity workloads it is best to use a real table. The actual cost incurred from dev testing on a live table is usually quite minimal as the tables only need to be provisioned for high capacity during the test runs.
相关文章