深入详解java高并发热点数据更新

2023-05-18 05:05:59 并发 热点 详解

mysql update的时候到底是锁行还是锁表?

InnoDb 锁简单分类

按照数据操作的粒度

1)行级锁:锁住记录行

2)表级锁:锁住整张表

按照对数据操作的类型

1)读锁(共享锁):针对同一份数据,多个读操作可以同时进行而不会互相影响。  

 2) 写锁(排它锁):当前操作没有完成之前,它会阻断其他写锁和读锁。

mysql 在update时会根据where 条件的类型决定锁行还是锁表 where的过滤条件列,如果用索引,锁行,无法用索引,锁表。按照索引规则,如果能使用索引,锁行,不能使用索引,锁表。 行锁是排他锁,当一条记录已经被一条update语句锁住时会阻断其他的update操作,在高并发场景下,对于热点数据来说会进行频繁的更新操作造成其他update操作锁等待超时请求失败

背景

以旅游支付场景为例,伴随着业务量的增加,系统的并发量会逐渐上升,例如“北京长城度假区”的账户流水会变得十分频繁,每次支付或者退款操作都需要去更新一下账户余额,并发较低时并不会有什么问题,但当旅游高峰期到来时并发量上升,数据库更新的时候需要获得数据行锁,在未释放这个行锁之前,其他事务只能是等待。

解决方案

1.支付时异步入账,退款增加一个欠款垫资户

用户支付入款需要给账户加钱时此时商户对于资金的实时性要求不高,追求准确性,因此可以将账户加款放到异步线程池,达到错峰的目的 然而当用户发起退款时,我们必须及时并且准确的从账户扣款,因此退款采取同步进行,退款的订单相对于支付来说量就会少很多,满足要求。但是存在一个问题高并发状态下某一个热点账户余额时刻在变很有可能退款发起时账户余额充足但是实际扣除时由于上一笔支付未入账,造成金额不足

1.加分布式锁

对特定账户加锁,保证某一刻只有一笔退款请求获得该账户的操作权 弊端:多个用时同时退款时只有一笔成功,对用户不友好,pass掉

2.新增垫资商户

热点账户增加一个指定透支额度的垫资户,实际账户余额不足时从垫资户借款,然后定期核对垫资户透支额度从实际账户一次性扣款, 推荐

2.合并请求

合并多条需要更新余额的请求

将一段时间内的请求,先进行阻塞,合并各个账户需要更新的金额,一次性处理,然后将结果拆分,唤醒被阻塞的请求

demo实现

 * @Author: xiaokunkun
 * @CreateTime: 2023-04-23  14:37
 * @Description: 合并更新,可以不捕捉异常报错后外层调用方直接捕获异常事务回滚
 */
@Service
public class CommodityAmountService {
    class Request {
        AcctUpdateDto acctUpdateDto;
        //预留字段 可不使用
        String atomCode;
        //暂定返回结果为true或者false
        CompletableFuture<Boolean> future; // 接受结果
    }
 
    // 积攒请求(每隔N毫秒批量处理一次)
    LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
 
    // 定时任务的实现,N秒钟处理一次数据
    @PostConstruct
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            // 1、取出queue的请求,生成一次合并更新
            int size = queue.size();
            if (size == 0) {
                return;
            }
            ArrayList<Request> requests = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                //队列出栈
                Request request = queue.poll();
                requests.add(request);
            }
            System.out.println("批量处理数据量:" + size);
            // 2、组装一个合并更新 key为账户value为sum(amount)
            Map<String, Long> amountMap = new HashMap<>();
            ArrayList<String> commodityCodes = new ArrayList<>();
            for (Request request : requests) {
                //todo 根据accountNo分组
            }
            for (String key : amountMap.keySet()) {
                Long amount = amountMap.get(key);
                //update mysql
            }
            // 3、将结果响应 分发给每一个单独的用户请求。由定时任务处理线程 --> n个用户的请求线程
            for (Request request : requests) {
                // 将结果返回到对应的请求线程,只要不报错此批次全部返回true,否则false
                request.future.complete(true);
            }}, 0, 1000, TimeUnit.MILLISECONDS);
    }
 
    @Autowired
    CommodityRemoteService commodityRemoteService;
 
    // 合并金额并更新,多个用户请求
    public Boolean updateMergeAmount(String movieCode)
            throws ExecutionException, InterruptedException {
        // 并非立刻发起接口调用,请求收集起来,再进行
        Request request = new Request();
        request.atomCode = movieCode;
        // 异步编程:获取异步处理的结果
        CompletableFuture<Boolean> future = new CompletableFuture<>();
        request.future = future;
        queue.add(request);
        return future.get(); // 此处get方法,会阻塞线程运行,直到future有返回
    }
}

测试类:

//模拟500的并发量
public void updateMerge() {
    AcctCmdDriver acctCmdDriver = new AcctCmdDriver();
    TradeAccntOrderDetail detail = new TradeAccntOrderDetail();
    AcctNoInfo acctNoInfo = new AcctNoInfo();
    OrderConsist consistForOrder = OrderConsist.newInstance("0200_202304", "trade_accnt_merchant_order");
    detail.setConsistForOrder(consistForOrder);
    acctCmdDriver.setDetail(detail);
    acctCmdDriver.setAcctNoInfo(acctNoInfo);
    detail.setAccountCateGory(AccountCategoryEnum.MERCHANT);
    detail.setAccountNo("02020001010000262977202304");
    detail.setAmount(6l);
    System.out.println("start build thread" + acctCmdDriver);
    Random rand = new Random();
    for (int i = 1; i <= 500; i++) {
        final String index = "code_" + i;
        Thread thread = new Thread(() -> {
            try {
                System.out.println("amount is:" + detail.getAmount());
                countDownLatch.await();
                Thread.sleep(rand.nextInt(150));
                Boolean res = updateMergeAmountService.mergeUpdate(acctCmdDriver);
                System.out.println("current i" + index + "res:" + res);
            } catch (InterruptedException e) {
                System.out.println("thread error is:" + e);
            }
        });
        thread.start();
        // 启动后,倒计时器倒计数减一,代表又有一个线程就绪了
        countDownLatch.countDown();
    }
}

以上就是深入详解java高并发热点数据更新的详细内容,更多关于java高并发热点数据更新的资料请关注其它相关文章!

相关文章