Merge pull request #54 from TraceNature/dev
Dev
pingxingshikong authored Jul 16, 2021
2 parents e64e91b + 4794c30 commit 15820bf
Showing 66 changed files with 4,089 additions and 244 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -31,7 +31,7 @@ RedisSyncer is a Redis multi-task synchronization toolkit, applied to single Redis instances and
* Filter or pass through specified keys/commands
* Cross-version RDB support, including from higher to lower versions
* TTL calibration

* Command subscription (Kafka supported on the target side)
## Quick start

See the [Quick Start Guide](docs/quickstart.md), which covers building, deployment, and basic usage.
@@ -60,7 +60,7 @@ RedisSyncer is a Redis multi-task synchronization toolkit, applied to single Redis instances and

| **Environment** | **Version** |
| :----:| :----: |
| \[Redis\] | \[2.8-6.0\] |
| \[Redis\] | \[2.8-6.2\] |


## Supported commands (write commands)
11 changes: 11 additions & 0 deletions docs/using_documents.md
@@ -78,6 +78,17 @@ RedisSyncer, a tool that fetches data from the source Redis node by emulating a slave over the replication protocol
* repl-ping-slave-period must be smaller than readTimeout (redissyncer default: 60000 ms)
* The source node does not have enough memory to perform bgsave
* The resume offset has already been flushed out of the replication backlog
4. The ERR unknown command 'REPLCONF' / 'SYNC' problem
* Check whether the source Redis supports the SYNC and PSYNC commands (see the probe sketch after this list)
* If the target reports ERR unknown command "xxx" during the incremental phase, open an issue or add the corresponding command parser to syncer.replica.register.DefaultCommandRegister yourself
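The first point can be checked directly against the source. A minimal probe sketch, assuming a plain upstream Jedis 3.x client on the classpath (not the bundled syncer-jedis fork) and a source Redis new enough to expose COMMAND INFO; the address is a placeholder:

```java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class ReplCommandProbe {
    public static void main(String[] args) {
        // Placeholder source address; point this at the real source node.
        try (Jedis source = new Jedis("127.0.0.1", 6379)) {
            // Send COMMAND as a raw ProtocolCommand rather than a wrapped Jedis method.
            ProtocolCommand commandInfo = () -> SafeEncoder.encode("COMMAND");
            // COMMAND INFO returns one entry per name; a nil entry means the command
            // is not exposed (common on proxies and some managed Redis services).
            Object reply = source.sendCommand(commandInfo,
                    SafeEncoder.encode("INFO"),
                    SafeEncoder.encode("SYNC"),
                    SafeEncoder.encode("PSYNC"),
                    SafeEncoder.encode("REPLCONF"));
            System.out.println(reply);
        }
    }
}
```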

### Breakpoint resume mechanism
Versions 3.3 and above provide two breakpoint resume mechanisms, v1 and v2
* v1 is a bare-bones offset-based resume: the offset is persisted locally. If the process dies abruptly, the latest offset may not get persisted, so the incremental resume can start from a stale offset and pull some commands a second time. This mechanism writes no bookkeeping data into the target Redis.
* v2 is the enhanced resume mechanism: for every command submission, syncer wraps each batch of data in a transaction and writes a checkpoint key named syncer-hash-offset-checkpoint into the target inside that transaction, satisfying data consistency as far as possible (see the sketch below). Note that it inserts a hash named syncer-hash-offset-checkpoint into every target db that holds data, and for now only single-node targets are supported.
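
A minimal sketch of the v2 idea, for orientation only: it assumes plain Jedis against a single-node target, the batch is simplified to key/value pairs, and the hash field names are assumptions; only the checkpoint key name comes from the description above.

```java
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class CheckpointBatchSketch {

    private static final String CHECKPOINT_KEY = "syncer-hash-offset-checkpoint";

    /** Replays one batch and its checkpoint as a single MULTI/EXEC transaction. */
    static void commitBatch(Jedis target, List<String[]> batch, String replId, long offset) {
        Transaction tx = target.multi();
        for (String[] kv : batch) {
            // Stand-in for replaying the captured write commands of this batch.
            tx.set(kv[0], kv[1]);
        }
        // The checkpoint is written inside the same transaction, so either the whole
        // batch plus checkpoint lands on the target or none of it does; the stored
        // offset therefore never runs ahead of the data actually applied.
        tx.hset(CHECKPOINT_KEY, "replId", replId);
        tx.hset(CHECKPOINT_KEY, "offset", String.valueOf(offset));
        tx.exec();
    }
}
```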

#### How to enable v2
* v1 is the default; to use the v2 resume mechanism, start syncer with --server.breakpointContinuationType=v2

### Breakpoint resume mechanism
Versions 3.3 and above provide two breakpoint resume mechanisms, v1 and v2
6 changes: 5 additions & 1 deletion pom.xml
@@ -227,7 +227,11 @@
<version>4.1.42.Final</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
</dependencies>

<build>
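The kafka-clients dependency added above backs the command-subscription target mentioned in the README. A minimal sketch, assuming the Kafka 2.3 producer API; the broker address and topic name are placeholders (they correspond to the targetKafkaAddress and topicName task parameters introduced elsewhere in this commit), and this is not the project's actual producer code:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CommandTopicSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // placeholder targetKafkaAddress
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record per replicated write command, keyed by the command name.
            producer.send(new ProducerRecord<>("redis-commands", "SET", "SET key value")); // placeholder topicName
            producer.flush();
        }
    }
}
```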
@@ -56,10 +56,15 @@ public enum ResultCodeAndMessage {
TASK_MSG_TASK_TARGET_REDIS_TYPE_NULL("4029","目标Redis集群targetRedisType类型不能为空","源Redis集群类型不能为空"),

TASK_MSG_TASK_SOURCE_MASTER_REDIS_NAME_NULL("4030","源Redis集群sourceRedisMasterName类型不能为空","源Redis集群sourceRedisMasterName类型不能为空"),
TASK_MSG_TASK_TARGET_MASTER_REDIS_NAME_NULL("4030","目标Redis集群targetRedisMasterName类型不能为空","目标Redis集群targetRedisMasterName类型不能为空")
TASK_MSG_TASK_TARGET_MASTER_REDIS_NAME_NULL("4030","目标Redis集群targetRedisMasterName类型不能为空","目标Redis集群targetRedisMasterName类型不能为空"),
TASK_MSG_TASK_TARGET_TOPIC_NAME_NULL("4031","kafka命令订阅模式topicName不能为空","kafka命令订阅模式topicName不能为空"),
TASK_MSG_TASK_TARGET_KAFKA_ADDRESS_NULL("4032","kafka命令订阅模式targetKafkaAddress不能为空","kafka命令订阅模式targetKafkaAddress不能为空"),


;
TASK_MSG_TASK_TARGET_REDIS_ADDRESS_NULL("100","目标redis地址不能为空","目标redis地址不能为空")


;


private String code;
@@ -2295,4 +2295,13 @@ public Object execute(Jedis connection){
}
}.runBinary(sampleKey);
}

public Object sendCommand(final ProtocolCommand cmd, final byte[]... args){
return new JedisClusterCommand<Object>(connectionHandler, maxAttempts) {
@Override
public Object execute(Jedis connection){
return connection.sendCommand(cmd, args);
}
}.runBinary(cmd.getRaw());
}
}
5 changes: 5 additions & 0 deletions syncer-jedis/src/main/java/syncer/jedis/HostAndPort.java
@@ -22,6 +22,11 @@ public HostAndPort(String host, int port) {
this.port = port;
}


public void setHost(String host) {
this.host = host;
}

public String getHost() {
return host;
}
@@ -18,7 +18,7 @@ public enum RedisType {
CLUSTER(2,"cluster集群模式"),
FILE(3,"文件模式"),
SENTINEL(4,"哨兵模式"),

KAFKA(5,"kafka"),
NONE(-1,"失败");

private Integer code;
@@ -0,0 +1,5 @@
package syncer.replica.listener;

public interface CommandListener {

}
@@ -13,6 +13,9 @@ public interface ReplicationListener {

boolean removeEventListener(EventListener listener);




/*
* Raw byte
*/
@@ -87,7 +87,6 @@ public RedisSyncerRdbParser(RedisInputStream in, AbstractReplication replication
* @throws IOException when read timeout
*/
public long parse() throws IOException, IncrementException {

long offset = 0L;
this.replication.submitEvent(new PreRdbSyncEvent(), Tuples.of(0L, 0L));
this.replication.setStatus(TaskStatus.RDBRUNNING);
@@ -57,4 +57,6 @@ public interface ReplicationRegister {

ReplicConfig getConfig();


void closeClean();
}
@@ -253,5 +253,15 @@ public ReplicConfig getConfig() {
return config;
}

@Override
public void closeClean() {
if(Objects.nonNull(modules)){
modules.clear();
}
if(Objects.nonNull(commands)){
commands.clear();
}
}


}
@@ -246,6 +246,11 @@ public ReplicConfig getConfig() {
return replication.getConfig();
}

@Override
public void closeClean() {
replication.closeClean();
}

@Override
public void open() throws IOException {
replication.open();
@@ -146,6 +146,11 @@ public ReplicConfig getConfig() {
return replication.getConfig();
}

@Override
public void closeClean() {
replication.closeClean();
}

@Override
public void open() throws IOException {
this.sentinel.open();
@@ -154,6 +159,7 @@ public void open() throws IOException {

@Override
public void close() throws IOException {

this.sentinel.close();
}

@@ -166,6 +172,8 @@ public void broken(String reason) throws IOException {
@Override
public void onSwitch(Sentinel sentinel, HostAndPort next) {
if (prev == null || !prev.equals(next)) {
// next.setHost("114.67.76.82");
// System.out.println("TASKID["+getConfig().getTaskId()+"]Sentinel switch master to ["+next+"]");
log.info("TASKID[{}]Sentinel switch master to [{}]", getConfig().getTaskId(),next);
closeQuietly(replication);
if(failoverNum.getAndIncrement()>0){
@@ -49,6 +49,7 @@ public class SyncTypeUtils {
redisTypeMap.put(RedisType.CLUSTER.getCode(),RedisType.CLUSTER);
redisTypeMap.put(RedisType.FILE.getCode(),RedisType.FILE);
redisTypeMap.put(RedisType.SENTINEL.getCode(),RedisType.SENTINEL);
redisTypeMap.put(RedisType.KAFKA.getCode(),RedisType.KAFKA);
redisTypeMap.put(RedisType.NONE.getCode(),RedisType.NONE);

taskStatusTypeMap.put(TaskStatus.STARTING.getCode(),TaskStatus.STARTING);
@@ -131,7 +131,6 @@ public static String byteToString(byte[] bytes) {
public static String[]byteToString(List<byte[]> bytes) {
Objects.requireNonNull(bytes);
String[]res=new String[bytes.size()];

for (int i=0;i<bytes.size();i++){
try {
String strContent = new String(bytes.get(i), "utf-8");
@@ -110,4 +110,6 @@ public interface RedisClient {
void updateLastReplidAndOffset(String replid,long offset);

void commitCheckPoint();

void close();
}
@@ -12,6 +12,7 @@
public class ConnectErrorRetry {
public static final int MAX_TIMES=5;
private String taskId;
private boolean closeStatus=false;
//2n-1
public ConnectErrorRetry(String taskId) {
this.taskId = taskId;
@@ -22,6 +23,10 @@ void retry(JedisRetryRunner retryRunner){
JedisConnectionException ret=null;
while (times++<MAX_TIMES){
try {
if(closeStatus){
log.info("[TASKID {}],ConnectErrorRetry send close event",taskId);
break;
}
log.error("[TASKID {}],send target retry {} times",taskId,times);
retryRunner.run();
return;
@@ -40,4 +45,8 @@ void retry(JedisRetryRunner retryRunner){
throw ret;
}
}

public void close(){
closeStatus=true;
}
}
@@ -155,6 +155,14 @@ public JedisMultiExecPipeLineClient(String host, Integer port, String password,S
ThreadPoolUtils.exec(new JedisMultiExecPipeLineClient.PipelineSubmitThread(taskId));
}


protected void resizeClient(String host,Integer port,String password){
pipelined.close();
targetClient=createJedis(this.host,this.port,password);
pipelined = targetClient.pipelined();
retry=new ConnectErrorRetry(taskId);
}

@Override
public String get(Long dbNum, byte[] key) {
return null;
@@ -221,6 +229,15 @@ public void commitCheckPoint(){

}

@Override
public void close() {
if(Objects.nonNull(this.pipelined)){
this.pipelined.close();
}
retry.close();
this.targetClient.close();
}

@Override
public String set(Long dbNum, byte[] key, byte[] value) {
selectDb(dbNum);
@@ -746,6 +746,15 @@ public void commitCheckPoint() {

}

@Override
public void close() {
if (Objects.nonNull(this.pipelined)) {
pipelined.close();
}
targetClient.close();
}


@Override
public void select(Integer dbNum) {
commitLock.lock();
@@ -739,6 +739,15 @@ public void commitCheckPoint() {

}

@Override
public void close() {
if(Objects.nonNull(pipelined)){
pipelined.close();
}
targetClient.close();

}


@Override
public void select(Integer dbNum) {
@@ -27,6 +27,7 @@
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

@Slf4j
@@ -231,10 +232,17 @@ public String restoreReplace(Long dbNum, byte[] key, long ttl, byte[] serialized
@Override
public Object send(byte[] cmd, byte[]... args) {
updateCommitTime();
if(Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("FLUSHALL")){
if(Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("FLUSHALL")
||Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("MULTI")
||Strings.byteToString(cmd).toUpperCase().equalsIgnoreCase("EXEC")
){
return "OK";
}
return redisClient.sendCommand(args[0], ClusterProtocolCommand.builder().raw(cmd).build(),args);
if(Objects.isNull(args)||args.length<1){
return redisClient.sendCommand(ClusterProtocolCommand.builder().raw(cmd).build(),args);
}else {
return redisClient.sendCommand(args[0], ClusterProtocolCommand.builder().raw(cmd).build(),args);
}
}

/**
@@ -251,6 +259,11 @@ public void commitCheckPoint() {

}

@Override
public void close() {

}


@Override
public void select(Integer dbNum) {
@@ -221,7 +221,7 @@ protected void pulse() {
}
}

private void close() {
public void close() {
try {
log.error("[TASKID {}]任务异常关闭...",taskId);
SingleTaskDataManagerUtils.brokenTask(taskId);
@@ -260,8 +260,10 @@ public void onMessage(String channel, String response) {
}
}

void doSwitchListener(HostAndPort host) {

// client switch operation
void doSwitchListener(HostAndPort host) {
// client=
}

}