SpringCloud(Hoxton.SR3)基础篇:第四章、Hystrix请求熔断与服务降级

2021-02-11 21:17

阅读:615

  1.引入Hystrix相关的依赖如下依赖所示:


org.springframework.cloud
    spring-cloud-starter-netflix-hystrix

  2.在启动类中加入@EnableCircuitBreaker注解,表示允许断路器。如下代码所示:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
//该注解表明应用既作为eureka实例又为eureka client 可以发现注册的服务
@EnableEurekaClient
//在启动该微服务的时候就能去加载我们的自定义Ribbon配置类
@RibbonClient(name = "provider-user")
//Hystrix启动类注解,允许断路器
@EnableCircuitBreaker
public class ConsumerMovieRibbonApplication {
    
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
    
    public static void main(String[] args) {
        SpringApplication.run(ConsumerMovieRibbonApplication.class, args);
    }
}

  3.在Controller端代码加上@HystrixCommand注解

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.qxj.cloud.entity.User;

@RestController
public class MovieController {

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 注解方式使用
     * 
     * @param id
     * @return
     */
    @RequestMapping(value = "/movie/{id}", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
    @HystrixCommand(fallbackMethod = "findByIdFallback")
    public User findById(@PathVariable Long id) {
        // 微服务的虚拟id http://provider-user
        User user = this.restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
        return user;
    }

    // findById的fallback方法
    public User findByIdFallback(Long id) {
        User user = new User();
        user.setId(0L);
        return user;
    }
}

  4.application.yml配置,使用Ribbon的yml配置

 

server:
  port: 8010
spring:
  application:
      name: consumer-movie-ribbon-with-hystrix
#eureka客户端连接配置
eureka:
   client:
      healthcheck:
         enabled: true
      service-url:
      #注册中心地址
         defaultZone: http://user:password123@localhost:8761/eureka
   instance:
      #将ip注册到eureka上
      prefer-ip-address: true
      #微服务向eureka注册实例名${spring.cloud.client.ip-address} 表示ip地址 spring2.0以上为ip-address
      instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}

 

  5.启动微服务

  先启动eureka注册中心,和provider-user微服务,接着再把Ribbon微服务启动起来,在浏览器上输入http://localhost:8010/movie/3

技术图片

   停止provider-user微服务,再次在浏览器请求http://localhost:8010/movie/3

技术图片

 

  到这里简单演示了用Hystrix的注解@HystrixCommand(fallbackMethod = "findByIdFallback"),来实现熔断和服务降级。这只是表面的东西而已,根本不清楚他背后的原理

 三、灵活的熔断和服务降级

  1.Hystrix给我们提供了HystrixCommand类,让我们去继承它,去实现灵活的熔断和服务降级。实现类如下

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.qxj.cloud.entity.User;

/**
 * 自定义HystrixCommand实现
 * @author computer
 *
 */
public class UserServiceCommand extends HystrixCommand{
    
    private RestTemplate restTemplate;
    private Long id;
    
    //自定义构造函数
    public UserServiceCommand(String group,RestTemplate restTemplate,Long id) {
        super(HystrixCommandGroupKey.Factory.asKey(group));
        this.restTemplate = restTemplate;
        this.id = id;
    }

    @Override
    protected User run() throws Exception {
        System.out.println(Thread.currentThread().getName());
        return this.restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
    }

    @Override
    protected User getFallback() {
        User user = new User();
        user.setId(0L);
        return user;
    }
}

  2.Controller层的代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.UserServiceCommand;

@RestController
public class MovieController {

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 自定义实现类使用HystrixCommand
     * 
     * @param id
     * @return
     */
    @RequestMapping(value = "/movieHystrixDiy/{id}", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
    public User findByIdDiy(@PathVariable Long id) {
        UserServiceCommand command = new UserServiceCommand("findByIdDiy", restTemplate, id);
        User user = command.execute();
        return user;
    }
}

在这里我们要注意一下,虽然我们在这里new了个UserServiceCommand,但是并没有调用UserServiceCommand的方法,而是用command.execute();方法来手工执行的。

  3.服务测试

  先启动eureka注册中心,和provider-user微服务,接着再把Ribbon微服务启动起来,在浏览器上输入http://localhost:8010/movieHystrixDiy/3

技术图片

 

 

   停止provider-user微服务,再次在浏览器请求http://localhost:8010/movieHystrixDiy/3

技术图片

  注意:如果provider-user是集群部署的,有多台服务停供使用,当其中一台服务宕机的时候,导致服务不可访问了,返回我们原先在代码中定义的服务降级后的结果id=0的User对象回来,当后面一段时间内还有请求再也不会轮询宕机的服务节点

 

