看到这篇文章的你是幸运的，因为你不必再去搜索各种千篇一律且繁琐的 Spring Cloud Stream 配置文件。
笔者在经过大量搜索后，发现全网并没有一种好的方式来简化 Stream 的配置。
本文提出一种解决方案，希望能够帮助大家。
最终使用方式
yaml文件
spring:
  cloud:
    stream:
      # Defaults applied to every binding: the consumer group is derived from
      # the application name and the active profile.
      default:
        group: ${spring.application.name}-${spring.profiles.active}
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
# For any other settings, refer to the binder documentation.
是的，你没有看错：我们不再需要在配置文件中逐个声明 input 和 output，也不需要再为它们绑定 destination。
启动类配置
// @StreamBindingsScan is the custom annotation defined in this article.
@SpringBootApplication
@StreamBindingsScan
public class Application {

    /**
     * Creates the {@link BindingsConfig} bean, hands it the binding service properties
     * and immediately triggers {@link BindingsConfig#resetBindingProperties()}.
     *
     * @param bindingServiceProperties the Spring Cloud Stream binding properties to adjust
     * @return the configured {@link BindingsConfig}
     */
    @Bean
    public BindingsConfig bindingsConfig(BindingServiceProperties bindingServiceProperties) {
        BindingsConfig config = new BindingsConfig();
        config.setBindingServiceProperties(bindingServiceProperties);
        config.resetBindingProperties();
        return config;
    }
}
事件接口
/**
 * Event stream bound to the {@code test-topic} destination.
 *
 * <p>{@code @StreamBindings} is the custom annotation from this article. Its value is the
 * destination (topic or exchange) that is actually created in the middleware.
 */
@StreamBindings("test-topic")
public interface TestEventStream {

    /** Name of the input channel bean; it only exists inside Spring, not in the middleware. */
    String INPUT = "test-topic-input";

    /** Name of the output channel bean; it only exists inside Spring, not in the middleware. */
    String OUTPUT = "test-topic-output";

    /**
     * @return the proxied channel messages are consumed from
     */
    @Input(INPUT)
    SubscribableChannel input();

    /**
     * @return the proxied channel messages are sent to
     */
    @Output(OUTPUT)
    MessageChannel output();
}
创建消费者监听
/**
 * Consumer that listens on the input channel declared by {@link TestEventStream}.
 */
@EnableBinding({TestEventStream.class})
public class TestListener {

    /**
     * Prints every message received on {@link TestEventStream#INPUT}.
     *
     * @param msg the message payload
     */
    @StreamListener(TestEventStream.INPUT)
    public void consumer(@Payload String msg) {
        System.out.println("收到消息了");
        System.out.println(msg);
    }
}
生产者发出消息
/**
 * Demo endpoint that publishes a fixed message through {@link TestEventStream}.
 */
@RestController
@RequestMapping("/app/test")
public class TestController {

    @Autowired
    private TestEventStream eventStream;

    /**
     * Sends the test message to the output channel.
     *
     * @return the literal {@code "success"}
     */
    @GetMapping
    public String test() {
        publish("test msg");
        return "success";
    }

