diff --git a/.classpath b/.classpath index 6d7587a..a97cb0e 100644 --- a/.classpath +++ b/.classpath @@ -9,12 +9,14 @@ + + @@ -27,5 +29,29 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.project b/.project index f84930e..4e30ca7 100644 --- a/.project +++ b/.project @@ -20,4 +20,15 @@ org.eclipse.jdt.core.javanature org.eclipse.m2e.core.maven2Nature + + + 1751124793989 + + 30 + + org.eclipse.core.resources.regexFilterMatcher + node_modules|\.git|__CREATED_BY_JAVA_LANGUAGE_SERVER__ + + + diff --git a/pom.xml b/pom.xml index 6ce8363..6a298b0 100644 --- a/pom.xml +++ b/pom.xml @@ -20,9 +20,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - org.apache.rocketmq + com.zteits.rocketmq spring-boot-starter-rocketmq - 1.0.8-SNAPSHOT + 1.1.1 Spring Boot Rocket Starter Starter for messaging using Apache RocketMQ @@ -107,12 +107,12 @@ nexus_releases core Release Repository - http://maven.renniting.cn/repository/maven-releases/ + https://maven2.renniting.cn/repository/maven-releases/ nexus_snapshots core Snapshots Repository - http://maven.renniting.cn/repository/maven-snapshots/ + https://maven2.renniting.cn/repository/maven-snapshots/ @@ -169,4 +169,4 @@ - \ No newline at end of file + diff --git a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java b/src/main/java/zteits/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java index 2d65d3d..7b3052f 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java +++ b/src/main/java/zteits/rocketmq/spring/starter/AliyunRocketMQAutoConfiguration.java @@ -15,19 +15,19 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter; - -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; +package zteits.rocketmq.spring.starter; + +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.METHOD_DESTROY; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUMER_GROUP; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_MODE; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_CONSUME_THREAD_MAX; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_MESSAGE_MODEL; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_OBJECT_MAPPER; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_LISTENER; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_ROCKETMQ_TEMPLATE; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_EXPRESS; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.PROP_SELECTOR_TYPE; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.*; import java.util.Map; import java.util.Objects; @@ -37,12 +37,11 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Resource; -import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; -import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener; -import org.apache.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; -import org.apache.rocketmq.spring.starter.core.RocketMQListener; -import org.apache.rocketmq.spring.starter.core.RocketMQTemplate; -import org.json.JSONObject; +import com.aliyun.openservices.shade.com.alibaba.fastjson.JSONObject; +import zteits.rocketmq.spring.starter.annotation.RocketMQMessageListener; +import zteits.rocketmq.spring.starter.core.AliyunRocketMQListenerContainer; +import zteits.rocketmq.spring.starter.core.RocketMQListener; +import zteits.rocketmq.spring.starter.core.RocketMQTemplate; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; @@ -82,6 +81,7 @@ public class AliyunRocketMQAutoConfiguration { @ConditionalOnMissingBean(RocketMQProperties.Producer.class) @ConditionalOnProperty(prefix = "spring.rocketmq", value = {"environmentPrefix", "producer.group"}) public Producer mqProducer(RocketMQProperties rocketMQProperties) { + log.info("注册生产者mqProducer:"+ JSONObject.toJSON(rocketMQProperties)); RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String groupName = producerConfig.getGroup(); @@ -90,11 +90,12 @@ public class AliyunRocketMQAutoConfiguration { Assert.hasText(accessKey, "[spring.rocketmq.accessKey] must not be null"); String secretKey = rocketMQProperties.getSecretKey(); Assert.hasText(secretKey, "[spring.rocketmq.secretKey] must not be null"); - String onsAddr = rocketMQProperties.getOnsAddr(); - Assert.hasText(secretKey, "[spring.rocketmq.onsAddr] must not be null"); + // String onsAddr = rocketMQProperties.getOnsAddr(); + String namesrvAddr = rocketMQProperties.getNameSrvAddr(); + Assert.hasText(namesrvAddr, "[spring.rocketmq.nameSrvAddr] must not be null"); String environmentPrefix = rocketMQProperties.getEnvironmentPrefix(); Assert.hasText(secretKey, "[spring.rocketmq.environmentPrefix] must not be null"); - + Properties producerProperties = new Properties(); //生成者ProducerId添加前缀:PID_+环境标识_+groupName String pid = "PID_"+environmentPrefix+"_"+groupName; @@ -102,11 +103,9 @@ public class AliyunRocketMQAutoConfiguration { producerProperties.setProperty(PropertyKeyConst.ProducerId, pid); producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); - producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, onsAddr); - log.info("注册生产者producerProperties:"+ JSON.toJSONString(producerProperties)); + producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); //producerProperties.setProperty(PropertyKeyConst.ONSAddr, onsAddr); Producer producer = ONSFactory.createProducer(producerProperties); - log.info("注册生产者完成:"+ JSON.toJSONString(producer)); return producer; } @@ -121,9 +120,9 @@ public class AliyunRocketMQAutoConfiguration { @ConditionalOnBean(Producer.class) @ConditionalOnMissingBean(name = "rocketMQTemplate") public RocketMQTemplate rocketMQTemplate(Producer mqProducer,RocketMQProperties rocketMQProperties, - @Autowired(required = false) - @Qualifier("rocketMQMessageObjectMapper") - ObjectMapper objectMapper) { + @Autowired(required = false) + @Qualifier("rocketMQMessageObjectMapper") + ObjectMapper objectMapper) { RocketMQTemplate rocketMQTemplate = new RocketMQTemplate(); rocketMQTemplate.setAliyunProducer(mqProducer); rocketMQTemplate.setEnvironmentPrefix(rocketMQProperties.getEnvironmentPrefix()); @@ -149,16 +148,16 @@ public class AliyunRocketMQAutoConfiguration { private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; - + @Autowired private RocketMQTemplate rocketMQTemplate; - + public ListenerContainerConfiguration() { } @Autowired(required = false) public ListenerContainerConfiguration( - @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { + @Qualifier("rocketMQMessageObjectMapper") ObjectMapper objectMapper) { this.objectMapper = objectMapper; } @@ -177,10 +176,8 @@ public class AliyunRocketMQAutoConfiguration { } private void registerContainer(String beanName, Object bean) { - String uuid = UUID.randomUUID().toString(); - log.info(uuid+"开始注册消费者,beanName:"+beanName); - log.info(uuid+"开始注册消费者,rocketMQProperties:"+JSON.toJSONString(rocketMQProperties)); - + String uuid = UUID.randomUUID().toString(); + log.info(uuid+"开始注册消费者,beanName:"+beanName); Class clazz = AopUtils.getTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { @@ -189,8 +186,8 @@ public class AliyunRocketMQAutoConfiguration { RocketMQListener rocketMQListener = (RocketMQListener) bean; RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); BeanDefinitionBuilder beanBuilder = BeanDefinitionBuilder.rootBeanDefinition(AliyunRocketMQListenerContainer.class); - // beanBuilder.addPropertyValue(PropertyKeyConst.NAMESRV_ADDR, rocketMQProperties.getOnsAddr()); - beanBuilder.addPropertyValue(PROP_NAMESRV_ADDR, rocketMQProperties.getOnsAddr()); + beanBuilder.addPropertyValue(PROP_NAMESRV_Addr, rocketMQProperties.getNameSrvAddr()); + // beanBuilder.addPropertyValue(PROP_ONS_Addr, rocketMQProperties.getOnsAddr()); String topic = rocketMQProperties.getEnvironmentPrefix()+"_"+environment.resolvePlaceholders(annotation.topic()); log.info(uuid+"订阅的主题topic:"+topic); beanBuilder.addPropertyValue(PROP_TOPIC, topic); diff --git a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java b/src/main/java/zteits/rocketmq/spring/starter/RocketMQProperties.java index c422df1..94fa432 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/RocketMQProperties.java +++ b/src/main/java/zteits/rocketmq/spring/starter/RocketMQProperties.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter; +package zteits.rocketmq.spring.starter; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -24,16 +24,18 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix = "spring.rocketmq") @Data public class RocketMQProperties { - /** - * 环境前缀 - */ - private String environmentPrefix; - /** + /** + * 环境前缀 + */ + private String environmentPrefix; + /** * 消息队列服务接入点 */ - private String onsAddr; + private String onsAddr; + + private String nameSrvAddr; - /** + /** * AccessKey, 用于标识、校验用户身份 */ private String accessKey; @@ -41,7 +43,7 @@ public class RocketMQProperties { * SecretKey, 用于标识、校验用户身份 */ private String secretKey; - + private Producer producer; @Data public static class Producer { @@ -82,5 +84,32 @@ public class RocketMQProperties { * Maximum allowed message size in bytes. */ private int maxMessageSize = 1024 * 1024 * 4; // 4M + + /** + * 消费失败消息主题 + */ + private String consumeFailedTopic = "ZTEITS_RNT_CLOUD"; + + /** + * 消费失败消息标签 + */ + private String consumeFailedTag = "ConsumeMsgFailed"; + + // 对应的getter和setter方法 + public String getConsumeFailedTopic() { + return consumeFailedTopic; + } + + public void setConsumeFailedTopic(String consumeFailedTopic) { + this.consumeFailedTopic = consumeFailedTopic; + } + + public String getConsumeFailedTag() { + return consumeFailedTag; + } + + public void setConsumeFailedTag(String consumeFailedTag) { + this.consumeFailedTag = consumeFailedTag; + } } } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java b/src/main/java/zteits/rocketmq/spring/starter/annotation/RocketMQMessageListener.java index 9e768a8..79b4ea9 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/annotation/RocketMQMessageListener.java +++ b/src/main/java/zteits/rocketmq/spring/starter/annotation/RocketMQMessageListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.annotation; +package zteits.rocketmq.spring.starter.annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; @@ -23,8 +23,8 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.apache.rocketmq.spring.starter.enums.ConsumeMode; -import org.apache.rocketmq.spring.starter.enums.SelectorType; +import zteits.rocketmq.spring.starter.enums.ConsumeMode; +import zteits.rocketmq.spring.starter.enums.SelectorType; import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java b/src/main/java/zteits/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java index b379593..39b761d 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/AliyunRocketMQListenerContainer.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TAG; -import static org.apache.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TOPIC; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TAG; +import static zteits.rocketmq.spring.starter.core.DefaultRocketMQListenerContainerConstants.CONSUMEFAILED_TOPIC; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; @@ -28,12 +28,12 @@ import java.util.List; import java.util.Objects; import java.util.Properties; -import org.apache.rocketmq.spring.starter.enums.ConsumeMode; -import org.apache.rocketmq.spring.starter.enums.SelectorType; -import org.apache.rocketmq.spring.starter.exception.ConvertMsgException; -import org.apache.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO; -import org.apache.rocketmq.spring.starter.utils.ExceptionUtil; -import org.apache.rocketmq.spring.starter.utils.IPUtil; +import zteits.rocketmq.spring.starter.enums.ConsumeMode; +import zteits.rocketmq.spring.starter.enums.SelectorType; +import zteits.rocketmq.spring.starter.exception.ConvertMsgException; +import zteits.rocketmq.spring.starter.msgvo.ConsumeFailedMsgVO; +import zteits.rocketmq.spring.starter.utils.ExceptionUtil; +import zteits.rocketmq.spring.starter.utils.IPUtil; import org.springframework.beans.factory.InitializingBean; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -51,7 +51,6 @@ import com.aliyun.openservices.ons.api.order.ConsumeOrderContext; import com.aliyun.openservices.ons.api.order.MessageOrderListener; import com.aliyun.openservices.ons.api.order.OrderAction; import com.aliyun.openservices.ons.api.order.OrderConsumer; -import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.MessageSelector; import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException; import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel; import com.fasterxml.jackson.databind.ObjectMapper; @@ -73,7 +72,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket */ @Setter private String secretKey; - + @Setter @Getter private String consumerGroup; @@ -84,9 +83,13 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket @Getter private String onsAddr; +// @Setter +// @Getter +// private String nameServerAddr; + @Setter @Getter - private String nameServerAddr; + private String nameSrvAddr; @Setter @Getter @@ -132,7 +135,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket private OrderConsumer orderConsumer; /**批量消息*/ private BatchConsumer batchConsumer; - + private Class messageType; /** * 环境前缀 @@ -142,7 +145,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket @Setter private RocketMQTemplate rocketMQTemplate; - + public void setupMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } @@ -214,7 +217,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket this.sendConsumeMsgFailed(message,e,consumeBeginTime); return Action.ReconsumeLater; } - + return Action.CommitMessage; } /** @@ -225,8 +228,8 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket */ private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) { log.info("消费消息失败,开始发送消费失败MQ"); - String topic = CONSUMEFAILED_TOPIC; - String tag = CONSUMEFAILED_TAG; + String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC; + String tag = CONSUMEFAILED_TAG; try{ Date consumeEndTime = new Date(); ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO(); @@ -250,9 +253,8 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket rocketMQTemplate.sendOneWay(topic, tag, consumeFailedMsgVO); log.info("发送消息消费失败MQ成功"); }catch(Exception e1){ - log.info("发送消息消费失败MQ异常",e); + log.error("发送消息消费失败MQ异常", e1); } - } } @@ -273,7 +275,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket return OrderAction.Success; } } - + public class DefaultMessageListenerBatchs implements BatchMessageListener{ @Override @@ -304,7 +306,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket } return Action.CommitMessage; } - + /** * 发送消息消费失败消息 * @param message @@ -314,7 +316,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket private void sendConsumeMsgFailed(Message message, Exception e,Date consumeBeginTime) { log.info("消费消息失败,开始发送消费失败MQ"); String topic = environmentPrefix+"_"+CONSUMEFAILED_TOPIC; - String tag = CONSUMEFAILED_TAG; + String tag = CONSUMEFAILED_TAG; try{ Date consumeEndTime = new Date(); ConsumeFailedMsgVO consumeFailedMsgVO = new ConsumeFailedMsgVO(); @@ -338,9 +340,8 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket rocketMQTemplate.sendOneWay(topic, tag, consumeFailedMsgVO); log.info("发送消息消费失败MQ成功"); }catch(Exception e1){ - log.info("发送消息消费失败MQ异常",e); + log.error("发送消息消费失败MQ异常", e1); } - } } @Override @@ -396,14 +397,14 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); - Assert.notNull(nameServerAddr, "Property 'nameServer' is required"); + Assert.notNull(nameSrvAddr, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.ConsumerId, consumerGroup); consumerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); consumerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); - consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameServerAddr); + consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr); consumerProperties.setProperty(PropertyKeyConst.ConsumeThreadNums, consumeThreadMax+""); consumerProperties.setProperty(PropertyKeyConst.MessageModel, messageModel.getModeCN()); //允许用户自己设置该consumer的一些配置 @@ -434,7 +435,7 @@ public class AliyunRocketMQListenerContainer implements InitializingBean, Rocket default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } - + } } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java b/src/main/java/zteits/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java index e0b1860..ebb5a23 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/AliyunRocketMQPushConsumerLifecycleListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; import java.util.Properties; diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java b/src/main/java/zteits/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java index 9e9889c..9e9889c 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/DefaultRocketMQListenerContainer.java diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java b/src/main/java/zteits/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java index 777b951..f535eff 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/DefaultRocketMQListenerContainerConstants.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; /** * Constants Created by aqlu on 2017/11/16. @@ -36,7 +36,8 @@ public final class DefaultRocketMQListenerContainerConstants { public static final String PROP_ONS_Addr = "onsAddr"; public static final String PROP_ACCESS_KEY = "accessKey"; public static final String PROP_SECRET_KEY = "secretKey"; - public static final String PROP_NAMESRV_ADDR = "nameServerAddr"; + public static final String PROP_NAMESRV_Addr = "nameSrvAddr"; + /** * 环境前缀 */ diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java index 37ebedb..c2a4f54 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQConsumerLifecycleListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; public interface RocketMQConsumerLifecycleListener { void prepareStart(final T consumer); diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQListener.java index 64daa5f..6e20c8b 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListener.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; public interface RocketMQListener { /** diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQListenerContainer.java index 7667eed..a986167 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQListenerContainer.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQListenerContainer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; import org.springframework.beans.factory.DisposableBean; diff --git a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQTemplate.java index 5d88f62..6004d64 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/core/RocketMQTemplate.java +++ b/src/main/java/zteits/rocketmq/spring/starter/core/RocketMQTemplate.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.core; +package zteits.rocketmq.spring.starter.core; import java.nio.charset.Charset; import java.util.Map; @@ -50,7 +50,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { @Getter @Setter private String charset = "UTF-8"; - + /** * 环境前缀 */ @@ -63,7 +63,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意 * @param key 业务主键 * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述. - * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. *

