You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Consumer Strategy Have chosen lastest, This setting doesn't seem to work, Program restart consumes historical data,
How to solve this problem?
The complete code is as follows:
try {
Map<String, String> optionParams = new HashMap<>();
optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, nameSrvAddr);
SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(duration));
List<String> topics = new ArrayList<>();
if (StringUtils.hasText(topic)) {
for (String s : topic.split(";")) {
topics.add(s);
}
}
LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();
JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(sc, groupId,
topics, ConsumerStrategy.lastest(), false, false, false, locationStrategy, optionParams);
stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {
JavaRDD<GPSRDD> GPSRDDJavaRDD = messageExtJavaRDD.map(new Function<MessageExt, GPSRDD>() {
private static final long serialVersionUID = 1L;
@Override
public GPSRDD call(MessageExt messageExt) throws Exception {
GPSRDD gps = new GPSRDD();
String xxx = new String(messageExt.getBody());
System.out.println(xxx);
return gps;
}
});
}
});
sc.start();
} catch (Exception e) {
e.printStackTrace();
}
The text was updated successfully, but these errors were encountered:
kai23333
changed the title
[rocketmq-spark] [rocketmq-spark] ask a master for help, repeated consumption occurs when the program restarts
[rocketmq-spark] ask a master for help, repeated consumption occurs when the program restarts
Feb 15, 2023
Reference code : https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java
My code :
Consumer Strategy Have chosen lastest, This setting doesn't seem to work, Program restart consumes historical data,
How to solve this problem?
The complete code is as follows:
The text was updated successfully, but these errors were encountered: