SpringCloud

SpringCloud-Alibaba

        <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
<mybatis.plus.version>3.3.0</mybatis.plus.version>
<mysql.version>5.1.47</mysql.version>
<alibaba.version>2.2.5.RELEASE</alibaba.version>

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- springCloud -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>


Http客户端

RestTemplate

  • spring自带,编程式发起网络请求,访问其他服务

1、配置

  • 在配置类注入
//发起Http请求  RestTemplate(spring工具类)

/**
* 创建RestTemplate并注入spring容器 -- 功能相当于 HttpClient
*/
@Bean
@LoadBalanced //开启负载均衡
public RestTemplate restTemplate(){
return new RestTemplate();
}


2、发起请求

  • 返回 – json,可以反序列化到指定的类型
    //1.利用RestTemplate发起http请求 (Get)
String url = "http://localhost:8091/user/" +order.getUserId();
//请求:postForObject、getForObject。。。
User user= restTemplate.getForObject(url, User.class); //(url,返回类型)


//2,拉取服务:url 不采用硬编码,为**注册服务的名称**
String url = "http://userservice/user/" +order.getUserId();

User user= restTemplate.getForObject(url, User.class);


@GlobalTransactional //分布式事务回滚
public ResponseResult fashOrder(Integer seckillId, OrderVO orderVO) throws InterruptedException {
RLock lock = redissonClient.getLock("lock:seckillInfo" + seckillId);
//尝试获取锁,(获取锁最大等待时间,锁释放时间,时间单位)
boolean isLock = lock.tryLock(5, 30, TimeUnit.SECONDS);
try {
if (isLock) {
//购买逻辑
SeckillCombined seckillCombined = seckillCombinedMapper.selectByPrimaryKeyAndProductId(seckillId, productId);
}else{
return ResponseResult.errorResult(SalesHttpCodeEnum.UNKOWN);
}
}catch (Exception e) {
}finally {
//释放锁
lock.unlock();
}
}





Feign

Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign

  • 底层依赖于Ribbon自动实现负载均衡

Feigin是通过寻找业务是接口通过url绑定的业务:根据当前接口绑定的url寻找业务

Dubbo是通过业务绑定的接口:根据接口,寻找绑定该接口的业务

  • Dubbo:Dubbo是一个由阿里巴巴开发的高性能RPC(远程过程调用)框架,用于构建分布式服务。它着重于远程方法调用和服务治理,包括服务注册、发现、负载均衡和熔断等。
  • OpenFeign:OpenFeign是一个声明式的HTTP客户端框架,用于方便地编写HTTP API客户端。它的主要目的是简化RESTful服务的调用,而不是像Dubbo那样用于RPC。


编写消费者(客户端)

  • 在要使用远程调用的服务上
  • 提供者(服务端)不需要配置

1、依赖

<!--feign客户端依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>


2、开启自动装配

  • 启动类上开启自动装配 @EnableFeignClients
@EnableFeignClients     //开启Feign自动装配


3、编写远程接口 (Feign模块)

  • 根据服务名称查找对应的服务 //远程接口名称
@FeignClient("userservice")  //value:远程服务名称  相当于发起请求, url和远程的提供者要一样 @GetMapping("/user/{id}")
public interface UserClient {

@GetMapping("/user/{id}") //url要和生产者一样
User findById(@PathVariable("id") Long id); //方法名可以不同,但参数要一致

}
   
//这是 生产者 的接口,通过上面进行调用这个接口

@GetMapping("/user/{id}")
public User queryById(@PathVariable("id") Long id) {
}


4、调用请求

@Resource
private UserClient userClient;

//2.使用feign远程调用
User user = userClient.findById(order.getUserId());


自定义配置

image-20230831100130045

日志

  • 两种配置方式:配置文件、java代码

1、在配置文件配置

feign:
client:
config:

default: #全局
logger-level: Basic #日志级别推荐:调试:Full。开发:Basic、none

xxx服务: #局部
logger-level: Basic


2、在java代码中配置

  • 声明一个Bean,类上不加配置类注解
public class DefauleFeignConfiguration {
@Bean
public Logger.Level logLevel(){
return Logger.Level.BASIC;
}
}

  • 局部配置:(在发起请求的类上) 加入到 @FeignClient 注解中
@FeignClient(value = "服务名称",configuration = DefauleFeignConfiguration.class)

  • 全局配置:(启动类上) 加入到 @EnableFeignClients 注解中
@EnableFeignClients(defaultConfiguration = DefauleFeignConfiguration.class)     //开启Feign自动装配