*
    @@ -81,7 +81,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { try { long now = System.currentTimeMillis(); - + Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload)); if(userProperties!=null && !userProperties.isEmpty()){ for (Entry userProp : userProperties.entrySet()) { @@ -133,7 +133,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意 * @param key 业务主键 * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述. - * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. *

    *
      @@ -151,7 +151,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { } try { long now = System.currentTimeMillis(); - + Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload)); if(userProperties!=null && !userProperties.isEmpty()){ for (Entry userProp : userProperties.entrySet()) { @@ -202,7 +202,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { * @param tag 消息标签, 请使用合法标识符, 尽量简短且见名知意 * @param key 业务主键 * @param payload 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述. - * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 + * @param userProperties 添加用户自定义属性键值对; 该键值对在消费消费时可被获取.也可用于做SQL属性过滤 * @param startDeliverTime 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. *

      *
        @@ -219,7 +219,7 @@ public class RocketMQTemplate implements InitializingBean, DisposableBean { } try { long now = System.currentTimeMillis(); - + Message rocketMsg = new Message(environmentPrefix+"_"+topic, tag, keys, convertToRocketMsg(payload)); if(userProperties!=null && !userProperties.isEmpty()){ for (Entry userProp : userProperties.entrySet()) { diff --git a/src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java b/src/main/java/zteits/rocketmq/spring/starter/enums/ConsumeMode.java index 04a0349..3795b7d 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/enums/ConsumeMode.java +++ b/src/main/java/zteits/rocketmq/spring/starter/enums/ConsumeMode.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.enums; +package zteits.rocketmq.spring.starter.enums; public enum ConsumeMode { /** @@ -27,7 +27,7 @@ public enum ConsumeMode { * 顺序接收消息,一个队列,一个线程 */ ORDERLY, - + /** * 批量接收发送的消息,允许自定义范围为[1, 32], 实际消费数量可能小于该值 */ diff --git a/src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java b/src/main/java/zteits/rocketmq/spring/starter/enums/SelectorType.java index 1923713..0b5b4f4 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/enums/SelectorType.java +++ b/src/main/java/zteits/rocketmq/spring/starter/enums/SelectorType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.spring.starter.enums; +package zteits.rocketmq.spring.starter.enums; import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.filter.ExpressionType; diff --git a/src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java b/src/main/java/zteits/rocketmq/spring/starter/exception/ConvertMsgException.java index 503ba1a..4e75358 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/exception/ConvertMsgException.java +++ b/src/main/java/zteits/rocketmq/spring/starter/exception/ConvertMsgException.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.spring.starter.exception; +package zteits.rocketmq.spring.starter.exception; public class ConvertMsgException extends RuntimeException{ @@ -23,5 +23,5 @@ public class ConvertMsgException extends RuntimeException{ public ConvertMsgException(Throwable cause) { super(cause); } - + } diff --git a/src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java b/src/main/java/zteits/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java index 8224133..ec22372 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java +++ b/src/main/java/zteits/rocketmq/spring/starter/msgvo/ConsumeFailedMsgVO.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.spring.starter.msgvo; +package zteits.rocketmq.spring.starter.msgvo; import java.io.Serializable; import java.util.Date; @@ -33,7 +33,7 @@ public class ConsumeFailedMsgVO implements Serializable{ /**重复消费次数*/ private Integer reconsumeTimes; - + /**消费失败错误信息*/ private String cunsumerErrMsg; diff --git a/src/main/java/org/apache/rocketmq/spring/starter/utils/ExceptionUtil.java b/src/main/java/zteits/rocketmq/spring/starter/utils/ExceptionUtil.java index e97da1c..70818ea 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/utils/ExceptionUtil.java +++ b/src/main/java/zteits/rocketmq/spring/starter/utils/ExceptionUtil.java @@ -1,21 +1,21 @@ -package org.apache.rocketmq.spring.starter.utils; - -import java.io.PrintWriter; -import java.io.StringWriter; - -public class ExceptionUtil { - - public static String getTrace(Throwable t) { - StringBuffer buffer = new StringBuffer(); - if(t==null){ - return ""; - } - StringWriter stringWriter = new StringWriter(); - PrintWriter writer = new PrintWriter(stringWriter); - t.printStackTrace(writer); - //设置堆栈信息 - buffer.append("堆栈信息为:" + stringWriter.getBuffer().toString()); - return buffer.toString(); - } - -} +package zteits.rocketmq.spring.starter.utils; + +import java.io.PrintWriter; +import java.io.StringWriter; + +public class ExceptionUtil { + + public static String getTrace(Throwable t) { + StringBuffer buffer = new StringBuffer(); + if(t==null){ + return ""; + } + StringWriter stringWriter = new StringWriter(); + PrintWriter writer = new PrintWriter(stringWriter); + t.printStackTrace(writer); + //设置堆栈信息 + buffer.append("堆栈信息为:" + stringWriter.getBuffer().toString()); + return buffer.toString(); + } + +} diff --git a/src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java b/src/main/java/zteits/rocketmq/spring/starter/utils/IPUtil.java index b40739c..f9b4e80 100644 --- a/src/main/java/org/apache/rocketmq/spring/starter/utils/IPUtil.java +++ b/src/main/java/zteits/rocketmq/spring/starter/utils/IPUtil.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.spring.starter.utils; +package zteits.rocketmq.spring.starter.utils; import java.net.InetAddress; diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories index 6ab3023..7247f53 100644 --- a/src/main/resources/META-INF/spring.factories +++ b/src/main/resources/META-INF/spring.factories @@ -1,2 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.apache.rocketmq.spring.starter.AliyunRocketMQAutoConfiguration \ No newline at end of file +zteits.rocketmq.spring.starter.AliyunRocketMQAutoConfiguration diff --git a/src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java b/src/test/java/zteits/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java index e4e2c41..e4e2c41 100644 --- a/src/test/java/org/apache/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java +++ b/src/test/java/zteits/rocketmq/spring/starter/RocketMQAutoConfigurationTests.java