Elim的博客

用来记录一些原创性的总结


Elim

基于RocketMQ的Stream实现

Spring Cloud Stream是一个消息收发的框架,它提供了一套标准,应用程序只需要按照它的标准进行消息的收发,而不用关注具体的实现机制。具体的实现可以基于不同的消息中间件进行不同的实现,比如Kafka的实现、RabbitMQ的实现、RocketMQ的实现等。官方已经提供了Kafka和RabbitMQ的实现,RocketMQ的实现由Alibaba负责。本文将基于RocketMQ的实现进行讲解。

Binder和Binding

org.springframework.cloud.stream.binder.Binder是Spring Cloud对消息容器的抽象,不同的消息容器有不同的实现,通过它可以屏蔽各消息容器的内部细节。

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {

	/**
	 * Bind the target component as a message consumer to the logical entity identified by
	 * the name.
	 * @param name the logical identity of the message source
	 * @param group the consumer group to which this consumer belongs - subscriptions are
	 * shared among consumers in the same group (a <code>null</code> or empty String, must
	 * be treated as an anonymous group that doesn't share the subscription with any other
	 * consumer)
	 * @param inboundBindTarget the app interface to be bound as a consumer
	 * @param consumerProperties the consumer properties
	 */
	Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

	/**
	 * Bind the target component as a message producer to the logical entity identified by
	 * the name.
	 * @param name the logical identity of the message target
	 * @param outboundBindTarget the app interface to be bound as a producer
	 * @param producerProperties the producer properties
	 */
	Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

使用Stream

使用Spring Cloud Stream时需要在pom.xml中引入一个Spring Cloud Stream的具体实现,笔者选择的是RocketMQ的实现。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

然后需要通过在配置类上使用@EnableBinding指定需要使用的Binding,它指定的是一个接口,在对应的接口中会定义一些标注了@Input@Output的方法,它们就对应一个Binding。@Output对应的是org.springframework.messaging.MessageChannel@Input对应的是org.springframework.messaging.SubscribableChannel。Spring Cloud内置的@EnableBinding可使用的接口有org.springframework.cloud.stream.messaging.Sourceorg.springframework.cloud.stream.messaging.Sinkorg.springframework.cloud.stream.messaging.Processor。Source的定义如下,它定义了一个OUTPUT类型的Binding,名称为output,当不通过@Output指定Binding的名称时,默认会使用方法名作为Binding的名称。

public interface Source {

	String OUTPUT = "output";
	
	@Output(Source.OUTPUT)
	MessageChannel output();

}

Sink的定义如下,它定义了一个INPUT类型的Binding,名称为input,当不通过@Input指定Binding的名称时,默认会使用方法名作为Binding的名称。

public interface Sink {

	String INPUT = "input";

	@Input(Sink.INPUT)
	SubscribableChannel input();

}

在一个接口中可以同时定义多个Binding,只需要定义多个@Input@Output标注的方法。Processor接口同时继承了Source和Sink接口,所以当@EnableBinding指定了Processor接口时相当于同时应用了两个Binding。在下面代码中我们指定了@EnableBinding接口为Source接口,即启用了名称为output的OUTPUT类型的Binding。Spring Cloud会自动实现该Binding的实现,也会提供Binding接口的实现,并注册到bean容器中。即可以在程序中自动注入Source类型的bean,也可以注入MessageChannel类型的bean。

@EnableBinding(Source.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
}

OUTPUT类型的Binding是用来发消息的,Spring Cloud会自动提供@EnableBinding指定接口的实现,所以在需要发送消息的时候我们可以直接注入Source类型的bean,然后通过Source的output()获取MessageChannel实例,通过它的send()方法进行消息发送。

@Component
public class SourceProducer {

    @Autowired
    private Source source;
    
    public void sendMessages(String msg) {
        String payload = msg;
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(payload, messageHeaders);
        this.source.output().send(message);
    }
    
}

那发送的消息究竟会发送到哪里呢?这就需要我们来定义对应的Binding和实际消息容器的生产者的映射了。可以通过spring.cloud.stream.bindings.<bindingName>.*的形式定义Binding的一些属性,可以定义的属性可参考org.springframework.cloud.stream.config.BindingProperties。这里我们通过其destination属性指定该Binding对应的实际的目的地,对应于RocketMQ就是一个Topic,即我们上面发送的消息将发到RocketMQ的名为test-topic的Topic。

spring.cloud.stream.bindings.output.destination=test-topic

在测试的时候可以在启动RocketMQ时指定autoCreateTopicEnable=true以开启自动创建Topic的功能,如mqbroker -n localhost:9876 autoCreateTopicEnable=true

RocketMQ是需要指定NameServer的,所以在发送消息前,还需要基于RocketMQ这个Binder配置其NameServer的地址。

spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876

在启动了RocketMQ的NameServer和Broker之后,就可以利用上面的代码进行消息发送了。测试代码如下。

@RunWith(SpringRunner.class)
@SpringBootTest(classes=Application.class)
public class SourceProducerTest {

    @Autowired
    private SourceProducer producer;
    
    @Test
    public void test() {
        for (int i=0; i<10; i++) {
            this.producer.sendMessages("Message-" + i);
        }
    }
    
}

也可以直接注入Binding对应的MessageChannel进行消息发送,但是一个应用中可能有多个MessageChannel,尤其是会有Spring Cloud自动建立的用于异常处理的MessageChannel,所以在注入MessageChannel时需要指定bean的名称,默认是与Binding的名称一致,所以这里我们指定的是output。

@Component
public class SourceProducer {

    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;
    
    public void sendMessages(String msg) {
        String payload = msg;
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "testTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<String> message = MessageBuilder.createMessage(payload, messageHeaders);
        this.messageChannel.send(message);
    }
    
}

接收消息

接收消息也需要定义相应的Binding,也需要通过@EnableBinding进行指定。Spring Cloud的Sink接口中已经定义好一个名为input的Binding,如果只需要一个接收Binding,可以直接拿来用。

@EnableBinding(Sink.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
}

作为消费者的Binding也必须指定对应的目的地,还必须指定一个消费者分组group,相同group的消费者可以共同消费相同destination的消息,分担压力,比如一个作为消费者的应用部署了三份,它们的group都是一样的,如果来了三条消息,那么可能三台应用都分别消费了其中的一条消息。而如果部署三份的group都不一样,则每台应用都将消费全部的三条消息。

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group

作为消费者的应用也需要定义Binder的相关信息,如spring.cloud.stream.rocketmq.binder.namesrv-addr=127.0.0.1:9876

消费者需要在方法上通过@StreamListener进行标注,表示它将监听消费某个Binding的消息。对应的Binding可以通过@StreamListener的value或target属性进行指定。比如下面的代码指定了消费者对应的Binding是名为input的Binding。而根据上面的配置该Binding对应的destination是test-topic,对于RocketMQ来说就是从名为test-topic的Topic获取消息。

@Component
@Slf4j
public class SinkConsumer {

    @StreamListener(Sink.INPUT)
    public void inputConsumer(String message) {
        log.info("从Binding-{}收到信息-{}", Sink.INPUT, message);
    }
    
}

自定义Binding声明接口

除了使用Spring Cloud默认提供的Sink、Source和Processor接口外,用户还可以根据需要新建自己的Binding接口。下面的代码中就自定义了一个声明Binding的接口,其中声明了两个Binding,名为input1的INPUT类型的Binding和名为output1的OUTPUT类型的Binding。

public interface CustomBinding {

    String INPUT1 = "input1";
    String OUTPUT1 = "output1";
    
    @Input
    SubscribableChannel input1();
    
    @Output
    MessageChannel output1();
    
}

定义@Input@Ouput时如果没有自定Binding的名称,默认获取当前方法的名称作为Binding的名称。

应用的时候跟其它Binding接口一样通过@EnableBinding进行声明,其它的配置方式等都是一样的。

@EnableBinding({CustomBinding.class, Sink.class})
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
}