熔断降级

  • 服务降级是服务自我保护的一种方式,或者保护下游服务的一种方式,用于确保服务不会受请求突增影响变得不可用,确保服务不会崩溃

  • 服务降级虽然会导致请求失败,但是不会导致阻塞。


  • 编写降级逻辑
//1.实现降级类
@Component
public class IArticleClientFallback implements IArticleClient {
//降级
@Override
public ResponseResult saveArticle(ArticleDto articleDto) {
return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
}
}

  • 远程接口中指向降级代码
//2.指向
//fallback 指向降级实现类
@FeignClient(value = "leadnews-article",fallback = (IArticleClientFallback.class))

  • 扫描降级代码类的包
//3.客户端开启扫描降级类的包
@Configuration
@ComponentScan("com.heima.api.article.fallback")
public class InitConfig {
}

  • 开启服务降级,也可以指定服务响应的超时的时间
//4.配置文件开启服务降级处理

feign:
# 开启feign对hystrix熔断降级的支持
hystrix:
enabled: true
# 修改调用超时时间
client:
config:
default:
connectTimeout: 2000
readTimeout: 2000


性能优化

使用HttpClient

  • 依赖
<!--HttpClient依赖-->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>

  • 配置文件开启
feign:

httpclient:
enabled: true #支持HttpClient的开关
max-connections: 200 #最大连接数
max-connections-per-route: 50 #单个路径的最大连接数


实践方案

1、继承

image-20230831105658885


2、抽取模块

image-20230831105900048

①把需要抽取的类加入到 feign-api 模块(maven)中

  • 创建模块
  • 引入依赖
  • 将 客户端 的 ***client、实体类、feign配置集成到 该模块中
//userclient

@FeignClient(value = "userservice",configuration = DefauleFeignConfiguration.class)
public interface UserClient {

@GetMapping("/user/{id}")
User findById(@PathVariable("id") Long id);

}


②在客户端模块中依赖添加 feign-api 模块依赖

<!--引入fegin的统一api-->
<dependency>
<groupId>cn.itcast</groupId>
<artifactId>feign-api</artifactId>
<version>1.0</version>
</dependency>


③由于当前服务模块启动没有扫描到 feign-api 模块 的 @FeignClient 请求类。无法自动注入

  //调用	
@Resource
private UserClient userClient;

userClient.findById(1112);
  • 需要在服务的启动类中的自动装配Feign的注解中加入 clients 设置扫描指定的包
//方式一 : 指定FeginClient的包
@EnableFeignClients( basePackages = "扫描feign-api模块的clients包")


//方式二 : 指定FeignClient字节码 --- 用哪个使用哪个
@EnableFeignClients( clients = {UserClient.class} )


//方式三 : 扫描fei-api包下的组件注解
@ComponentScan("cn.itcast.feign")

//配置文件--日志输出--扫描
@EnableFeignClients(
clients = {UserClient.class}
defaultConfiguration = DefauleFeignConfiguration.class
)





Eureka【注册中心】

image-20230830105046732

服务提供者启动时向eureka注册自己的信息

服务提供者会每隔30秒向EurekaServer发送心跳请求,报告健康状态

服务消费者利用负载均衡算法,从服务列表中挑选一个



【*】搭建服务中心

1、引入依赖 – server

<!--eureka服务端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>


2、【*】开启

//自动装配 EnableEurekaServer  开启eureka
@EnableEurekaServer

@SpringBootApplication
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class,args);
}
}


3、配置服务注册

  • 本身就是一个服务,需要注册到eureka中去
server:
port: 10086


#服务注册
spring:
application:
name: eurekaserver #服务名称

eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka #eureka地址信息


4、运行界面查看所有注册的服务



【*】服务注册

1、引入依赖client

<!--eureka客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>



2、配置eureka地址:进行服务注册

#服务注册
spring:
application:
name: eurekaserver #当前服务名称

eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka #eureka地址信息


总结:

1.服务注册

  • 引入eureka-client依赖

  • 在application.yml中配置eureka地址

2.无论是消费者还是提供者,引入eureka-client依赖

  • 知道eureka地址后,都可以完成服务注册


【*】服务发现

  • 和服务注册使用同一个依赖 和 配置eurake

1、URL

  • url 不采用硬编码,为注册服务的名称
String url = "http://userservice/user/" +order.getUserId();

User user= restTemplate.getForObject(url, User.class);


2、开启负载均衡

  • @LoadBalanced //负载均衡

  • 在发起请求的bena(RestTemplate)中开启,eurake 主动实现

@Bean
@LoadBalanced //负载均衡
public RestTemplate restTemplate(){
return new RestTemplate();
}