    /** Wraps the payload in a message and sends it on the output channel. */
    private void publish(String payload) {
        eventStream.output().send(MessageBuilder.withPayload(payload).build());
    }
}
实现代码
annotation
StreamBindings
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks an interface whose input and output bindings should be bound to the
 * destination given by {@link #value()}.
 *
 * @author zhangbowen.mail@gmail.com
 * @since 2021-02-18 21:06
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StreamBindings {

    /**
     * @return the destination, i.e. the topic or exchange
     */
    String value();
}
StreamBindingsScan
import org.springframework.context.annotation.Import;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Enables scanning for interfaces annotated with {@link StreamBindings} by importing
 * {@link BindingsRegistrar}.
 *
 * @author zhangbowen.mail@gmail.com
 * @since 2021-02-18 21:06
 */
@Import(BindingsRegistrar.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface StreamBindingsScan {

    /**
     * @return the packages to scan; empty by default
     */
    String[] basePackages() default {};
}
BindingsRegistrar
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.context.annotation.ScannedGenericBeanDefinition;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
 * Scans for interfaces annotated with {@link StreamBindings} and, for every
 * {@code @Input}/{@code @Output} method found on them, records a {@link BindingProperties}
 * whose destination is the annotation value. The records are stored in
 * {@link BindingsConfig#cacheMap}, keyed by binding target name.
 *
 * @author zhangbowen.mail@gmail.com
 * @since 2021-02-18 21:09
 */
@Slf4j
public class BindingsRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar, ResourceLoaderAware {
    private BeanFactory beanFactory;
    private ResourceLoader resourceLoader;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
    }

    @Override
    public void setResourceLoader(ResourceLoader resourceLoader) {
        this.resourceLoader = resourceLoader;
    }

    /**
     * Scans the auto-configuration packages plus the packages listed in
     * {@link StreamBindingsScan#basePackages()} and caches the bindings that are found.
     *
     * @param importingClassMetadata metadata of the class carrying {@link StreamBindingsScan}
     * @param registry               registry handed to the scanner
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(
                importingClassMetadata.getAnnotationAttributes(StreamBindingsScan.class.getCanonicalName()));
        if (annotationAttributes == null) {
            log.warn("No Any StreamBindingsScan...");
            return;
        }
        ClassPathStreamBindingScanner scanner = new ClassPathStreamBindingScanner(registry);
        scanner.setResourceLoader(this.resourceLoader);
        scanner.addIncludeFilter(new AnnotationTypeFilter(StreamBindings.class));

        List<String> basePackages = resolveBasePackages(annotationAttributes);
        Set<BeanDefinitionHolder> beanDefinitions = scanner.doScan(StringUtils.toStringArray(basePackages));
        if (beanDefinitions.isEmpty()) {
            log.warn("No StreamBindings was found.");
            return;
        }
        processBeanDefinitions(beanDefinitions);
    }

    /**
     * Builds the list of packages to scan.
     *
     * @param annotationAttributes attributes of the {@link StreamBindingsScan} annotation
     * @return a new list holding the auto-configuration packages followed by the non-blank
     *         {@code basePackages} entries
     */
    private List<String> resolveBasePackages(AnnotationAttributes annotationAttributes) {
        // Copy first: the list returned by AutoConfigurationPackages belongs to Spring Boot
        // and is shared with other consumers, so it must not be modified here.
        List<String> basePackages = new ArrayList<>(AutoConfigurationPackages.get(this.beanFactory));
        for (String pkg : annotationAttributes.getStringArray("basePackages")) {
            if (StringUtils.hasText(pkg)) {
                basePackages.add(pkg);
            }
        }
        return basePackages;
    }

    /**
     * Caches a binding for every {@code @Input}/{@code @Output} method of each scanned
     * {@link StreamBindings} interface.
     *
     * @param beanDefinitions the scanned definitions
     */
    private void processBeanDefinitions(Set<BeanDefinitionHolder> beanDefinitions) {
        for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitions) {
            ScannedGenericBeanDefinition definition = (ScannedGenericBeanDefinition) beanDefinitionHolder.getBeanDefinition();
            AnnotationMetadata annotationMetadata = definition.getMetadata();
            String destination = (String) Objects.requireNonNull(
                    annotationMetadata.getAnnotationAttributes(StreamBindings.class.getName())).get("value");
            Class<?> bindingInterface = ClassUtils.resolveClassName(annotationMetadata.getClassName(), null);
            ReflectionUtils.doWithMethods(bindingInterface, method -> {
                Input input = AnnotationUtils.findAnnotation(method, Input.class);
                if (input != null) {
                    cacheBinding(BindingBeanDefinitionRegistryUtils.getBindingTargetName(input, method), destination);
                }
                Output output = AnnotationUtils.findAnnotation(method, Output.class);
                if (output != null) {
                    cacheBinding(BindingBeanDefinitionRegistryUtils.getBindingTargetName(output, method), destination);
                }
            });
        }
    }

    /** Stores a binding with the given destination in the shared cache under the given name. */
    private static void cacheBinding(String name, String destination) {
        BindingProperties bindingProperties = new BindingProperties();
        bindingProperties.setDestination(destination);
        BindingsConfig.cacheMap.put(name, bindingProperties);
    }
}
ClassPathStreamBindingScanner
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import java.util.LinkedHashSet;
import java.util.Set;
/**
 * Class path scanner that accepts independent interfaces as candidates and returns them
 * as {@link BeanDefinitionHolder}s named after their class, without registering them.
 *
 * @author zhangbowen.mail@gmail.com
 * @since 2019/1/15
 */
@Slf4j
public class ClassPathStreamBindingScanner extends ClassPathBeanDefinitionScanner {

    /**
     * @param registry the registry passed on to the parent scanner
     */
    public ClassPathStreamBindingScanner(BeanDefinitionRegistry registry) {
        // false: do not use the default filters; callers add their own include filter.
        super(registry, false);
    }

    /**
     * Accepts only definitions that are interfaces and independent.
     */
    @Override
    protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
        if (!beanDefinition.getMetadata().isInterface()) {
            return false;
        }
        return beanDefinition.getMetadata().isIndependent();
    }

    /**
     * Collects the candidates of every base package, in encounter order.
     *
     * @param basePackages the packages to scan
     * @return holders for all candidates, each named by its bean class name
     */
    @Override
    public Set<BeanDefinitionHolder> doScan(String... basePackages) {
        Set<BeanDefinitionHolder> holders = new LinkedHashSet<>();
        for (String pkg : basePackages) {
            findCandidateComponents(pkg).forEach(
                    candidate -> holders.add(new BeanDefinitionHolder(candidate, candidate.getBeanClassName())));
        }
        return holders;
    }
}
BindingsConfig
import lombok.Data;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
 * Applies the destinations collected in {@link #cacheMap} to the bindings held by
 * {@link BindingServiceProperties}.
 *
 * @author zhangbowen.mail@gmail.com
 * @since 2021-02-18 22:20
 */
@Data
public class BindingsConfig {
    private BindingServiceProperties bindingServiceProperties;

    /**
     * Pending bindings keyed by binding name. Entries are consumed and removed by
     * {@link #resetBindingProperties()}.
     */
    public static Map<String, BindingProperties> cacheMap = new HashMap<>();

    /**
     * Overrides destination and group of every existing binding that also has an entry in
     * {@link #cacheMap}, then empties the cache.
     *
     * <p>The group becomes {@code <defaultGroup>-<destination>}, or just the destination
     * when the group reported for the name {@code "default"} is empty.
     *
     * @throws IllegalStateException if {@code bindingServiceProperties} has not been set
     */
    public void resetBindingProperties() {
        if (bindingServiceProperties == null) {
            throw new IllegalStateException("bindingServiceProperties must be set before resetBindingProperties()");
        }
        Map<String, BindingProperties> existBindings = bindingServiceProperties.getBindings();
        String defaultGroup = bindingServiceProperties.getGroup("default");
        for (Map.Entry<String, BindingProperties> cached : cacheMap.entrySet()) {
            String name = cached.getKey();
            // Only bindings already present in the binding service properties are touched.
            if (!existBindings.containsKey(name)) {
                continue;
            }
            String destination = cached.getValue().getDestination();
            // The stored instance is updated in place, so it does not need to be put back.
            BindingProperties existValue = existBindings.get(name);
            existValue.setDestination(destination);
            existValue.setGroup(StringUtils.hasLength(defaultGroup) ? defaultGroup + "-" + destination : destination);
        }
        // Empty the cache but keep the map instance: a later scan or a second call
        // would otherwise hit a null map.
        cacheMap.clear();
    }
}