(05)使用DeferredResult多线程异步处理请求

2021-01-05 01:29

阅读:413

标签:ping   void   阻塞   使用   row   异步   监听器   vat   tco   

  通常处理HTTP请求时使用同步处理的方式,但有时根据业务场景和性能要求异步处理可能更合适。简单说下概念。

  同步处理:一个HTTP请求进入一个主线程,主线程处理完后给出一个HTTP响应。

  异步处理:一个HTTP请求进入一个主线程,主线程调用一个副线程,副线程处理业务逻辑,当副线程处理完后,主线程把结果返回给给客户端。在副线程处理逻辑的同时,主线程可以空闲出来处理其他请求。因为服务器同时处理的线程数量有限,所以异步处理提升了服务器的吞吐量。

  异步处理请求有Runnable和DeferredResult两种方式,下面举例说一下。

  1、Runnable异步处理REST服务

@RestController
public class AsyncController {    
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    @RequestMapping("/order")
    public Callable order(){
        logger.info("主线程开始");
        Callable result=new Callable() {
            @Override 
            public String call() throws Exception {
                logger.info("副线程开始");
                Thread.sleep(5000);
                logger.info("副线程结束");
                return "success"; 
            }
        };
        logger.info("主线程返回");
        return result;
    }
}

  启动服务,浏览器访问http://localhost/order,页面5秒后返回success,查看日志如下 :

  技术图片

  日志中显示了两个线程,主线程返回后过了5秒副线程结束。

  2、DeferredResult异步处理Rest服务

  通过上面的例子可以看到使用Runnable异步处理,副线程由主线程调起。这样有些业务场景不能满足,所以使用DeferredResult。举例如下:

  技术图片

  接受下单请求和处理下单逻辑的应用不在一台服务器上。线程1接受HTTP请求,并把该请求放到一个消息队列里。应用2监听消息队列,当监听到消息队列里有下单请求后处理下单逻辑,处理完后把结果再放到这个消息队列里。应用1中线程2监听消息队列,当监听到订单处理结果的消息后,会根据这个消息的结果返回一个HTTP响应给客户端。在这个场景中,线程1和线程2是完全隔离的,谁也不知道谁的存在。

  模拟消息队列:MockQueue.java,setPlaceOrder方法表示队列中处理下单请求,创建一个Thread模拟其它应用环境。

@Component
public class MockQueue {
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    private String placeOrder;
    private String completeOrder;
    
    public String getPlaceOrder() {
        return placeOrder;
    }
    public void setPlaceOrder(String placeOrder){
        new Thread(()->{
            logger.info("接到下单请求:"+placeOrder);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.completeOrder = placeOrder;
            logger.info("下单请求处理完毕:"+placeOrder);
        }).start();
    }
    public String getCompleteOrder() {
        return completeOrder;
    }
    public void setCompleteOrder(String completeOrder) {
        this.completeOrder = completeOrder;
    }
}

  在线程1和线程2之间传递DeferredResult对象:DeferredResultHolder.java,Map中key是订单号,value是处理结果。

@Component
public class DeferredResultHolder {

    private Map> map=new HashMap>();

    public Map> getMap() {
        return map;
    }
    public void setMap(Map> map) {
        this.map = map;
    }
}

  处理请求:AsyncController.java

@RestController
public class AsyncController {    
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    @Autowired
    private MockQueue mockQueue;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    
    @RequestMapping("/order")
    public DeferredResult order(){
        logger.info("主线程开始");
        String orderNumber=RandomStringUtils.randomNumeric(8);
        mockQueue.setPlaceOrder(orderNumber);
        DeferredResult result=new DeferredResult();
        deferredResultHolder.getMap().put(orderNumber, result);
        logger.info("主线程返回");
        return result;
    } 
}

  模拟线程2的代码,监听器:QueueListener.java。ContextRefreshedEvent事件是整个Spring容器初始化完毕的事件。

@Component
public class QueueListener implements ApplicationListener {
    
    private Logger logger=LoggerFactory.getLogger(getClass());
    
    @Autowired
    private MockQueue mockQueue;
    @Autowired
    private DeferredResultHolder deferredResultHolder;
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(()->{//开启一个线程,否则Spring容器阻塞,无法启动
            while(true) {
                if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())) {
                    String orderNumber=mockQueue.getCompleteOrder();
                    logger.info("返回订单处理结果:"+orderNumber);
                    deferredResultHolder.getMap().get(orderNumber).setResult("place order success");
                    mockQueue.setCompleteOrder(null);
                }else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

  启动容器:请求3次http://localhost/order,页面输出place order success,打印日志如下:

  技术图片

(05)使用DeferredResult多线程异步处理请求

标签:ping   void   阻塞   使用   row   异步   监听器   vat   tco   

原文地址:https://www.cnblogs.com/javasl/p/12983131.html


评论


亲,登录后才可以留言!