饥饿、懒加载

  • 懒加载【默认】eurake运行并不会一次性加载全部Ribbon客户端,首次访问才会去加载LoadBalanceClient,放入缓存中,下次即可快速访问

  • 饥饿加载

    肌饿加载则会在项目启动时创建全部,降低第一次访问的耗时,通过下面配置开启饥饿加载[随意服务处]

ribbon:
eager-load:
enabled: true #开启肌饿加载
clients: userservice #指定肌饿加载的访问名称
- #list
- userservice
- **service





Ribbon负载均衡

  • 概述流程

image-20230830142812543



  • 原理

image-20230830143552370



  • 策略

image-20230830144100292





Ribbon – 负载均衡规则

  • 默认 轮询,可通过 配置类 或者 配置文件 进行自定义规则

1、【方式一】置类更改规则为:随机

@Bean
public IRule randowmIRule(){
return new RandomRule(); //随机规则
}


2、【方式二】配置文件 – 针对服务设置负载均衡规则

调用者

userservice:    #消费者针对生产者服务 userservice 服务配置负载均衡规则
ribbon:
NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule #负载均衡规则:轮询
userservice:    #消费者针对生产者服务 userservice 服务配置负载均衡规则
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule #负载均衡规则:本地集群优先





Nacos【注册中心】

Nacos可以用于服务注册、发现、配置管理和健康检查等功能,是微服务架构中的关键组件之一。

Dubbo提供了服务注册、发现、负载均衡和远程调用等功能,使分布式应用程序能够相互通信。

安装

GitHub主页:https://github.com/alibaba/nacos

GitHub的Release下载页:https://github.com/alibaba/nacos/releases


搭建

  • 启动

bin:启动脚本

conf:配置文件

Nacos的默认端口是8848

# 进入bin - cmd  单机
startup.cmd -m standalone

  • 访问

地址:http://127.0.0.1:8848/nacos

默认的账号和密码都是nacos



客户端

父工程:管理版本

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.2.5.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>

客户端:

<!-- nacos客户端服务发现依赖包 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>


配置文件

spring:
cloud:
nacos:
server-addr: localhost:8848 #nacos服务地址 [*]
discovery:
cluster-name: GZ #集群 [+]
namespace: 309a7db3-c802-4329-94df-2b3b3f6556d9 #命名空间ID [+]
ephemeral: false #是否临时实例 [+]


集群

1、创建多个服务实例

  • 服务上 ctrl + d 复制配置
  • 命名
  • vm设置:-Dserver.port=端口 (避免原端口冲突)


2、配置文件

spring:
cloud:
nacos:
server-addr: localhost:8848 #nacos服务地址

discovery:
cluster-name: GZ #集群

image-20230830155335732



负载均衡规则

  • 默认:轮询 – 即使相同集群 也会轮询访问其他集群中的服务

1、 同集群优先 - NacosRule

调用者配置文件设置负载均衡的IRule为NacosRule,这个规则优先会寻找与自己同集群的服务

userservice:    #针对 userservice 服务配置负载均衡规则
ribbon:
NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule #负载均衡规则:本地集群优先

优先选择同集群服务实例列表

优先同集群,在本地集群的多个服务中随机负载均衡

本地集群找不到提供者,才去其它集群寻找,并且会报警告



2、权重负载均衡

Nacos提供了权重配置来控制访问频率,权重越大则访问频率越高

  • 控制台设置权值

    ①Nacos控制台可以设置实例的权重值,0~1之间
    ②同集群内的多个实例,权重越高被访问的频率越高

    ③权重设置为0侧完全不会被访问



3、环境隔离 - namespace

  • 不同 namespace 下的服务不可见

  • 不同的命名空间内的不同实例互相不可以访问,需要同一个命名空间里面的才可以访问服务


①控制台开启新的命名空间 – 获取命名空间的ID

② 在配置文件写入命名空间的id

spring:
cloud:
nacos:
server-addr: localhost:8848 #nacos服务地址
discovery:
cluster-name: GZ #集群

namespace: 309a7db3-c802-4329-94df-2b3b3f6556d9 #命名空间ID 设置为dev环境


与Eurake区别

image-20230830163957315

1.Nacos与eureka的共同点
①都支持服务注册和服务拉取
②都支持服务提供者跳方式做健康检测

2.Nacos与Eureka的区别
①Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
②临时实例心跳不正常会被剔除,非临时实例则不会被剔除
③Nacos.支持服务列表变更的消息推送模式,服务列表更新更及时
④Nacos:集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP方式






统一配置管理

1、Nacos控制台创建配置

  • 格式推荐yaml全称 : [服务名称]-[环境].[格式]
  • 配置内容:写入需要热更新的内容

image-20230830170540709



2、读取nacos中的配置文件

image-20230830170928890

①引入Nacos的配置管理依赖