发送和接收对象

Spring Cloud Stream默认在接收和发送消息时对应的消息格式类型都是JSON格式,可以通过Binding的contentType属性进行指定,如spring.cloud.stream.bindings.output.content-type=application/json。当发送和接收消息时都会被org.springframework.messaging.converter.MessageConverter进行转换。

现假设我们有如下这样一个User类型,我们期望发送的消息就是发送的User类型。

@Data
public class User {

    private Long id;
    private Date createTime = new Date();
    private String name;
    
}

那我们的发送消息代码可以类似于如下这样写,下面的代码中往名为output1这个Binding中发送的消息就是User类型的。

@Component
public class UserProducer {

    @Autowired
    @Qualifier(CustomBinding.OUTPUT1)
    private MessageChannel messageChannel;
    
    public void sendUser(User user) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS, "userTag");
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<User> message = MessageBuilder.createMessage(user, messageHeaders);
        this.messageChannel.send(message);
    }
    
}

如果你的消费者端的代码还像以前一样写,接收的是String类型的,你会发现接收到的字符串是JSON格式的,因为发送端默认会把对象转换为JSON格式进行发送。

@Component
@Slf4j
public class UserConsumer {

    @StreamListener(CustomBinding.INPUT1)
    public void consumeUser(String user) {
        log.info("从Binding-{}收到User类型的消息-{}", CustomBinding.INPUT1, user);
    }
    
}