 四、非阻塞式IO

  restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class)这是阻塞式的,因为这是阻塞式的,如果后面还有代码,必须等到网络请求restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class)返回结果后,你后面的代码才会执行。如果此刻,有一个请求过来,通过Ribbon客户端进来了,Ribbon客户端调用了三个服务,每一服务执行的时间都是2秒钟,那么这三个服务都是用阻塞IO来执行的话,那么耗时是2+2+2=6,一共就花了6秒钟。那么如果我们使用异步来执行的话,花费的时间就是这三个服务中哪一个耗时长就是总耗时时间,比如,此时耗时最多的一个服务是3秒钟,那么总共耗时就花了3秒钟。那么异步IO是什么意思呢?就是请求发出去以后,主线程不会在原地等着,会继续往下执行我的主线程,什么时候返回结果,我就什么时候过去取出来。等着三个服务执行完了我就一次性把结果取出来。

  非阻塞式IO有两个分别是:Future将来式,Callable回调式

 

  1).Future将来式:就是说你用Future将来式去请求一个网络IO之类的任务,它会以多线程的形式去实现,主线程不必卡死在哪里等待,等什么时候需要结果就通过Future的get()方法去取,不用阻塞。

  2).Callable回调式:预定义一个回调任务,Callable发出去的请求,主线程继续往下执行,等你请求返回结果执行完了,会自动调用你哪个回调任务。

  Future将来式入门实例

  1.自定义继承HystrixCommand的Future示例

    UserServiceCommand类几面不用变,只需要改变一下在Controller层的command的调用方式即可,command的调用方式如下:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.UserServiceCommand;

@RestController
public class MovieController {

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 非阻塞式IO 自定义实现类使用HystrixCommand
     * 
     * @param id
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @RequestMapping(value = "/movieHystrixDiyFuture/{id}", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
    public User findByIdDiyFuture(@PathVariable Long id) throws InterruptedException, ExecutionException {
        UserServiceCommand command = new UserServiceCommand("findByIdDiyFuture", restTemplate, id);
        Future future = command.queue();
        return future.get();
    }
}

  2.@HystrixCommand注解方式的Future示例

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.command.AsyncResult;
import com.qxj.cloud.entity.User;

@RestController
public class MovieController {

    @Autowired
    private RestTemplate restTemplate;

    // findById的fallback方法
    public User findByIdFallback(Long id) {
        User user = new User();
        user.setId(0L);
        return user;
    }

    /**
     * 非阻塞式IO 注解方式使用
     * 
     * @param id
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @RequestMapping(value = "/movieHystrixFuture/{id}", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
    @HystrixCommand(fallbackMethod = "findByIdFallback")
    public User findByIdFuture(@PathVariable Long id) throws InterruptedException, ExecutionException {

        Future future = new AsyncResult() {

            @Override
            public User get() {
                return invoke();
            }

            @Override
            public User invoke() {
                System.out.println("-------外部访问");
                return restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
            }
        };

        User user = future.get();
        System.out.println("-------执行");
        // 微服务的虚拟id http://provider-user
        return user;
    }
}

运行结果跟上面的一样。

 

五、HystrixObservableCommand请求多个服务

  那么接下来我们又有另外一个需求就是,我发多个请求出去请求多个服务,我需要把请求结果汇总起来,一起返回给我,上面的例子,什么同步异步都不太好办。很麻烦,要写N个Future。

  这时候Hystrix又给我们提供了另外一种模式HystrixObservableCommand来让我们继承这个类,其实这种模式就运用了Java的RX编程中的观察者模式,如下:

技术图片

 

    1.自定义继承HystrixObservableCommand的示例 

  接下来我们新建一个名为UserServiceObservableCommand的类,来继承Hystrix给我们提供的HystrixObservableCommand类,同样HelloServiceObserveCommand类也不是交由Spring管理的,需要我们通过Controller层注入RestTemplate,放在构造方法来注入,代码如下所示:

import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.qxj.cloud.entity.User;

import rx.Observable;
import rx.Subscriber;

/**
 * 自定义HystrixObservableCommand实现
 * @author computer
 *
 */
public class UserServiceObservableCommand extends HystrixObservableCommand{
    
    private RestTemplate restTemplate;
    private Long id;
    
    public UserServiceObservableCommand(String group,RestTemplate restTemplate,Long id) {
        super(HystrixCommandGroupKey.Factory.asKey(group));
        this.restTemplate=restTemplate;
        this.id = id;
    }