<!--引入Nacos的配置管理依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

② resoure 创建 bootstrap.yml 引导文件,优先级高于 application.yml

  • 服务名 - 环境 - 地址 - 文件后缀名 : 匹配 == userservice-dev.yaml文件
spring:
application:
name: userservice #服务名
profiles:
active: dev #环境
cloud:
nacos:
server-addr: localhost:8848 #nacos地址
config:
file-extension: yaml #文件后缀名


③读取nacos中的配置文件

  • 和读取本地配置文件一样,启动之前会把nacos的配置文件和本地的配置文件合并
//读取的配置数据
@Value("${pattern.dateformat}")
private String dateformat;
@Data
@Component
@ConfigurationProperties(prefix = "pattern")
public class PatterProperties {
private String dateformat; //和配置文件的属性名一致
}


3、配置自动刷新

①**@RefreshScope**

  • 热更新:即不需要项目重启,配置文件在线更改项目热更新读取
  • 通过@Value注解注入,结合@RefreshScope来刷新
@RefreshScope  //Nacos配置文件热更新 -- 在要读取配置文件的类上
public class UserController {
//读取的配置数据
@Value("${pattern.dateformat}")
private String dateformat;
}


②通过**@ConfigurationProperties注入,自动刷新**

@ConfigurationProperties(prefix = "pattern")
public class PatterProperties {
private String dateformat; //和配置文件的属性名一致
}


多环境配置共享

image-20230830205113326

  • [服务名称 spring.application.name] - [环境 spring.profiles.activel].yaml
  • 无论服务是哪个环境的配置文件,按需引入加载,但设置一个不分环境的配置,无论什么环境都会被加载,因此多环境共享配置可以写入这个文件
  • 创建一个 [服务名称 spring.application.name] .yaml

例:

userservice.yaml [必定加载:多环境共享配置]

userservice-dev.yaml [按需加载]

image-20230830202557558



优先级

服务名-profile.yaml > 服务名称.yaml > 本地配置

image-20230830205042963






集群搭建

image-20210409211355037

  • image-20230830221313435

1、配置nacos数据库

  • Nacos默认数据存储在内嵌数据库Derby中,不属于生产可用的数据库。
  • 官方推荐的最佳实践是使用带有主从的高可用数据库集群,这里以单点的数据库为例

https://chen-1317386995.cos.ap-guangzhou.myqcloud.com/Java/nacos.sql



2、配置nacos

  • 进入nacos的conf目录,修改配置文件cluster.conf.example,重命名为cluster.conf:
127.0.0.1:8845
127.0.0.1.8846
127.0.0.1.8847

  • 然后修改application.properties文件,添加数据库配置
spring.datasource.platform=mysql

db.num=1

db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=root
db.password.0=123

  • 复制 : 将nacos文件夹复制三份,分别命名为:nacos1、nacos2、nacos3

然后分别修改三个文件夹中的application.properties,

  • 同一电脑需要设置不同的端口

nacos1:

server.port=8845

nacos2:

server.port=8846

nacos3:

server.port=8847

然后分别启动三个nacos节点:

startup.cmd


3、nginx反向代理

  • 代理 Nacos 的 ,从8848端口变成现在的监听端口【80】打开控制台,对集群进行负载均衡

  • 修改conf/nginx.conf文件,配置如下,在http内添加

upstream nacos-cluster {
server 127.0.0.1:8845;
server 127.0.0.1:8846;
server 127.0.0.1:8847;
}

server {
listen 80;
server_name localhost;

location /nacos {
proxy_pass http://nacos-cluster;
}
}

  • 开启
#进入一级目录
start nginx.exe

访问:http://localhost/nacos即可。



4、java代码更改Nacos地址

spring:
cloud:
nacos:
server-addr: localhost:80 # Nacos地址





统一网关 Gateway

  • 对用户请求做身份认证、权限校验
  • 将用户请求路由到微服务,并实现负载均衡
  • 对用户请求做限流

网关为独立服务,需要注册到Nacos中去

image-20230831140815665


1、搭建

  • 创建springboot项目继承
<!-- nacos客户端服务发现依赖包 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--网关gateway依赖-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

2、配置类

  • 配置application.yml,包括服务基本信息、nacos地址、路由
server:
port: 10010 #设置端口
spring:
application:
name: gateway
cloud:
nacos:
server-addr: localhost:8848 #nacos地址