这里也可以直接以User类型进行接收,此时Spring Cloud将自动将接收到的JSON字符串转换为消费者方法的入参对象,比如下面这样。

@Component
@Slf4j
public class UserConsumer {

    @StreamListener(CustomBinding.INPUT1)
    public void consumeUser(User user) {
        log.info("从Binding-{}收到User类型的消息-{}", CustomBinding.INPUT1, user);
    }
    
}

上面应用的名为output1和input1的Binding的配置如下。

spring.cloud.stream.bindings.output1.destination=test-topic1

spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.group=test-group1

自定义MessageConverter

Spring Cloud Stream在进行对象和JSON转换时默认使用的是org.springframework.messaging.converter.MappingJackson2MessageConverter。如果有需要你也可以实现自己的MessageConverter。在实现自定义的MessageConverter时通常不直接实现MessageConverter接口,而是继承org.springframework.messaging.converter.AbstractMessageConverter,然后重写其supports(..)convertFromInternal(..)convertToInternal。比如下面的代码中实现了一个只能转换User对象的MessageConverter,底层使用的是FastJson,在进行发送消息时重置了user的name属性。

public class UserMessageConverter extends AbstractMessageConverter {

    public UserMessageConverter() {
        super(MimeType.valueOf("application/json"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return clazz.equals(User.class);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        if (payload instanceof byte[]) {
            return JSON.parseObject((byte[])payload, targetClass);
        }
        return JSON.parseObject(payload.toString(), targetClass);
    }

    @Override
    protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
        User user = (User) payload;
        user.setName("Converted by UserMessageConverter-------" + user.getName());
        return JSON.toJSONString(user);
    }

}

然后为了使它生效,我们需要把它定义为一个bean,并标注@StreamMessageConverter,比如下面这样。

@EnableBinding(CustomBinding.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    @Bean
    @StreamMessageConverter
    public UserMessageConverter userMessageConverter() {
        return new UserMessageConverter();
    }
    
}

如果在转换为JSON时不希望使用默认的Jackson实现,而希望使用Alibaba的FastJson也是可以的。FastJson已经提供了MessageConverter的实现类com.alibaba.fastjson.support.spring.messaging.MappingFastJsonMessageConverter。所以如果希望使用FastJson的实现,只需要进行类似如下这样的定义。

@EnableBinding(CustomBinding.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    @Bean
    @StreamMessageConverter
    public MappingFastJsonMessageConverter mappingFastJsonMessageConverter() {
        return new MappingFastJsonMessageConverter();
    }
    
}

Spring Cloud已经内置实现了一些MessageConverter(可以通过IDE工具查看对应的实现类),当用户指定了自定义的MessageConverter时,自定义的MessageConverter将加入到列表的最前面,即将优先使用自定义的MessageConverter进行消息的转换。

基于自定义的MessageConverter,用户也可实现自定义的MimeType。

异常处理

在接收消息时,如果消息处理失败,Spring Cloud会把失败的消息转到名为<destination>.<group>.errors的Channel,并可通过@ServiceActivator方法进行接收。比如有如下Binding定义。

spring.cloud.stream.bindings.input1.destination=test-topic1
spring.cloud.stream.bindings.input1.group=test-group1

当消息消费失败时将转发包装了失败原因的消息到名为test-topic1.test-group1.errors的Channel,我们可以通过在某个bean中定义一个@ServiceActivator方法处理相应的异常。

@ServiceActivator(inputChannel = "test-topic1.test-group1.errors")
public void handleConsumeUserError(ErrorMessage message) {
    log.info("收到处理失败的消息{}", message.getPayload());
}

上面介绍的方法是处理某个特定Binding的消息消费异常的,如果你的消息消费异常的处理方式都是一样的,你可能希望有一个统一的入口来处理所有的消息消费异常,而不用管当前的Binding是什么。Spring Cloud Stream也考虑到了这一点,它提供了一个名为errorChannel的Binding,所有的消息消费异常都会转发到该Binding,所以如果我们想有一个统一的处理所有的消息消费异常的入口则可以定义一个Binding名为errorChannel的@StreamListener方法。

@StreamListener("errorChannel")
public void handleErrors(ErrorMessage message) {
    log.info("默认的消息失败处理器收到处理失败的消息: {},headers:{}", message.getOriginalMessage(), message.getHeaders());
}

重试机制

Spring Cloud Stream在进行消息的接收处理时也是利用Spring Retry进行了包装的。当消息消费失败时默认会最多试3次(加上第一次),使用的是Spring Retry的RetryTemplate的默认配置。如果默认的重试逻辑不能满足你的需求,你也可以定义自己的RetryTemplate,但是需要使用@StreamRetryTemplate进行标注(StreamRetryTemplate上标注了@Bean)。比如下面的代码中就应用了自定义的RetryTemplate,指定最多尝试5次的消息消费,尝试5次后仍然失败将走前面介绍的异常处理逻辑,即投递消息到相应的异常处理的Channel。

@EnableBinding(CustomBinding.class)
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    @StreamRetryTemplate
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(5));
        return retryTemplate;
    }
    
}

