首页 国际新闻正文

王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶

Spring Cloud Stream

Srping cloud Bus的底层完结便是Spring Cloud Stream,Spring Cloud Stream的意图是用于构建依据音讯驱动(或事情驱动)的微服务架构。Spring Cloud Stream自身对Spring Messaging、Spring Integration、Spring Boot Actuator、Spring Boot External无界一点通官网ized Configuration等模块进行封装(整合)和扩展,下面咱们完结两个服务之间的通讯来演示Spring Cloud Stream的运用方法。

全体概述

服务要想与其他服务通讯要界说通道,一般会界说输出通道和输入通道,输出通道用于发送音讯,输入通道用于接纳音讯,每个通道都会有个姓名(输入和输出仅仅通道类型,能够用不同的姓名界说许多许多通道),不同通道的姓名不能相同不然会报错(输入通道和输出通道不同类型的通道称号也不能相同),绑定器是操作RabbitMQ或Kafka的笼统层,为了屏蔽操作这些音讯中间件的复杂性和不一致性,绑定器会用通道的姓名在音讯中间件中界说主题,一个主题内的音讯出产原生态法力者来自多个服务,一个主题内音讯的顾客也是多个服务,也便是说音讯的发布和消费是经过主题进行界说和组丁汉白织的,通道的姓名便是主题的姓名,在RabbitMQ中主题运用Exchanges完结,在Kafka中主题运用Topic完结。

预备环境

创立两个项目spring-cloud-stream-a和spring-cloud-stream-b,spring-cloud-stream-a咱们用Spring Cloud Stream完结通讯,spri王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶ng-cloud-stream-b咱们用Spring Cloud Stream的底层模块Spring Integration完结通讯。

两个项意图POM文件依靠都是:



org.springframework.cloud
spring-cloud-stream


org.springframework.cloud
spring-cloud-stream-binder-rabbit


org.springframework.boo王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶t
spri蔡英挺最新去向ng-boot-starter-test
test


org.springframework.cloud
spring-cloud-stream-test-support
test


spring-cloud-stream-binder-rabbit是指绑定器的完结运用RabbitMQ。

项目装备内容application.properties:

spring.y3290application.name=spring-cloud-stream-a
server.port=9010
#设置默许绑定器
spring.cloud.stream.defaultBinder = rabbit
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbit亚弗戈蒙mq.username=guest
spring.rabbitmq.password=guest
spr雷振球ing.application.name=spring-cloud-stream-b
server.port=9011
#设置默许绑定器
spring.cloud.stream.defaultBinder = rabbit
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabirvuebitmq.password=guest

发动一个rabbitmq:

docker pull rabbitmq:3-management
docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

编写A项目代码

在A项目中界说一个输王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶入通道一个输出通道,界说通道在接口中运用@Input和@Output注解界说,程序发动的时分Spring Cloud Stream会依据接口界说将完结类主动注入(Spring Cloud Stream主动完结该接口不王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶需求写代码)。

A服务输入通道,通道称号ChatExch四亿名牌女anges.A.Input,接口界说输入通道有必要回来SubscribableChannel:

public interface ChatInput {
String INPUT = "ChatExchanges.A.Input";
@Input(ChatInput.INPUT)
SubscribableC风暴兵王hannel input();
}

A服务输出通道,通道称号ChatExchanges.A.Output,输出通道有必要回来MessageChannel:

public interface ChatOutput {
String OUTPUT = "ChatExchanges.A.Output";
@Output(ChatOutput.OUTPUT)
MessageChannel output();
}

界说音讯实体类:

public class ChatMessage implements Serializable {
private String name;
private String message;
private Date chatDate;
//没有无参数的结构函数并行化会犯错
private ChatMessage(){}
public ChatMessage(String name,String messa王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶ge,Date chatDate){
this.name = name;
this.message = message;
this.chatDate = chatDate;
}
public String getName(){
return this.name;
}
public String g公交顶etMessage(){
return this.message;
}
public Date getChatDate() { return this.chatDate; }
public String ShowMessage(){
return String.format("谈天音讯:%s的时分,%s说%s。",this.chatDate,this.name,this.message)陈纳;
}
}

在事务处理类上用@EnableBinding注解绑定输入通道和输出通道,这个绑定动作其实便是创立并注册输入近邻小姐姐和输出通道的完结类到Bean中,所以能够直接是运用@Autowired进行注入运用,别的音讯的串行化默许运用application/json格局(com.fastexml.jackson),最后用@StreamListener注解进行指定通道音讯的监听:

//ChatInput.class的输入通道不在这儿绑定,监听到数据会找不到AClient类的引证。
//Input和Output通道界说的姓名不能相同,不然程序发动会抛反常。
@EnableBinding({ChatOutput.class,ChatInput.class})
public class AClient {
private static Logger logger = LoggerFactory.getLogger(AClient.class);
@Autowired
private ChatOutput chatOutput;
//StreamListener自带了Json转目标的才能,收到B的音讯打印并回复B一个新的音讯。
@StreamListener(ChatInput.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
ChatMessage replyMessage = new ChatMessage("ClientA","A To B Message.", new Date());
chatOutput.output().send(MessageBuilder.withPayload(replyMessage).build());
}
}

到此A项目代码编写完结。

编写B项目代码

B项目运用Spring Integration完结音讯的发布和消费,界说通道时咱们要交流输入通道和输出通道的称号:

public interface ChatProcessor {
String OUTPUT = "ChatExchanges.A.Input";
String INPUT = "ChatExchanges.A.Output";
@Input(ChatProcessor.INPUT)
SubscribableChannel input();
@Output(ChatProcessor.OUTPUT)
MessageChannel output();
}

音讯实体类:做受

public class ChatM试开城际轻轨essage {
private String name;
private String message;
private Date chatDate;
//没有无参数的结构函数并行化会犯错
private ChatMessage(){}
public ChatMessage(String name,String message,Date chatDate){
this.name = name;
this.message = message;
this.chatDate = 王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶chatDate;
}
public String getName(){
return this.name;
}
public String ge撸奶奶tMessage(){
return this.message;
}
public Date getChatDate() { return this.chatDate; }
public String ShowMessage(){
return String.format("谈天音讯:%s的时分,%s说%s。",this.chatDate,this.name,this.message);
}
}

事务处理类用@ServiceActivator注解替代@StreamListener,用@InboundChannelAdapter注解发布音讯:

@EnableBinding(ChatProcessor.class)
public class BClient {
private static Logger logger = LoggerFactory.getLogger(BClient.class);
//@ServiceActivator没有Json转目标的才能需求凭借@Transformer注解
@ServiceActivator(inputChannel=ChatProcessor.INPUT)
public void PrintInput(ChatMessage message) {
logger.info(message.ShowMessage());
}
@Transformer(inputChannel = ChatProcessor.INPUT,outputChannel = ChatProcessor.INPUT)
public ChatMessage transform(String message) throws Exception{
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(m朴映宣essage,ChatMessage.class);
}
//每秒宣布一个音讯给A
@Bean
@InboundChannelAdapter(value = ChatProcessor.OUTPUT,poller = @Poller(fixedDelay="1000"))
public GenericMessage SendChatMessage王思聪是谁,Spring Cloud Stream 进行服务之间的通讯,甜茶(){
ChatMessage断椎 message = new ChatMessage("ClientB","B To A Message.", new Date());
GenericMessage gm = new GenericMessage<>(message);
return gm;
}
}

运转程序

发动A项目和B项目:

源码

Github库房:https://gi刘雯刚thub.com/sunweisheng/spring-cloud-example

原文:https://www.cnblogs.com/bluersw/p/11675139.html

版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。