    @Override
    protected Observable construct() {
        Observable observable = Observable.unsafeCreate(new Observable.OnSubscribe() {

            @Override
            public void call(Subscriber super User> subscriber) {
                if(!subscriber.isUnsubscribed()) {
                    System.out.println("方法执行....");
                    
                    User user = restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
                    
                    //这个方法是监听方法,是传递结果的,请求多次的结果通过它返回去汇总起来。
                    subscriber.onNext(user);
                    
                    User user2 = restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
                    //这个方法是监听方法,传递结果的
                    subscriber.onNext(user2);
                    
                    subscriber.onCompleted();
                }
            }
        });
        return observable;
    }
    
    @Override
    protected Observable resumeWithFallback() {
        Observable observable = Observable.unsafeCreate(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber super User> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    User user = new User();
                    user.setId(0L);
                    subscriber.onNext(user);
                    subscriber.onCompleted();
                }
            }
            
        });
        return observable;
    }
}

  Controller层调用如下代码所示

import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.UserServiceObservableCommand;

import rx.Observable;
import rx.Observer;

@RestController
public class MovieController {

    @Autowired
    private RestTemplate restTemplate;


    /**
     * 请求多个服务
     * 自定义实现类使用HystrixObservableCommand
     * 
     * @param id
     * @return
     * @throws InterruptedException
     */
    @RequestMapping(value = "/movieHystrixDiyMreq/{id}", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
    public List findByIdDiyMreq(@PathVariable Long id) throws InterruptedException{
        //结果集
        List list = new ArrayList();
        
        UserServiceObservableCommand command = new UserServiceObservableCommand("findByIdDiyMreq", restTemplate, id);
        //热执行  不等待 事件注册完(onCompleted(),onError,onNext这三个事件注册)执行业务代码construct方法
        //Observable observable = command.observe();
        //冷执行  等待 事件注册完(onCompleted(),onError,onNext这三个事件注册)才执行业务代码call方法
        Observable observable = command.toObservable();
        //订阅
        observable.subscribe(new Observer() {
            
            //请求完成的方法
            @Override
            public void onCompleted() {
                System.out.println("会聚完了所有查询请求");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
            
            //订阅调用事件,结果会聚的地方,用集合去装返回的结果会聚起来。
            @Override
            public void onNext(User t) {
                System.out.println("结果来了.....");
                list.add(t);
            }
        });
        return list;
    }
}

运行结果如下:

技术图片

 

 

  前面的例子有异步和同步这两种方式,这里HystrixObservableCommand也有两个中执行方式,分别是,冷执行,和热执行。刚刚HystrixObservableCommand中的command.toObservable()冷执行方式。

  什么是热执行方式呢?

    所谓的热执行就是不管你事件有没有注册完(onCompleted(),onError,onNext这三个事件注册)就去执行我的业务方法即(HystrixObservableCommand实现类中的construct()方法)

 

  什么是冷执行呢?

    所谓的冷执行就是,先进行事件监听方法注册完成后,才执行业务方法

 

  2.注解使用HystrixObservableCommand的示例 

  Controller层调用如下代码所示

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import com.qxj.cloud.entity.User;

import rx.Observable;
import rx.Subscriber;

@RestController
public class MovieController {

    @Autowired
    private RestTemplate restTemplate;

    // findById的fallback方法
    public User findByIdFallback(Long id) {
        User user = new User();
        user.setId(0L);
        return user;
    }
    
    /**
     * 请求多个服务
     * 注解方式实现类使用HystrixObservableCommand
     * ObservableExecutionMode.EAGER热执行  ObservableExecutionMode.LAZY冷执行
     * @param id
     * @return
     */
    @RequestMapping(value = "/movieHystrixMreq/{id}", method = RequestMethod.GET, produces = "application/json;charset=UTF-8")
    @HystrixCommand(fallbackMethod = "findByIdFallback",observableExecutionMode = ObservableExecutionMode.LAZY)
    public Observable findByIdMreq(@PathVariable Long id) throws InterruptedException{
        Observable observable = Observable.unsafeCreate(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber super User> subscriber) {
                if(!subscriber.isUnsubscribed()) {
                    System.out.println("方法执行....");
                    
                    User user = restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
                    
                    //这个方法监听方法,是传递结果的,请求多次的结果通过它返回去汇总起来。
                    subscriber.onNext(user);
                    
                    User user2 = restTemplate.getForObject("http://provider-user:7900/simple/" + id, User.class);
                    //这个方法是监听方法,传递结果的
                    subscriber.onNext(user2);
                    
                    subscriber.onCompleted();
                }
            }
        });
        return observable;
    }
}

运行结果如下:

技术图片

 参考文献:https://www.cnblogs.com/huangjuncong/p/9026949.html

 


评论


亲,登录后才可以留言!