Java接口异步调用优化技巧详解

2023-05-19 08:05:33 调用 接口 详解

在日常项目中,我们经常采用多线程异步调用的方式来提高接口的响应时间。

在实际情况下,我们如何通过异步方式优化我们的接口呢,有以下几种常见思路

1,自己new线程或者线程池

如下我们把三个耗时操作交给新的线程或者线程池执行。

当请求过来的时候Tomcat线程会等待子线程全部执行完成,然后汇总结果进行返回。

 
    @GetMapping("getAllEGone")
    public Map<String, Object> getAllEgOne() throws ExecutionException, InterruptedException {
        FutureTask<String> stringFutureTaskOne = new FutureTask<>(asyncService::getOne);
        FutureTask<String> stringFutureTaskTwo = new FutureTask<>(asyncService::getTwo);
        FutureTask<String> stringFutureTaskThree = new FutureTask<>(asyncService::getThree);
        new Thread(stringFutureTaskOne).start();
        new Thread(stringFutureTaskTwo).start();
        new Thread(stringFutureTaskThree).start();
        Map<String, Object> result = new HashMap<>();
        result.put("one", stringFutureTaskOne.get());
        result.put("two", stringFutureTaskTwo.get());
        result.put("three", stringFutureTaskThree.get());
        return result;
    }

2,Sping Mvc

我们返回一个Callable 这时候会开启一个新的线程不会阻塞tomcat的线程

 
    @GetMapping("getAllEgTwo")
    public Callable<Map<String, Object>> getAllEgTwo() {
        return () -> {
            FutureTask<String> stringFutureTaskOne = new FutureTask<>(asyncService::getOne);
            FutureTask<String> stringFutureTaskTwo = new FutureTask<>(asyncService::getTwo);
            FutureTask<String> stringFutureTaskThree = new FutureTask<>(asyncService::getThree);
            new Thread(stringFutureTaskOne).start();
            new Thread(stringFutureTaskTwo).start();
            new Thread(stringFutureTaskThree).start();
            Map<String, Object> result = new HashMap<>(3);
            result.put("one", stringFutureTaskOne.get());
            result.put("two", stringFutureTaskTwo.get());
            result.put("three", stringFutureTaskThree.get());
            return result;
        };
    }

3,修改单个任务为批量任务

在项目中我们有很多数据库的查询,批量查询要快于单个查询,中间省了很多io操作。

思考能不能吧单个调用转换成批量呢,针对并发比较高的接口。合并多个用户的调用,转换成一批进行查询。

把一个时间段内的请求放进队列,然后通过定时任务进行批量查询,然后进行响应分发。

import com.example.demo.conf.SnowFlake;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.WEB.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

@RestController
@RequestMapping("async")
@Slf4j
public class AsyncController {
    @Autowired
    private AsyncService asyncService;
    private final SnowFlake worker = new SnowFlake(1, 1, 1);
    private final LinkedBlockingQueue<RequestBody<Long, UserInfo>> queue = new LinkedBlockingQueue<>();
    @PostConstruct
    public void doWork() {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(() -> {
            //每次运行的时候 去拿MQ中的数据量
            int size = queue.size();
            if (size == 0) {
                return;
            }
            log.info("批量获取任务:{}-{}", Thread.currentThread().getName(), size);
            //多次请求收集到一起一块去批量请求下面的需要的数据
            List<Long> requestBodyList = new ArrayList<>();
            List<RequestBody<Long, UserInfo>> requestBodies = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                RequestBody<Long, UserInfo> requestBody = queue.poll();
                requestBodies.add(requestBody);
                Long requestParam = requestBody.getRequestParam();
                requestBodyList.add(requestParam);
            }
            List<UserInfo> fourBatch;
            try {
                fourBatch = asyncService.getFourBatch(requestBodyList);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            if (CollectionUtils.isEmpty(fourBatch)) {
                return;
            }
            for (UserInfo x : fourBatch) {
                for (RequestBody<Long, UserInfo> y : requestBodies) {
                    if (x.getId().equals(y.getRequestParam())) {
                        y.getResult().complete(x);
                        break;
                    }
                }
            }
        }, 1000L, 50L, TimeUnit.MILLISECONDS);
    }
    
    @GetMapping("getAllEgFour")
    public UserInfo getAllEgFour(Long userId) throws ExecutionException, InterruptedException {
        if (userId == null) {
            userId = worker.nextId();
        }
        log.info("开始获取数据: {}: {}", Thread.currentThread().getName(), userId);
        RequestBody<Long, UserInfo> objectObjectRequestBody = new RequestBody<>();
        CompletableFuture<UserInfo> completableFuture = new CompletableFuture<>();
        objectObjectRequestBody.setRequestParam(userId);
        objectObjectRequestBody.setResult(completableFuture);
        queue.add(objectObjectRequestBody);
        UserInfo userInfo = completableFuture.get();
        log.info("完成获取数据: {}: {}", Thread.currentThread().getName(), userInfo);
        return userInfo;
    }
}

到此这篇关于Java接口异步调用优化技巧详解的文章就介绍到这了,更多相关Java接口异步调用内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关文章