#####################################################################################################
gateway:
routes:
- id: user-service #路由标示,必须唯一
uri: lb://userservice #路由的目标地址 , lb / http
predicates: #路由断言列表
- Path=/user/** #判断路径是否是以/user开头,如果是则符合


- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**
- Before=2023-09-20T17:42:47.789-07:00[Asia/Shanghai] #增加自定义断言规则

3、启动项目

访问 :localhost:10010/order/101

实际:localhost:orderservice/order/101



断言工厂

读取用户配置的断言规则,进行设置判断访问条 件

image-20230831141126455

routes:
- id: user-service #路由标示,必须唯一
uri: lb://userservice #路由的目标地址 , lb / http
predicates: #路由断言列表
- Path=/user/** #判断路径是否是以/user开头,如果是则符合


- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**
- Before=2023-09-20T17:42:47.789-07:00[Asia/Shanghai] #增加自定义断言规则


当前路由过滤器

  • 对路由的请求或响应做加工处理,比如添加请求头
  • 配置在路由下的过滤器只对当前路由的请求生效
gateway:
routes:
- id: user-service #路由标示,必须唯一
uri: lb://userservice #路由的目标地址 , lb / http
predicates: #路由断言列表
- Path=/user/** #判断路径是否是以/user开头,如果是则符合
filters:
- AddRequestHeader=Truth,Itcast is freaking aowsome! #自定义添加请求头 =Key,Value

//接收
public User queryById(@RequestHeader(value = "Truth",required = false) String truth)


全局路由过滤器

  • 使用 default-filters: 全局生效,所有的路由实现
gateway:
routes:
- id: user-service #路由标示,必须唯一
- id: order-service

default-filters: #全局路由过滤器
- AddRequestHeader=Truth,Itcast is freaking aowsome! #自定义添加请求头

读取:

  • Spring 根据 请求头的名字进行 匹配值

  • required(非必空) :true/false

@RequestHeader(value = "Truth",required = false)





全局过滤器 GlobalFilter


1、自定义过滤器

  • 在网关服务中编写过滤器,注册为组件

过滤器优先级:越小越高

1、注解:@Order(-1)

2、实现 Ordered接口 ,实现方法


  • 实现GlobalFilter接口和重写filter方法

例:请求中携带身份信息:authorization=admin

@Order(-1)  //【方式一】:设置当前过滤器优先级
@Component
public class AuthorizeFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

//1.获取请求参数
ServerHttpRequest request = exchange.getRequest();
MultiValueMap<String, String> params = request.getQueryParams();

//2.获取请求参数中的 (authorization) 参数
String auth = params.getFirst("authorization"); //获取第一个匹配

//3.判断是否符合要求
if ("admin".equals(auth)){
//4.满足放行
return chain.filter(exchange); //转发,过滤链,已通过当前过滤器
}

//5.不满足,拦截
//5.1 设置状态码
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);

//拦截请求
return exchange.getResponse().setComplete();

}


//【方式二】:设置当前过滤器优先级
@Override
public int getOrder() {
return -1;
}
}


过滤器执行顺序

image-20230831152129103



跨域

gateway:
routes:

globalcors: # 全局的跨域处理
add-to-simple-url-handler-mapping: true # 解决options请求被拦截问题
corsConfigurations:
'[/**]':
allowedOrigins: # 允许哪些网站的跨域请求
- "http://localhost:5500"
- "http://www.leyou.com"
- "*" #所有
allowedMethods: # 允许的跨域ajax的请求方式
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders: "*" # 允许在请求中携带的头信息
allowCredentials: true # 是否允许携带cookie
maxAge: 360000 # 这次跨域检测的有效期





MQ - 消息队列

  • 同步调用【*】

image-20230901225917645


  • 异步调用

image-20230901230558776

image-20230901231119548






Kafka

  • Topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

安装

  • zookeeper

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所l以安装Kafka之前必须先安装zookeeper

#zookeeper镜像
docker pull zookeeper:3.4.14

#创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
#kafka镜像
docker pull wurstmeister/kafka:2.12-2.3.1

#创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

  • 依赖
<!-- kafkfa -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>


使用

  • 生产者
//1.kafka的配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.238.3:9092");
//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);
//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

//2.生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

//封装发送的消息
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

//3.发送消息
producer.send(record);

//4.关闭消息通道,必须关闭,否则消息发送不成功
producer.close();
  • 消费者
//1.添加kafka的配置信息
Properties properties = new Properties();
//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.238.3:9092");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

//2.消费者对象
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

//3.订阅主题
consumer.subscribe(Collections.singletonList("itheima-topic"));

//当前线程一直处于监听状态
while (true) {
//4.获取消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key());
System.out.println(consumerRecord.value());
}
}

  • 多个消费者订阅同一个主题只能有一个消费者收到消息(一对一)
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
  • 每个消费者的组不同,实现全部收到消息


分区机制

image-20230911165937399



集群

image-20230911170118687



备份机制

image-20230911170534166

image-20230911170624952



生产者

发送类型

  • 同步发送
//3.发送消息
//同步
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(recordMetadata.offset()); //偏移量

  • 异步发送
//异步
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
System.out.println("记录异常信息到日志表中");
}
System.out.println(recordMetadata.offset());
}
});

消息确认机制

  • ack

是否发送成功

image-20230911172046771

//ack配置,消息确认机制
properties.put(ProducerConfig.ACKS_CONFIG,"all");

  • retries

失败重试

//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,5);

  • 消息压缩

image-20230911172902838

//数据压缩
properties.put(ProducerConfig.CONFIG_PROVIDERS_CONFIG,"lz4");


消费者

image-20230911200954980






微服务保护


Sentinel

image-20230923200931724

image-20230923201115506


image-20230923201211023



安装控制台

java -jar sentinel-dashboard-1.8.6.jar
  • 访问 默认8080端口 账号和密码都是 sentinel

image-20230923202542337

  • 修改 在后面加上 -D配置项,单次启动有效


JMeter测试高并发



整合

  • 哪个服务需要则哪个服务都需要导入

  • 依赖

<!--sentinel依赖-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

  • 配置文件
cloud:
nacos:
sentinel:
transport:
dashboard: localhost:8080

  • 访问端点(接口)即可 触发sentinel监控


限流规则 - 流控

image-20230923210247396

  • 使用JMeter实现并发测试

流控模式

image-20230923213012308


  • QPS : 每秒并发数

直接

image-20230923210448868

单机阈值:1s访问次数

直接设置当前端点的阈值



关联

image-20230923210629244


  • 两个端点 update 和 query ,对query进行流控(关联 update,当update的单机阈值达到5的时候对 query 进行限流)

image-20230923211222345



链路

image-20230923211527407


测试:两个接口都访问一个方法,要监控访问的方法,限流哪个接口来访问这个方法

//-----------------接口共同访问 一个方法

@GetMapping("/query")
public String queryTest(){
orderService.queryGoods();
return "quey";
}
@GetMapping("/save")
public String saveTest(){
orderService.queryGoods();
return "save";
}

//--------------------监控被共同访问的方法
@SentinelResource("goods") //自定义名称
public void queryGoods(){
System.err.println("查询商品");
}

Sentinel默认会将Controller方法做context整合,导致链路模式的流控失效,
需要修改application.yml,添加配置:

sentinel:
transport:
dashboard: localhost:8080
web-context-unify: false #关闭context整合

  • 流控

image-20230923212647631

此时,query访问 good 方法被阻塞,save访问 不会被阻塞



流控效果

image-20230923231207320

image-20230923225731489


Warm up

  • 预热模式:避免冷启动时刻高并发,在预热时间内慢慢达到最大阈值QPS

image-20230923225947143


测试:给资源设置限流,最大QPS为10(单机阈值),预热时长为5s

image-20230923230304224



排队等待

  • 每个请求根据OPS平均时间为一个请求的处理时间,无论是否提早完成,下一个请求都需要等待这个处理时间完成

image-20230923230800038


测试:

image-20230923231027973



热点参数限流

image-20230923231650387

  • 加注解 @SentinelResource(“ “)
@SentinelResource("hot")//自定义名称
@GetMapping("{orderId}")

image-20230923231400843

image-20230923231504543


测试:

image-20230923231559068

image-20230923232008375



隔离和降级

image-20230923232602575


降级

Feign整合Sentinel

客户端保护 : 整合Feign和Sentinel

1、配置文件

feign:
sentinel:
enabled: true #开启Feign的Sentinel功能

2、给FeignClient 编写失败后的降级逻辑

方式一:FallbackClass,无法对远程调用的异常做处理

  • 远程模块:创建类实现降级逻辑
@Slf4j
public class UserClientFallbackFactor implements FallbackFactory<UserClient> {

@Override
public UserClient create(Throwable throwable) {
return new UserClient() {
@Override
public User findById(Long id) {
log.error("查询用户异常",throwable);
return new User();
}
};
}
}

  • 远程模块:代码配置类

把降级逻辑类注册为Bean,类上不加配置类注解

public class DefauleFeignConfiguration {

@Bean
public Logger.Level logLevel(){
return Logger.Level.BASIC;
}

@Bean
public UserClientFallbackFactor userClientFallbackFactor(){
return new UserClientFallbackFactor();
}

}

  • 远程模块:远程接口上注明 fallbackFactory
@FeignClient(
value = "userservice",
configuration = DefauleFeignConfiguration.class,
fallbackFactory = UserClientFallbackFactor.class
)

  • 客户端启动类
@ComponentScan("cn.itcast.feign")		//扫描远程模块的包
@EnableFeignClients(
clients = {UserClient.class},
defaultConfiguration = DefauleFeignConfiguration.class
) //开启Feign自动装配

SpringCloud版本 10 以下

<spring-cloud.version>Hoxton.SR8</spring-cloud.version>

线程隔离

image-20230924011320993

image-20230924011602853



熔断降级

image-20230924012039392

image-20230924013200392


熔断策略 - 慢调用

image-20230924012216661

统计 10 s 内最小10的请求数量 如果超过最大Rt(慢调用)的比例达到50% 触发 熔断


测试:

image-20230924012433745

image-20230924012825026


异常比例、异常数

image-20230924012943344

  • 在代码中出现异常(抛出异常 throw new RuntimeException(“异常提醒,熔断降级”) )


授权规则

  • 例子:判断是否网关转发请求过来

image-20230924013427065

给网关的请求增加请求头,而浏览器请求没有特定的请求头

  • 网关 : 利用网关的过滤器 添加名为 gateway的origin 头

image-20230924014103203

#配置文件
spring:
cloud:
gateway:
default-filters:
- AddRequestHeader=origin,gateway #自定义添加请求头 名,值

  • 服务 : 设置一个bean,获取request中名为origin的请求头
@Component
public class HeaderOriginParser implements RequestOriginParser {
@Override
public String parseOrigin(HttpServletRequest httpServletRequest) {
//1.获取请求头
String origin = httpServletRequest.getHeader("origin");

//2.非空判断
if (StringUtils.isEmpty(origin)){
origin = "blank";
}

return origin;
}
}

  • 直接访问访问且不带请求头的响应异常 : 限流异常

image-20230924143411857


  • 自定义异常

image-20230924143514852

image-20230924143550478


  • 定义bean,实现 BlockExceptionHandler 接口
@Component
public class SentinelExceptionHandler implements BlockExceptionHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
String msg = "未知异常";
int status = 429;

if (e instanceof FlowException) {
msg = "请求被限流了";
} else if (e instanceof ParamFlowException) {
msg = "请求被热点参数限流";
} else if (e instanceof DegradeException) {
msg = "请求被降级了";
} else if (e instanceof AuthorityException) {
msg = "没有权限访问";
status = 401;
}

response.setContentType("application/json;charset=utf-8");
response.setStatus(status);
response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
}
}


规则持久化


原始模式

  • 默认保存在缓存中,服务重启消失

pull 模式

  • 保存在本地文件或数据库,定时去读取

image-20230924144204484


push 模式

  • 保存在nacos,监听变更实时更新

image-20230924144230815


搭建

md文档https://chen-1317386995.cos.ap-guangzhou.myqcloud.com/Java/Utils/sentinel%E8%A7%84%E5%88%99%E6%8C%81%E4%B9%85%E5%8C%96.md


依赖

<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-datasource-nacos</artifactId>
</dependency>

服务配置

sentinel:
datasource:
flow: #限流
nacos:
server-addr: 192.168.238.3:8848 # nacos地址
dataId: orderservice-flow-rules #限流规则
groupId: SENTINEL_GROUP
rule-type: flow # 还可以是:degrade(#降级)、authority、param-flow
degrade: #降级
nacos:
server-addr: 192.168.238.3:8848 # nacos地址
dataId: orderservice-degrade-rules #限流规则
groupId: SENTINEL_GROUP
rule-type: degrade # 还可以是:degrade(#降级)、authority、param-flow

重启服务 – 》

修改sentienl 源码

sentinel修改持久化jar包

  • 如果nacos地址不是本地8848:java -jar -Dnacos.addr=192.168.238.3:8848 sentinel-dashboard.jar





分布式事务

image-20230924153804080


Base理论

image-20230924154747853



Seata

image-20230924155910068

image-20230924201115527


部署TC服务

sentinel的部署和集成md


微服务集成Seata


1、依赖

<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<!--版本较低,1.3.0,因此排除-->
<exclusion>
<artifactId>seata-spring-boot-starter</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<!--seata starter 采用1.4.2版本-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>${seata.version}</version>
</dependency>


2、配置文件

image-20230924161755796

seata:
registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
# 参考tc服务自己的registry.conf中的配置
type: nacos
nacos: # tc
server-addr: 192.168.238.3:8848
namespace: ""
group: DEFAULT_GROUP
application: seata-tc-server # tc服务在nacos中的服务名称
username: nacos
password: nacos
tx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称
service:
vgroup-mapping: # 事务组与TC服务cluster的映射关系
seata-demo: SH

  • 启动服务进行注册到seata

image-20230924164427276



XA模式

  • 统一提交

image-20230924165001043

image-20230924165249497

image-20230924165417686


  • 实践

image-20230924165509538

1、配置文件

seata:
data-source-proxy-mode: XA #开启数据源代理的XA模式

2、全局事务入口方法 上添加注解 - 实现全部服务回滚(需要在方法内抛出异常)

@GlobalTransactional  //分布式事务回滚
public Long create(Order order) {
try {
} catch (FeignException e) {
throw new RuntimeException(e.contentUTF8(), e);
}
}


AT模式

  • 先提交记录快照

image-20230924184305901

image-20230924184404456


  • 脏写问题

当事务1和事务二要进行操作时,

事务1:先拿到DB锁,先保存快照->执行业务->提交->释放DB锁

事务2:拿到DB锁,先保存快照->执行业务->提交->释放DB锁

当事务1需要回滚的时候,由于保存的快照是最开始的快照,回到这个阶段,则事务二所有的操作无效

image-20230924185133059


  • 解决脏写 – 全局锁

image-20230924185626282

image-20230924185755198

image-20230924185919677


  • 实践

1、配置文件

seata:
data-source-proxy-mode: AT #开启数据源代理的XA模式

2、全局事务入口方法 上添加注解 - 实现全部服务回滚(需要在方法内抛出异常)

@GlobalTransactional  //分布式事务回滚
public Long create(Order order) {
try {
} catch (FeignException e) {
throw new RuntimeException(e.contentUTF8(), e);
}
}





TCC模式

image-20230924191653725

image-20230924191901194

image-20230924192016912


实践:

image-20230924192240033

image-20230924192637038


image-20230924192922803

1、定义接口 – 和旧业务的接口一样(需要分布式事务功能的接口)

@LocalTCC
public interface AccountTCCService {

//try (try名称,confirm名称,cancel名称)
@TwoPhaseBusinessAction(name = "deduct",commitMethod = "confirm",rollbackMethod = "cancel")
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);

boolean confirm(BusinessActionContext context);

boolean cancel(BusinessActionContext context);

}

2、实现类 – 替换之前的业务(需要分布式事务的业务)

@Component
public class AccountTCCServiceImpl implements AccountTCCService {


@Resource
private AccountMapper accountMapper;

@Resource
private AccountFreezeMapper accountFreezeMapper;

//资源检测和预留
@Override
public void deduct(String userId, int money) {
//0.获取事务id
String xid = RootContext.getXID();

//0-1.判断freeze中是否有冻结记录,如果有,一定是CANCEL执行过,我要拒绝业务
AccountFreeze oldFreeze = accountFreezeMapper.selectById(xid);
if (oldFreeze != null){
//CANCEL执行过,拒绝业务
return;
}

//1.扣减资源
accountMapper.deduct(userId,money);

//2.冻结资源,事务状态
AccountFreeze accountFreeze = new AccountFreeze();
accountFreeze.setUserId(userId);
accountFreeze.setFreezeMoney(money);
accountFreeze.setState(AccountFreeze.State.TRY);
accountFreeze.setXid(xid);

accountFreezeMapper.insert(accountFreeze);

}

//提交
@Override
public boolean confirm(BusinessActionContext context) {
//1.获取事务id
String xid = context.getXid();

//2.根据id删除冻结记录
int count = accountFreezeMapper.deleteById(xid);

return count == 1;
}

//回滚
@Override
public boolean cancel(BusinessActionContext context) {
//查询冻结记录
String xid = context.getXid();
String userId = context.getActionContext("userId").toString();
AccountFreeze accountFreeze = accountFreezeMapper.selectById(xid);

//0.空回滚判断:判断freeze是否为null,为null则证明try没执行,需要空回滚
if (accountFreeze == null){
//证明try没执行,需要空回滚
accountFreeze = new AccountFreeze();
accountFreeze.setUserId(userId);
accountFreeze.setFreezeMoney(0);
accountFreeze.setState(AccountFreeze.State.CANCEL);
accountFreeze.setXid(xid);
accountFreezeMapper.insert(accountFreeze);
return true;
}
//0-1.幂等判断
if (accountFreeze.getState() == AccountFreeze.State.CANCEL){
//已经处理过一次cancel了,无需重复处理
return true;
}

//1.恢复可以余额
accountMapper.refund(accountFreeze.getUserId(),accountFreeze.getFreezeMoney());

//2.将冻结金额清零,状态改为CANCEL
accountFreeze.setFreezeMoney(0);
accountFreeze.setState(AccountFreeze.State.CANCEL);
int count = accountFreezeMapper.updateById(accountFreeze);
return count == 1;

}
}





Saga模式

image-20230924201006828






高可用-集群

sentinel的集群部署md

image-20230924201521894