admin管理员组文章数量:1564197
文章目录
- 手写springcloud|熔断hystrix
- github示例代码
- 服务调用和熔断的方式
- 熔断的流程
- Spring Cloud Hystrix 熔断细节
- 基本步骤
- 原版使用核心代码
- 实现过程
- 添加依赖
- springboot
- springcloud
- 其他依赖
- 简易版本
- await()
- sayLevel1()
- errorContent()
- 测试
- 中级版本
- @Timeout注解
- WebMvcConfig配置
- 拦截器
- sayLevelTimeout()
- 测试
- 高级版本
- @GPHystrixCommand注解
- AOP切面类
- 调用方法
- 完整版本
- 其他类
- BeforeAdviceMethodInvocationAdapter
- MethodInvocation
- ServerController
- 感谢
手写springcloud|熔断hystrix
github示例代码
github对应代码仓库地址:https://github/huajiexiewenfeng/spring-cloud-project-learn
spring-application 手写spring cloud系列
spring-cloud-client-appliaction 客户端
负载均衡 loadbalanced
服务调用 feign
eventBus
spring-cloud-server-application 服务端
- 熔断 hystrix
spring-boot-2.0-samples SpringBoot编程思想系列
- 理解@SpringBootApplication
- @Enable 模块驱动
spring-cloud-application 手写spring cloud系列 基础组件
- spring-cloud-servlet-gateway 网关
- spring-cloud-config-server 配置中心服务端
- spring-cloud-config-client 配置中心客户端
服务调用和熔断的方式
假设:
- web容器为tomcat
- MVC为Spring Web MVC
hystrix是在controller调用DB 或者 controller调用其他service 过程中通过编码的方式进行服务调用的熔断或者限流
熔断的流程
一般有两种方式
- 超时时间
- 当调用的方法执行时间超过设置的超时时间,服务端返回容错对象
- 信号量
- 单位时间内,服务端的访问量超过一定数量,线程阻塞
- 单位时间内,服务端的访问量超过一定数量,线程阻塞
Spring Cloud Hystrix 熔断细节
基本步骤
- 激活熔断
@EnableCircuitBreaker
- 配置Hystrix注解或者实现HystrixCommand抽象类
- 配置Hystrix属性和fallback方法
- 实现fallback方法
原版使用核心代码
本文将实现类似的效果
@HystrixCommand(
fallbackMethod = "errorContent",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",
value = "100")
}
)
@GetMapping("/say")
public String say(@RequestParam("message") String message) throws InterruptedException {
int value = random.nextInt(200);
System.out.println("say() cost " + value + "ms");
Thread.sleep(value);
System.out.println("port:" + getPort() + ",接收到消息-say:" + message);
return "port:" + getPort() + ",Hello," + message;
}
实现过程
添加依赖
springboot
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
</parent>
springcloud
<spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
其他依赖
-
本环境将
server服务
注册到zookeeper
,如果不需要可以注释掉zookeeper
相关的依赖 -
hystrix
依赖是为了测试原版功能
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-all</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
简易版本
通过简易版本了解基本的熔断实现过程
await()
模拟方法执行,随机等待200ms以内
private String await(String message) throws InterruptedException, TimeoutException {
int value = random.nextInt(200);
System.out.println("say() cost " + value + "ms");
Thread.sleep(value);
System.out.println("port:" + getPort() + ",接收到消息-say:" + message);
return "port:" + getPort() + ",Hello," + message;
}
sayLevel1()
实现调用和熔断功能
使用线程池ExecutorService
+Future
- future.get(100, TimeUnit.MILLISECONDS)
- 超过100毫秒,会抛出
TimeoutException
异常
- 超过100毫秒,会抛出
- 当
await()
方法执行超过100毫秒- 抛出
TimeoutException
异常 - 执行
errorContent()
方法
- 抛出
...
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
...
/**
* 简易版本
*
* @param message
* @return
* @throws InterruptedException
*/
@GetMapping("/sayLevel1")
public String sayLevel1(@RequestParam("message") String message) throws Exception {
Future<String> future = executorService.submit(() -> {
return await(message);
});
String res = null;
//100毫秒超时时间
try {
res = future.get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
return errorContent(message);
}
return res;
}
errorContent()
容错返回方法
public String errorContent(String message) {
return "Fault";
}
测试
浏览器输入
http://127.0.0.1:8080/sayLevel1?message=xwf
中级版本
使用spring web mvc
中的interceptor
拦截器实现+自定义注解
- 拦截所有request的请求
- 判断方法上是否有
@Timeout
注解 - Future+超时控制+反射执行原方法
- 捕捉异常
- 反射执行
fallback
方法
@Timeout注解
import java.lang.annotation.*;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Timeout {
int timeout() default 100;//超时时间
String fallback() default "";//回退方法
}
WebMvcConfig配置
增加CircuitBreakerHandlerInterceptor
拦截器
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new CircuitBreakerHandlerInterceptor());
}
}
拦截器
import com.huajie.spring.cloud.server.annotation.Timeout;
import org.springframework.core.MethodParameter;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Stream;
public class CircuitBreakerHandlerInterceptor implements HandlerInterceptor {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Timeout timeOut = handlerMethod.getMethodAnnotation(Timeout.class);
if (null != timeOut) {
int timeout = timeOut.timeout();
String fallback = timeOut.fallback();
Object bean = handlerMethod.getBean();
Method method = handlerMethod.getMethod();
Map<String, String[]> parameterMap = request.getParameterMap();
Class<?>[] parameterTypes = method.getParameterTypes();
//实参
Object[] paramValues = new Object[parameterTypes.length];
int i=0;
for (Map.Entry<String, String[]> param : parameterMap.entrySet()) {
String value = Arrays.toString(param.getValue()).replaceAll("\\[|\\]", "")
.replaceAll("\\s", ",");
paramValues[i] = caseStringValue(value, parameterTypes[i]);
i++;
}
Future<Object> future = executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return method.invoke(bean,paramValues);
}
});
Object returnValue = null;
try {
returnValue = future.get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
if (StringUtils.hasText(fallback)) {
returnValue = invokeFallbackMethod(handlerMethod, fallback,paramValues);
}else{
returnValue="server is busy";
}
}
response.getWriter().write(String.valueOf(returnValue));
return false;
}
}
return false;
}
private Object invokeFallbackMethod(HandlerMethod handlerMethod, String fallback, Object[] paramValues) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Object bean = handlerMethod.getBean();
//fallback方法签名要和原方法保持一致(参数类型和个数相同)
MethodParameter[] methodParameters = handlerMethod.getMethodParameters();
Class[] parameterTypes = Stream.of(methodParameters).map(MethodParameter::getParameterType)
.toArray(Class[]::new);
Method method = bean.getClass().getMethod(fallback, parameterTypes);
return method.invoke(bean,paramValues);
}
private Object caseStringValue(String value, Class<?> parameterType) {
if (Integer.class == parameterType) {
return Integer.valueOf(value);
} else if (Double.class == parameterType) {
return Double.valueOf(value);
} else if (String.class == parameterType) {
return String.valueOf(value);
} else {
if (value != null) {
return value;
} else {
return null;
}
}
}
}
sayLevelTimeout()
/**
* 高级版本
*
* @param message
* @return
* @throws Exception
*/
@Timeout(timeout = 150,fallback = "errorContent")
@GetMapping("/sayLevelTimeout")
public String sayLevelTimeout(@RequestParam("message") String message) throws Exception {
return await(message);
}
public String errorContent(String message) {
return "Fault";
}
测试
浏览器输入
http://127.0.0.1:8080/sayLevelTimeout?message=xwf
高级版本
此版本测试必须要注释调用中级版本,会互相影响
- 将中级版本中的拦截方式修改为AOP拦截即可
- 信号量使用
JUC
中的semaphore
实现
spring Aop + 自定义注解 + 超时时间和信号量两种方式
-
超时时间
- 切换线程Future
-
信号量
- 不切换线程,阻塞线程
@GPHystrixCommand注解
import com.netflix.hystrix.contrib.javanica.annotation.HystrixException;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import com.netflix.hystrix.contrib.javanica.annotation.ObservableExecutionMode;
import java.lang.annotation.*;
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface GPHystrixCommand {
int timeout() default 0;
int semaphore() default 0;//信号量
String fallback() default "";
}
AOP切面类
package com.huajie.spring.cloud.server.aspect;
import com.huajie.spring.cloud.server.annotation.GPHystrixCommand;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.lang.reflect.Method;
import java.util.concurrent.*;
@Component
@Aspect
public class HystrixAnnotationAuthorizingAspect {
private ExecutorService executorService = Executors.newFixedThreadPool(20);
private Semaphore semaphore = null;
@Pointcut("@annotation(com.huajie.spring.cloud.server.annotation.GPHystrixCommand)")
void anyRPCAnnotatedMethodCall() {
}
@Around("anyRPCAnnotatedMethodCall()")
public Object executeAnnotatedMethod(ProceedingJoinPoint aJoinPoint) throws Throwable {
BeforeAdviceMethodInvocationAdapter mi = BeforeAdviceMethodInvocationAdapter.createFrom(aJoinPoint);
Method method = mi.getMethod();
Object[] args = mi.getArguments();
Object res = null;
if (method.isAnnotationPresent(GPHystrixCommand.class)) {
GPHystrixCommand annotation = method.getAnnotation(GPHystrixCommand.class);
int timeout = annotation.timeout();
int semaphoreValue = annotation.semaphore();
String fallback = annotation.fallback();
if (0 == timeout && 0 == semaphoreValue || 0 != timeout) {
Future<Object> future = executorService.submit(() -> {
Object returnValue = null;
try {
returnValue = aJoinPoint.proceed(args);
} catch (Throwable throwable) {
throw new Exception(throwable);
}
return returnValue;
});
//100毫秒超时时间
try {
res = future.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
future.cancel(true);
res = invokeFallbackMethod(method, aJoinPoint.getTarget(), fallback, args);
}
}
if (0 != semaphoreValue) {
if (semaphore == null) {
semaphore = new Semaphore(semaphoreValue);
}
try {
semaphore.acquire();
res = aJoinPoint.proceed(args);
} finally {
semaphore.release();
}
}
return res;
}
return null;
}
private Object invokeFallbackMethod(Method method, Object bean, String fallback, Object[] arguments) throws Exception {
// 查找 fallback 方法
Method fallbackMethod = findFallbackMethod(method, bean, fallback);
return fallbackMethod.invoke(bean, arguments);
}
private Method findFallbackMethod(Method method, Object bean, String fallbackMethodName) throws
NoSuchMethodException {
// 通过被拦截方法的参数类型列表结合方法名,从同一类中找到 fallback 方法
Class beanClass = bean.getClass();
Method fallbackMethod = beanClass.getMethod(fallbackMethodName, method.getParameterTypes());
return fallbackMethod;
}
@PreDestroy
private void destroy() {
executorService.shutdown();
}
}
调用方法
/**
* 高级版本超时时间
*
* @param message
* @return
* @throws Exception
*/
@GPHystrixCommand(timeout = 150,fallback = "errorContent")
@GetMapping("/sayLevel3")
public String sayLevel3(@RequestParam("message") String message) throws Exception {
return await(message);
}
/**
* 高级版本信号量
*
* @param message
* @return
* @throws Exception
*/
@GPHystrixCommand(semaphore = 5)
@GetMapping("/sayLevel4")
public String sayLevel4(@RequestParam("message") String message) throws Exception {
return await(message);
}
public String errorContent(String message) {
return "Fault";
}
完整版本
其他类
BeforeAdviceMethodInvocationAdapter
Aop封装工具类
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.reflect.AdviceSignature;
import org.aspectj.lang.reflect.MethodSignature;
import java.lang.reflect.Method;
public class BeforeAdviceMethodInvocationAdapter implements MethodInvocation {
private Object _object;
private Method _method;
private Object[] _arguments;
public static BeforeAdviceMethodInvocationAdapter createFrom(JoinPoint aJoinPoint) {
if (aJoinPoint.getSignature() instanceof MethodSignature) {
return new BeforeAdviceMethodInvocationAdapter(aJoinPoint.getThis(),
((MethodSignature) aJoinPoint.getSignature()).getMethod(),
aJoinPoint.getArgs());
} else if (aJoinPoint.getSignature() instanceof AdviceSignature) {
return new BeforeAdviceMethodInvocationAdapter(aJoinPoint.getThis(),
((AdviceSignature) aJoinPoint.getSignature()).getAdvice(),
aJoinPoint.getArgs());
} else {
throw new IllegalArgumentException("The joint point signature is invalid: expected a MethodSignature or an AdviceSignature but was " + aJoinPoint.getSignature());
}
}
public BeforeAdviceMethodInvocationAdapter(Object anObject, Method aMethod, Object[] someArguments) {
_object = anObject;
_method = aMethod;
_arguments = someArguments;
}
public Object[] getArguments() {
return _arguments;
}
public Method getMethod() {
return _method;
}
public Object proceed() throws Throwable {
return null;
}
public Object getThis() {
return _object;
}
}
MethodInvocation
Aop工具接口类
import java.lang.reflect.Method;
public interface MethodInvocation {
Object proceed() throws Throwable;
Method getMethod();
Object[] getArguments();
Object getThis();
}
ServerController
controller层测试类
package com.huajie.spring.cloud.server.controller;
import com.huajie.spring.cloud.server.annotation.GPHystrixCommand;
import com.huajie.spring.cloud.server.annotation.Timeout;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.concurrent.*;
@RestController
public class ServerController {
private final Environment environment;
private final static Random random = new Random();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
public ServerController(Environment environment) {
this.environment = environment;
}
public String getPort() {
return environment.getProperty("local.server.port");
}
@HystrixCommand(
fallbackMethod = "errorContent",
commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",
value = "100")
}
)
@GetMapping("/say")
public String say(@RequestParam("message") String message) throws InterruptedException {
int value = random.nextInt(200);
System.out.println("say() cost " + value + "ms");
Thread.sleep(value);
System.out.println("port:" + getPort() + ",接收到消息-say:" + message);
return "port:" + getPort() + ",Hello," + message;
}
/**
* 简易版本
*
* @param message
* @return
* @throws InterruptedException
*/
@GetMapping("/sayLevel1")
public String sayLevel1(@RequestParam("message") String message) throws Exception {
Future<String> future = executorService.submit(() -> {
return await(message);
});
String res = null;
//100毫秒超时时间
try {
res = future.get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
return errorContent(message);
}
return res;
}
private String await(String message) throws InterruptedException {
int value = random.nextInt(200);
System.out.println("say() cost " + value + "ms");
Thread.sleep(value);
System.out.println("port:" + getPort() + ",接收到消息-say:" + message);
return "port:" + getPort() + ",Hello," + message;
}
/**
* 中级版本
*
* @param message
* @return
* @throws Exception
*/
@GetMapping("/sayLevel2")
public String sayLevel2(@RequestParam("message") String message) throws Exception {
Future<String> future = executorService.submit(() -> {
return await(message);
});
String res = null;
try {
res = future.get(100, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw e;
}
return res;
}
/**
* 高级版本
*
* @param message
* @return
* @throws Exception
*/
@Timeout(timeout = 150,fallback = "errorContent")
@GetMapping("/sayLevelTimeout")
public String sayLevelTimeout(@RequestParam("message") String message) throws Exception {
return await(message);
}
/**
* 高级版本
*
* @param message
* @return
* @throws Exception
*/
@GPHystrixCommand(timeout = 150,fallback = "errorContent")
@GetMapping("/sayLevel3")
public String sayLevel3(@RequestParam("message") String message) throws Exception {
return await(message);
}
/**
* 高级版本
*
* @param message
* @return
* @throws Exception
*/
@GPHystrixCommand(semaphore = 5)
@GetMapping("/sayLevel4")
public String sayLevel4(@RequestParam("message") String message) throws Exception {
return await(message);
}
public String errorContent(String message) {
return "Fault";
}
}
感谢
- 小马哥的分享 博客地址https://mercyblitz.github.io/
本文标签: SpringCloudHystrix
版权声明:本文标题:手写springcloud|熔断 hystrix 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://www.elefans.com/xitong/1727467511a1115915.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论