最多尝试次数也可以通过Binding的consumer.maxAttempts参数进行指定,比如如果需要指定名为input1的Binding在消费某条消息时最多允许尝试5次,则可以进行如下定义。如果将该属性值定义为1,则表示不允许进行重试。

spring.cloud.stream.bindings.input1.consumer.maxAttempts=5

重试的初始时间间隔默认是1秒,之后依次翻倍,最大时间间隔是10秒,即第一次重试的间隔时间是1秒,第二次是2秒,第三次是4秒,以此类推,最大时间间隔超过10秒后取10秒。下面的配置定义了重试的初始时间间隔是500豪秒,每次翻3倍,最大的间隔时间是8秒,即第一次的重试间隔是500毫秒,第二次是1500毫秒,第三次是4500毫秒,第四次以后是8000毫秒。

spring.cloud.stream.bindings.input1.consumer.backOffInitialInterval=500
spring.cloud.stream.bindings.input1.consumer.backOffMultiplier=3
spring.cloud.stream.bindings.input1.consumer.backOffMaxInterval=8000

使用了自定义的RetryTemplate后,通过配置文件指定的重试配置都将失效。

定制消费者线程数

默认情况下,每个Binding对应的消费者线程数是1,可以通过spring.cloud.stream.bindings.<bindingName>.consumer.concurrency属性进行指定,比如下面的配置就指定了名称为input1的Binding的消费者线程是3,即Spring Cloud Stream将同时启动3个线程用于从名为input1的Binding进行消费。

spring.cloud.stream.bindings.input1.consumer.concurrency=3

更多关于consumer可配置的参数可以参考org.springframework.cloud.stream.binder.ConsumerProperties

边接收边发送

所谓的边接收边发送是指接收到消息经过处理后可以产生新的消息,然后允许通过配置指定新的消息的发送目的地。比如下面的代码就定义了从名为input的Binding接收消息,经过处理(加了前缀receiveAndSend:)后再返回,然后经过方法上的@SendTo指定返回的内容将发送到名为output的Binding。

@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receiveAndSend(String message) {
    return "receiveAndSend:" + message;
}

也可以通过@Output指定返回的内容将发送到的Binding的名称。

@StreamListener(Sink.INPUT)
@Output(Source.OUTPUT)
public String receiveAndSend(String message) {
    return "receiveAndSend:" + message;
}

Binding的配置还是类似的,比如下面这样,即从名为test-topic2的destination接收消息,经过处理后发送到名为test-topic的destination。

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic2
spring.cloud.stream.bindings.input.group=test-group2

Reactive支持

Spring Cloud Stream也提供了对Reactive的支持,为了使用Reactive需要加入spring-cloud-stream-reactive依赖。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-reactive</artifactId>
</dependency>

