Skip to content

Commit

Permalink
fixed:1.fix the issue of inconsistent restore data when importing str…
Browse files Browse the repository at this point in the history
…ing types as numbers from local rdb 2.optimize task submission model
  • Loading branch information
pingxingshikong committed Nov 10, 2023
1 parent f81b1fc commit 5442670
Show file tree
Hide file tree
Showing 16 changed files with 372 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package syncer.common.config;

import com.alibaba.fastjson.JSON;
import sun.net.util.IPAddressUtil;
import sun.net.util .IPAddressUtil;
import syncer.common.constant.BreakpointContinuationType;
import syncer.common.constant.StoreType;
import syncer.common.util.spring.SpringUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public static BatchedKeyStringValueStringEvent string(KeyValuePairEvent<byte[],
return kv;
}



public static BatchedKeyStringValueModuleEvent module(KeyValuePairEvent<byte[], ?> raw, Module value, int batch, boolean last) {
BatchedKeyStringValueModuleEvent kv = new BatchedKeyStringValueModuleEvent();
copy(raw, kv, batch, last);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public void onEvent(Replication replication, Event event) {
if (kv instanceof KeyStringValueStringEvent) {
KeyStringValueStringEvent ksvs = (KeyStringValueStringEvent) kv;
listener.onEvent(replication, KeyValuePairs.string(ksvs, ksvs.getValue(), batch, true));
// listener.onEvent(replication,ksvs);

} else if (kv instanceof KeyStringValueByteArrayIteratorEvent) {
if (type == RDB_TYPE_SET || type == RDB_TYPE_SET_INTSET) {
KeyStringValueByteArrayIteratorEvent skv = (KeyStringValueByteArrayIteratorEvent) kv;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ public void onEvent(Replication replication, Event event) {
// Before every it.next() MUST check precondition it.hasNext()
int batch = 0;
final int type = kv.getValueRdbType();

//String
if (kv instanceof KeyStringValueStringEvent) {
System.out.println("------");
KeyStringValueStringEvent ksvs = (KeyStringValueStringEvent) kv;
listener.onEvent(replication, KeyValuePairs.string(ksvs, ksvs.getValue(), batch, true));
} else if (kv instanceof KeyStringValueByteArrayIteratorEvent) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright 2016-2017 Leon Chen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package syncer.replica.parser.rdb;



import syncer.replica.util.Lzf;
import syncer.replica.util.bytes.ByteArray;

import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
import java.nio.ByteBuffer;

import static java.lang.Double.doubleToLongBits;
import static java.lang.Float.floatToIntBits;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static syncer.replica.constant.Constants.*;

/**
* @author Leon Chen
* @since 3.5.3
*/
public class BaseRdbEncoder {

/**
* @param time time
* @param out out
* @throws IOException IOException
* @since 3.5.3
*/
public void rdbSaveTime(int time, OutputStream out) throws IOException {
out.write(ByteBuffer.allocate(Integer.BYTES).order(LITTLE_ENDIAN).putInt(time).array());
}

/**
* @since 3.5.3
* @param timestamp timestamp
* @param out out
* @throws IOException IOException
*/
public void rdbSaveMillisecondTime(long timestamp, OutputStream out) throws IOException {
out.write(ByteBuffer.allocate(Long.BYTES).order(LITTLE_ENDIAN).putLong(timestamp).array());
}

/**
* @since 3.5.3
* @param len len
* @param out out
* @return length
* @throws IOException IOException
*/
public int rdbSaveLen(long len, OutputStream out) throws IOException {
byte[] ary = toUnsigned(len);
BigInteger value = new BigInteger(1, ary);
if (value.compareTo(BigInteger.valueOf(0XFFFFFFFFL)) > 0) {
/* Save a 64 bit len */
out.write(RDB_64BITLEN);
out.write(ByteBuffer.allocate(Long.BYTES).order(BIG_ENDIAN).put(ary).array());
return 9;
} else if (len < (1 << 6)) {
out.write((byte) ((len & 0xFF) | (RDB_6BITLEN << 6)));
return 1;
} else if (len < (1 << 14)) {
/* Save a 14 bit len */
out.write((byte) (((len >> 8) & 0xFF) | (RDB_14BITLEN << 6)));
out.write((byte) (len & 0xFF));
return 2;
} else if (len <= 0XFFFFFFFFL) {
/* Save a 32 bit len */
out.write(RDB_32BITLEN);
out.write(ByteBuffer.allocate(Integer.BYTES).order(BIG_ENDIAN).putInt((int) len).array());
return 5;
} else {
/* Save a 64 bit len */
out.write(RDB_64BITLEN);
out.write(ByteBuffer.allocate(Long.BYTES).order(BIG_ENDIAN).putLong(len).array());
return 9;
}
}

/**
* @since 3.5.3
* @param len len
* @return byte array
* @throws IOException IOException
*/
public byte[] rdbSaveLen(long len) throws IOException {
byte[] ary = toUnsigned(len);
BigInteger value = new BigInteger(1, ary);
if (value.compareTo(BigInteger.valueOf(0XFFFFFFFFL)) > 0) {
/* Save a 64 bit len */
return ByteBuffer.allocate(9).order(BIG_ENDIAN).put((byte) RDB_64BITLEN).put(ary).array();
} else if (len < (1 << 6)) {
return new byte[]{(byte) ((len & 0xFF) | (RDB_6BITLEN << 6))};
} else if (len < (1 << 14)) {
/* Save a 14 bit len */
return new byte[]{(byte) (((len >> 8) & 0xFF) | (RDB_14BITLEN << 6)), (byte) (len & 0xFF)};
} else if (len <= 0XFFFFFFFFL) {
/* Save a 32 bit len */
return ByteBuffer.allocate(5).order(BIG_ENDIAN).put((byte) RDB_32BITLEN).putInt((int) len).array();
} else {
/* Save a 64 bit len */
return ByteBuffer.allocate(9).order(BIG_ENDIAN).put((byte) RDB_64BITLEN).putLong(len).array();
}
}



/**
* @since 3.5.3
* @param value value
* @param out out
* @throws IOException IOException
*/
public void rdbSaveDoubleValue(double value, OutputStream out) throws IOException {
if (value == Double.NEGATIVE_INFINITY) {
out.write(255);
} else if (value == Double.POSITIVE_INFINITY) {
out.write(254);
} else if (Double.isNaN(value)) {
out.write(253);
} else {
String str = null;
if (value == (double) (long) value) {
str = Long.toString((long) value, 10);
} else {
str = String.format("%.17f", value);
}
out.write(str.length());
out.write(str.getBytes());
}
}

/**
* @since 3.5.3
* @param value value
* @param out out
* @throws IOException IOException
*/
public void rdbSaveBinaryFloatValue(float value, OutputStream out) throws IOException {
out.write(ByteBuffer.allocate(Long.BYTES).order(LITTLE_ENDIAN).putLong(floatToIntBits(value)).array());
}

/**
* @since 3.5.3
* @param value value
* @param out out
* @throws IOException IOException
*/
public void rdbSaveBinaryDoubleValue(double value, OutputStream out) throws IOException {
out.write(ByteBuffer.allocate(Long.BYTES).order(LITTLE_ENDIAN).putLong(doubleToLongBits(value)).array());
}

/**
* @since 3.5.3
* @param bytes input
* @param out out
* @throws IOException IOException
*/
public void rdbSaveEncodedStringObject(ByteArray bytes, OutputStream out) throws IOException {
ByteArray compressed = new ByteArray(bytes.length() - 3);
long length = Lzf.encode(bytes, bytes.length(), compressed, 0);
if (length <= 0) {
rdbSavePlainStringObject(bytes, out);
} else {
int type = (RDB_ENCVAL << 6) | RDB_ENC_LZF;
out.write(type);
rdbSaveLen(length, out);
rdbSaveLen(bytes.length(), out);
out.write(compressed.first(), 0, (int) length);
}
}

/**
* @since 3.5.3
* @param bytes bytes
* @param out out
* @throws IOException IOException
*/
public void rdbGenericSaveStringObject(ByteArray bytes, OutputStream out) throws IOException {
if (bytes.length() > 20) {
rdbSaveEncodedStringObject(bytes, out);
} else {
rdbSavePlainStringObject(bytes, out);
}
}

/**
* @since 3.5.3
* @param bytes bytes
* @param out out
* @throws IOException IOException
*/
public void rdbSavePlainStringObject(ByteArray bytes, OutputStream out) throws IOException {
rdbSaveLen(bytes.length(), out);
out.write(bytes.first());
}

private byte[] toUnsigned(long value) {
byte[] ary = new byte[8];
for (int i = 0; i < 8; i++) {
ary[7 - i] = (byte) ((value >>> (i << 3)) & 0xFF);
}
return ary;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2016-2017 Leon Chen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package syncer.replica.parser.rdb;


import syncer.replica.util.bytes.ByteBuilder;

import static java.lang.Integer.parseInt;

/**
* @author Leon Chen
*/
public class ByteParser {
public static byte[] toBytes(String str) {
char[] ary = str.toCharArray();
ByteBuilder s = ByteBuilder.allocate(ary.length);
for (int i = 0; i < ary.length; i++) {
switch (ary[i]) {
case '\\':
i++;
if (i < ary.length) {
switch (ary[i]) {
case 'n':
s.put((byte) '\n');
break;
case 'r':
s.put((byte) '\r');
break;
case 't':
s.put((byte) '\t');
break;
case 'b':
s.put((byte) '\b');
break;
case 'f':
s.put((byte) '\f');
break;
case 'a':
s.put((byte) 7);
break;
case 'x':
if (i + 2 >= ary.length) {
s.put((byte) '\\');
s.put((byte) 'x');
} else {
char high = ary[++i];
char low = ary[++i];
try {
s.put((byte) parseInt(new String(new char[]{high, low}), 16));
} catch (Exception e) {
s.put((byte) '\\');
s.put((byte) 'x');
s.put((byte) high);
s.put((byte) low);
}
}
break;
default:
s.put((byte) ary[i]);
break;
}
}
break;
default:
s.put((byte) ary[i]);
break;
}
}
return s.array();
}
}
Loading

0 comments on commit 5442670

Please sign in to comment.