通过reactive方式接收消息时方法参数需要定义为Flux,然后就可以基于Flux进行消息处理了。

@StreamListener(Sink.INPUT)
public void reactiveReceive(Flux<String> flux) {
    flux.subscribe(message -> {
        log.info("通过reactive方式收到信息: {}", message);
    });
}

也可以在方法参数上使用@Input来指定对应的INPUT类型的Binding的名称,比如下面这样。

@StreamListener
public void reactiveReceive(@Input(Sink.INPUT) Flux<String> flux) {
    flux.subscribe(message -> {
        log.info("通过reactive方式收到信息: {}", message);
    });
}

如果需要边接收边发送,则接收消息的方法的返回值类型也需要定义为Flux,然后在方法上通过@Output@SendTo指定返回结果需要发送到的Binding的名称。

@StreamListener
@Output(CustomBinding.OUTPUT2)
public Flux<String> reactiveReceiveAndSend(@Input(Sink.INPUT) Flux<String> flux) {
    return flux.map(message -> "通过reactive方式接收并处理后转发的新消息:" + message);
}

也可以通过定义org.springframework.cloud.stream.reactive.FluxSender类型的参数,在参数上使用@Output指定发送消息对应的Binding,然后通过FluxSender进行消息发送。

@StreamListener
public void reactiveReceiveAndSend(@Input(Sink.INPUT) Flux<String> flux, @Output(CustomBinding.OUTPUT2) FluxSender sender) {
    sender.send(flux.map(message -> "通过reactive方式接收并处理后转发的新消息:" + message));
}

接收最原始的Message

如果有需要你想接收包含Headers的最原始的消息,只需要将@StreamListener方法的参数定义为org.springframework.messaging.Message即可。

@StreamListener(Sink.INPUT)
public void inputConsumer(Message<String> message) {
    String payload = message.getPayload();
    MessageHeaders headers = message.getHeaders();
    log.info("从Binding-{}收到信息-{}, headers:{}", Sink.INPUT, payload, headers);
}

RocketMQ特性配置

通过tag接收消息

RocketMQ建议我们一个应用就使用一个Topic,不同的消息类型通过Tag来区分。之前我们发送的消息都在header里面加入了消息对应的Tag。如果我们的某个Binding只希望接收某些Tag的消息,则可以通过spring.cloud.stream.rocketmq.bindings.<bindingName>.consumer.tags属性来指定期望接收的Tag,多个Tag之间通过双竖线分隔。比如下面代码中就指定了名为input1的Binding期望接收的消息的Tag是tag0或tag1。

spring.cloud.stream.rocketmq.bindings.input1.consumer.tags=tag0||tag1

指定RocketMQ特性配置的属性前缀是spring.cloud.stream.rocketmq,如果是Binder的配置则后面可以接binder,如果是Binding的配置则后面接binding。之后的配置就跟通用的是一样的。关于这些可以查看org.springframework.cloud.stream.binder.rocketmq.config.RocketMQBinderAutoConfigurationorg.springframework.cloud.stream.binder.rocketmq.properties.RocketMQBinderConfigurationPropertiesorg.springframework.cloud.stream.binder.rocketmq.properties.RocketMQExtendedBindingProperties。RocketMQ消费者端可以配置的属性可以参考org.springframework.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties

以广播方式接收消息

RocketMQ的消息消费有两种方式,CLUSTERING和BROADCASTING,默认是CLUSTERINIG。CLUSTERING的意思是同一消费组的多个消费者共享同一消息队列,彼此分担压力。比如消息队列中有100条消息,当同时有3个相同消费者组的消费者按照CLUSTERING方式进行消息消费时,它们总的消息的消费数量是100,但是分摊到每个消费者的数量可能是40、30、30。BROADCASTING的意思是广播,即可以理解为每个消费者都有唯一的消息队列与之对应。当消息队列中有100条消息时,如果有相同消费者组的3个消费者时,每个消费者都将完整的消费这100条消息。可以通过spring.cloud.stream.rocketmq.bindings.<bindingName>.consumer.broadcasting=true指定该消费者将通过广播的方式进行消费。比如下面代码中指定了名为input1的Binding的消息消费方式是BROADCASTING。

spring.cloud.stream.rocketmq.bindings.input1.consumer.broadcasting=true

(注:本文是基于Spring cloud Finchley.SR1所写)