From 21cbf7234c472500506305cdf19926d969d9150b Mon Sep 17 00:00:00 2001 From: Goson Zhang <4675739@qq.com> Date: Mon, 20 Jan 2025 14:10:29 +0800 Subject: [PATCH] [INLONG-11689][SDK] Optimize user reporting information management (#11690) * [INLONG-11689][SDK] Optimize user reporting information management * [INLONG-11689][SDK] Optimize user reporting information management --------- Co-authored-by: gosonzhang --- .../sdk/dataproxy/common/EventInfo.java | 148 +++++++++++ .../exception/ProxyEventException.java | 42 ++++ .../dataproxy/sender/http/HttpEventInfo.java | 88 +++++++ .../dataproxy/sender/tcp/TcpEventInfo.java | 113 +++++++++ .../sdk/dataproxy/utils/LogCounter.java | 2 +- .../sdk/dataproxy/utils/ProxyUtils.java | 36 ++- .../inlong/sdk/dataproxy/EventInfoTest.java | 229 ++++++++++++++++++ 7 files changed, 651 insertions(+), 7 deletions(-) create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java create mode 100644 inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java create mode 100644 inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java new file mode 100644 index 0000000000..3d63c10039 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/common/EventInfo.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.common; + +import org.apache.inlong.common.msg.AttributeConstants; +import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException; +import org.apache.inlong.sdk.dataproxy.utils.LogCounter; +import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Report Event information class + * + * Used to encapsulate the data information reported by the caller, including + * grouId, streamId, dt, attributes, and body, auditVerison, msgUUID, etc. + * This class performs field value validity checks on the reported data and + * throws ProxyEventException for data that does not meet the reporting requirements, including + * mandatory fields that are empty, attribute sets that contain reserved words, attribute delimiters, + * empty message bodies, etc. + * Since the TCP and HTTP reports supported by the SDK differ only in the + * message body type, this class uses a Generics definition + */ +public abstract class EventInfo { + + protected static final Logger logger = LoggerFactory.getLogger(EventInfo.class); + protected static final LogCounter exceptCnt = new LogCounter(10, 100000, 60 * 1000L); + + private final String groupId; + private final String streamId; + private final long dtMs; + private final Map attrs = new HashMap<>(); + protected int msgCnt = 0; + protected int bodySize = 0; + protected final List bodyList = new ArrayList<>(); + + protected EventInfo(String groupId, String streamId, long dtMs, Long auditId, String msgUUID, + Map attrs, boolean isSingle, List bodyList) throws ProxyEventException { + // groupId + if (StringUtils.isBlank(groupId)) { + throw new ProxyEventException("groupId is blank!"); + } + this.groupId = groupId.trim(); + // streamId + if (StringUtils.isBlank(streamId)) { + throw new ProxyEventException("streamId is blank!"); + } + this.streamId = streamId.trim(); + // dtMs + this.dtMs = dtMs <= 0L ? System.currentTimeMillis() : dtMs; + // attrs + if (attrs != null && !attrs.isEmpty()) { + for (Map.Entry entry : attrs.entrySet()) { + if (StringUtils.isBlank(entry.getKey())) { + continue; + } + innSetAttr(entry.getKey().trim(), entry.getValue()); + } + } + if (auditId != null && auditId != -1L) { + this.attrs.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditId)); + } + if (StringUtils.isNotBlank(msgUUID)) { + this.attrs.put(AttributeConstants.MSG_UUID, msgUUID.trim()); + } + // body + setBodyList(isSingle, bodyList); + } + + public String getGroupId() { + return groupId; + } + + public String getStreamId() { + return streamId; + } + + public long getDtMs() { + return dtMs; + } + + public Map getAttrs() { + return attrs; + } + + public int getMsgCnt() { + return msgCnt; + } + + public int getBodySize() { + return bodySize; + } + + protected abstract void setBodyList(boolean isSingle, List bodyList) throws ProxyEventException; + + protected void innSetAttr(String key, String value) throws ProxyEventException { + if (ProxyUtils.SdkReservedWords.contains(key)) { + throw new ProxyEventException("Attribute key(" + key + ") is reserved word!"); + } + if (key.contains(AttributeConstants.SEPARATOR) + || key.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) { + if (exceptCnt.shouldPrint()) { + logger.warn(String.format("Attribute key(%s) include reserved word(%s or %s)", + key, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR)); + } + throw new ProxyEventException("Attribute key(" + key + ") include reserved word(" + + AttributeConstants.KEY_VALUE_SEPARATOR + " or " + + AttributeConstants.KEY_VALUE_SEPARATOR + ")!"); + } + String valValue = value; + if (valValue != null) { + valValue = valValue.trim(); + if (valValue.contains(AttributeConstants.SEPARATOR) + || valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) { + if (exceptCnt.shouldPrint()) { + logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)", + valValue, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR)); + } + throw new ProxyEventException("Attribute value(" + valValue + ") include reserved word(" + + AttributeConstants.KEY_VALUE_SEPARATOR + " or " + + AttributeConstants.KEY_VALUE_SEPARATOR + ")!"); + } + } + this.attrs.put(key, valValue); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java new file mode 100644 index 0000000000..7392631d61 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/exception/ProxyEventException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.exception; + +/** + * Proxy Event Exception + * + * This exception is used specifically when an unacceptable situation when constructing an event. + * If this exception is thrown, the caller needs to solve the specified problem or discard the illegal message. + */ +public class ProxyEventException extends Exception { + + public ProxyEventException() { + } + + public ProxyEventException(String message) { + super(message); + } + + public ProxyEventException(String message, Throwable cause) { + super(message, cause); + } + + public ProxyEventException(Throwable cause) { + super(cause); + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java new file mode 100644 index 0000000000..f8ae172bba --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/http/HttpEventInfo.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.sender.http; + +import org.apache.inlong.sdk.dataproxy.common.EventInfo; +import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; + +/** + * HTTP Event Information class + * + * Used to encapsulate the data information reported by HTTP + */ +public class HttpEventInfo extends EventInfo { + + public HttpEventInfo(String groupId, String streamId, + long dtMs, String body) throws ProxyEventException { + super(groupId, streamId, dtMs, null, null, null, true, Collections.singletonList(body)); + } + + public HttpEventInfo(String groupId, String streamId, + long dtMs, long auditId, String body) throws ProxyEventException { + super(groupId, streamId, dtMs, auditId, null, null, true, Collections.singletonList(body)); + } + + public HttpEventInfo(String groupId, String streamId, + long dtMs, List bodyList) throws ProxyEventException { + super(groupId, streamId, dtMs, null, null, null, false, bodyList); + } + + public HttpEventInfo(String groupId, String streamId, + long dtMs, long auditId, List bodyList) throws ProxyEventException { + super(groupId, streamId, dtMs, auditId, null, null, false, bodyList); + } + + public List getBodyList() { + return bodyList; + } + + @Override + protected void setBodyList(boolean isSingle, List bodyList) throws ProxyEventException { + String tmpValue; + if (isSingle) { + if (StringUtils.isBlank(bodyList.get(0))) { + throw new ProxyEventException("body is null or empty!"); + } + tmpValue = bodyList.get(0).trim(); + this.bodyList.add(tmpValue); + this.bodySize = tmpValue.length(); + this.msgCnt = 1; + } else { + if (bodyList == null || bodyList.isEmpty()) { + throw new ProxyEventException("bodyList is null or empty!"); + } + for (String body : bodyList) { + if (StringUtils.isBlank(body)) { + continue; + } + tmpValue = body.trim(); + this.bodyList.add(tmpValue.trim()); + this.bodySize += tmpValue.length(); + this.msgCnt++; + } + if (this.bodyList.isEmpty()) { + throw new ProxyEventException("bodyList no valid content!"); + } + } + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java new file mode 100644 index 0000000000..dfa255aab6 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpEventInfo.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy.sender.tcp; + +import org.apache.inlong.sdk.dataproxy.common.EventInfo; +import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * HTTP Event Information class + * + * Used to encapsulate the data information reported by TCP + */ +public class TcpEventInfo extends EventInfo { + + public TcpEventInfo(String groupId, String streamId, long dtMs, + Map attrs, byte[] body) throws ProxyEventException { + super(groupId, streamId, dtMs, null, null, attrs, true, Collections.singletonList(body)); + } + + public TcpEventInfo(String groupId, String streamId, long dtMs, long auditId, + Map attrs, byte[] body) throws ProxyEventException { + super(groupId, streamId, dtMs, auditId, null, attrs, true, Collections.singletonList(body)); + } + + public TcpEventInfo(String groupId, String streamId, long dtMs, String msgUUID, + Map attrs, byte[] body) throws ProxyEventException { + super(groupId, streamId, dtMs, null, msgUUID, attrs, true, Collections.singletonList(body)); + } + + public TcpEventInfo(String groupId, String streamId, long dtMs, long auditId, String msgUUID, + Map attrs, byte[] body) throws ProxyEventException { + super(groupId, streamId, dtMs, auditId, msgUUID, attrs, true, Collections.singletonList(body)); + } + + public TcpEventInfo(String groupId, String streamId, + long dtMs, Map attrs, List bodyList) throws ProxyEventException { + super(groupId, streamId, dtMs, null, null, attrs, false, bodyList); + } + + public TcpEventInfo(String groupId, String streamId, long dtMs, + long auditId, Map attrs, List bodyList) throws ProxyEventException { + super(groupId, streamId, dtMs, auditId, null, attrs, false, bodyList); + } + + public TcpEventInfo(String groupId, String streamId, long dtMs, + String msgUUID, Map attrs, List bodyList) throws ProxyEventException { + super(groupId, streamId, dtMs, null, msgUUID, attrs, false, bodyList); + } + + public TcpEventInfo(String groupId, String streamId, long dtMs, + long auditId, String msgUUID, Map attrs, List bodyList) throws ProxyEventException { + super(groupId, streamId, dtMs, auditId, msgUUID, attrs, false, bodyList); + } + + public List getBodyList() { + return bodyList; + } + + public void setAttr(String key, String value) throws ProxyEventException { + if (StringUtils.isBlank(key)) { + throw new ProxyEventException("Key is blank!"); + } + innSetAttr(key.trim(), value); + } + + @Override + protected void setBodyList(boolean isSingle, List bodyList) throws ProxyEventException { + if (isSingle) { + if (bodyList.get(0) == null || bodyList.get(0).length == 0) { + throw new ProxyEventException("body is null or empty!"); + } + this.bodyList.add(bodyList.get(0)); + this.bodySize = bodyList.get(0).length; + this.msgCnt = 1; + } else { + if (bodyList == null || bodyList.isEmpty()) { + throw new ProxyEventException("bodyList is null or empty!"); + } + for (byte[] body : bodyList) { + if (body == null || body.length == 0) { + continue; + } + this.bodyList.add(body); + this.bodySize += body.length; + this.msgCnt++; + } + if (this.bodyList.isEmpty()) { + throw new ProxyEventException("bodyList no valid content!"); + } + } + } +} diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java index edb14c62b2..3fac799a3c 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/LogCounter.java @@ -27,7 +27,7 @@ public class LogCounter { private long control = 100000L; private long reset = 60 * 1000L; - private AtomicLong lastLogTime = new AtomicLong(System.currentTimeMillis()); + private final AtomicLong lastLogTime = new AtomicLong(System.currentTimeMillis()); public LogCounter(long start, long control, long reset) { this.start = start; diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java index 0367e6e06a..61dbb263df 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java @@ -38,11 +38,20 @@ public class ProxyUtils { + public static final String KEY_FILE_STATUS_CHECK = "_file_status_check"; + public static final String KEY_SECRET_ID = "_secretId"; + public static final String KEY_SIGNATURE = "_signature"; + public static final String KEY_TIME_STAMP = "_timeStamp"; + public static final String KEY_NONCE = "_nonce"; + public static final String KEY_USERNAME = "_userName"; + public static final String KEY_CLIENT_IP = "_clientIP"; + public static final String KEY_ENCY_VERSION = "_encyVersion"; + public static final String KEY_ENCY_AES_KEY = "_encyAesKey"; private static final Logger logger = LoggerFactory.getLogger(ProxyUtils.class); private static final LogCounter exceptCounter = new LogCounter(10, 200000, 60 * 1000L); private static final int TIME_LENGTH = 13; - private static final Set invalidAttr = new HashSet<>(); + public static final Set SdkReservedWords = new HashSet<>(); public static final Set SdkAllowedMsgType = new HashSet<>(); private static String localHost; private static String sdkVersion; @@ -50,10 +59,25 @@ public class ProxyUtils { static { localHost = getLocalIp(); getJarVersion(); - Collections.addAll(invalidAttr, "groupId", "streamId", "dt", "msgUUID", "cp", - "cnt", "mt", "m", "sid", "t", "NodeIP", "messageId", "_file_status_check", "_secretId", - "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", "_encyVersion", "_encyAesKey", - "proxySend", "errMsg", "errCode", AttributeConstants.MSG_RPT_TIME); + Collections.addAll(SdkReservedWords, + AttributeConstants.GROUP_ID, AttributeConstants.STREAM_ID, + AttributeConstants.DATA_TIME, AttributeConstants.MSG_UUID, + AttributeConstants.COMPRESS_TYPE, AttributeConstants.MESSAGE_COUNT, + AttributeConstants.MESSAGE_TYPE, AttributeConstants.METHOD, + AttributeConstants.SEQUENCE_ID, AttributeConstants.TIME_STAMP, + AttributeConstants.NODE_IP, AttributeConstants.MESSAGE_ID, + AttributeConstants.MESSAGE_IS_ACK, AttributeConstants.MESSAGE_PROXY_SEND, + AttributeConstants.MESSAGE_PROCESS_ERRCODE, AttributeConstants.MESSAGE_PROCESS_ERRMSG, + AttributeConstants.MSG_RPT_TIME, AttributeConstants.AUDIT_VERSION, + AttributeConstants.PROXY_SDK_VERSION, KEY_FILE_STATUS_CHECK, + KEY_SECRET_ID, KEY_SIGNATURE, KEY_TIME_STAMP, KEY_NONCE, KEY_USERNAME, + KEY_CLIENT_IP, KEY_ENCY_VERSION, KEY_ENCY_AES_KEY); + /* + * Collections.addAll(SdkReservedWords, "groupId", "streamId", "dt", "msgUUID", "cp", "cnt", "mt", "m", "sid", + * "t", "NodeIP", "messageId", "isAck", "proxySend", "errCode", "errMsg", "rtms", "sdkVersion", "auditVersion", + * "_file_status_check", "_secretId", "_signature", "_timeStamp", "_nonce", "_userName", "_clientIP", + * "_encyVersion", "_encyAesKey"); + */ Collections.addAll(SdkAllowedMsgType, MsgType.MSG_ACK_SERVICE, MsgType.MSG_MULTI_BODY, MsgType.MSG_BIN_MULTI_BODY); @@ -106,7 +130,7 @@ public static boolean isAttrKeysValid(Map attrsMap) { return false; } for (String key : attrsMap.keySet()) { - if (invalidAttr.contains(key)) { + if (SdkReservedWords.contains(key)) { logger.error("the attributes is invalid ,please check ! {}", key); return false; } diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java new file mode 100644 index 0000000000..c2af1eacb4 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/EventInfoTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dataproxy; + +import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException; +import org.apache.inlong.sdk.dataproxy.sender.http.HttpEventInfo; +import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpEventInfo; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class EventInfoTest { + + @Test + public void testTcpEventInfo() throws Exception { + // case a1 normal data setting + String groupId = "groupId"; + String streamId = "streamId"; + long dtMs = System.currentTimeMillis(); + Map attrsA1 = null; + byte[] body = "test".getBytes(StandardCharsets.UTF_8); + TcpEventInfo eventInfoA1 = + new TcpEventInfo(groupId, streamId, dtMs, attrsA1, body); + Assert.assertEquals(groupId, eventInfoA1.getGroupId()); + Assert.assertEquals(streamId, eventInfoA1.getStreamId()); + Assert.assertEquals(dtMs, eventInfoA1.getDtMs()); + Assert.assertNotNull(eventInfoA1.getAttrs()); + Assert.assertEquals(0, eventInfoA1.getAttrs().size()); + Assert.assertEquals(1, eventInfoA1.getMsgCnt()); + Assert.assertEquals(body.length, eventInfoA1.getBodySize()); + Assert.assertArrayEquals(body, eventInfoA1.getBodyList().get(0)); + eventInfoA1.setAttr("a1key", "mmm"); + Assert.assertEquals(1, eventInfoA1.getAttrs().size()); + Assert.assertEquals(1, eventInfoA1.getMsgCnt()); + // case a2 normal data setting + Map attrsA2 = new HashMap<>(); + List bodyListA2 = new ArrayList<>(); + bodyListA2.add("test_msg_1".getBytes(StandardCharsets.UTF_8)); + bodyListA2.add("test_msg_2".getBytes(StandardCharsets.UTF_8)); + TcpEventInfo eventInfoA2 = + new TcpEventInfo(groupId, streamId, dtMs, attrsA2, bodyListA2); + Assert.assertEquals(groupId, eventInfoA2.getGroupId()); + Assert.assertEquals(streamId, eventInfoA2.getStreamId()); + Assert.assertEquals(dtMs, eventInfoA2.getDtMs()); + Assert.assertEquals(attrsA2, eventInfoA2.getAttrs()); + Assert.assertEquals(2, eventInfoA2.getMsgCnt()); + int totalSize = 0; + List tgtListA2 = eventInfoA2.getBodyList(); + for (byte[] b : bodyListA2) { + Assert.assertTrue(tgtListA2.contains(b)); + totalSize += b.length; + } + Assert.assertEquals(totalSize, eventInfoA2.getBodySize()); + eventInfoA2.setAttr("a2key", "ccc"); + Assert.assertEquals(1, eventInfoA2.getAttrs().size()); + // case A3 abnormal data setting + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(null, streamId, dtMs, attrsA1, body)); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(" ", streamId, dtMs, attrsA1, body)); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, null, dtMs, attrsA1, body)); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, " ", dtMs, attrsA1, body)); + byte[] bodyA301 = null; + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, bodyA301)); + byte[] bodyA302 = "".getBytes(StandardCharsets.UTF_8); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, bodyA302)); + byte[] bodyA303 = " ".getBytes(StandardCharsets.UTF_8); + TcpEventInfo eventInfoA303 = + new TcpEventInfo(groupId, streamId, dtMs, attrsA1, bodyA303); + Assert.assertEquals(1, eventInfoA303.getMsgCnt()); + Assert.assertEquals(bodyA303.length, eventInfoA303.getBodySize()); + List msgListA311 = null; + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, msgListA311)); + List msgListA312 = new ArrayList<>(); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, msgListA312)); + List msgListA313 = new ArrayList<>(); + msgListA313.add(null); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, msgListA313)); + List msgListA314 = new ArrayList<>(); + msgListA314.add("".getBytes(StandardCharsets.UTF_8)); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA1, msgListA314)); + List msgListA315 = new ArrayList<>(); + msgListA315.add("".getBytes(StandardCharsets.UTF_8)); + msgListA315.add("test".getBytes(StandardCharsets.UTF_8)); + TcpEventInfo eventInfoA315 = + new TcpEventInfo(groupId, streamId, dtMs, attrsA1, msgListA315); + Assert.assertEquals(1, eventInfoA315.getMsgCnt()); + Assert.assertEquals("test".getBytes(StandardCharsets.UTF_8).length, eventInfoA315.getBodySize()); + // case A4 normal attributes setting + Map attrsA41 = new HashMap<>(); + attrsA41.put("aaa&mmm", "value"); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA41, body)); + Map attrsA42 = new HashMap<>(); + attrsA42.put("aaa=mmm", "value"); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA42, body)); + Map attrsA43 = new HashMap<>(); + attrsA43.put("groupId", "value"); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA43, body)); + Map attrsA44 = new HashMap<>(); + attrsA44.put("testA44", "va&lue"); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA44, body)); + Map attrsA45 = new HashMap<>(); + attrsA45.put("testA45", "va=lue"); + Assert.assertThrows(ProxyEventException.class, + () -> new TcpEventInfo(groupId, streamId, dtMs, attrsA45, body)); + TcpEventInfo eventInfoA46 = + new TcpEventInfo(groupId, streamId, dtMs, attrsA1, body); + Assert.assertThrows(ProxyEventException.class, + () -> eventInfoA46.setAttr("aaa&mmm", "value")); + Assert.assertThrows(ProxyEventException.class, + () -> eventInfoA46.setAttr("streamId", "value")); + Assert.assertThrows(ProxyEventException.class, + () -> eventInfoA46.setAttr("kkk=mmm", "value")); + Assert.assertThrows(ProxyEventException.class, + () -> eventInfoA46.setAttr("aaaa", "va=lue")); + Assert.assertThrows(ProxyEventException.class, + () -> eventInfoA46.setAttr("aaaa", "va&lue")); + // case 5, set uuid, auditVersion + String uuid = "uuid"; + long auditVer = 32L; + TcpEventInfo eventInfoA51 = + new TcpEventInfo(groupId, streamId, dtMs, auditVer, attrsA1, body); + Assert.assertNotNull(eventInfoA51.getAttrs()); + Assert.assertEquals(1, eventInfoA51.getAttrs().size()); + TcpEventInfo eventInfoA52 = + new TcpEventInfo(groupId, streamId, dtMs, uuid, attrsA1, body); + Assert.assertNotNull(eventInfoA52.getAttrs()); + Assert.assertEquals(1, eventInfoA52.getAttrs().size()); + TcpEventInfo eventInfoA53 = + new TcpEventInfo(groupId, streamId, dtMs, auditVer, uuid, attrsA1, body); + Assert.assertNotNull(eventInfoA53.getAttrs()); + Assert.assertEquals(2, eventInfoA53.getAttrs().size()); + } + + @Test + public void testHttpEventInfo() throws Exception { + // case a1 normal data setting + String groupId = "groupId"; + String streamId = "streamId"; + long dtMs = System.currentTimeMillis(); + String body = "test"; + HttpEventInfo eventInfoA1 = + new HttpEventInfo(groupId, streamId, dtMs, body); + Assert.assertEquals(groupId, eventInfoA1.getGroupId()); + Assert.assertEquals(streamId, eventInfoA1.getStreamId()); + Assert.assertEquals(dtMs, eventInfoA1.getDtMs()); + Assert.assertNotNull(eventInfoA1.getAttrs()); + Assert.assertTrue(eventInfoA1.getAttrs().isEmpty()); + Assert.assertEquals(1, eventInfoA1.getMsgCnt()); + Assert.assertEquals(body.length(), eventInfoA1.getBodySize()); + Assert.assertEquals(body, eventInfoA1.getBodyList().get(0)); + // case A2 normal setting + List bodyListA2 = new ArrayList<>(); + bodyListA2.add("test_body_1"); + bodyListA2.add("test_body_2"); + HttpEventInfo eventInfoA2 = + new HttpEventInfo(groupId, streamId, dtMs, bodyListA2); + Assert.assertEquals(groupId, eventInfoA2.getGroupId()); + Assert.assertEquals(streamId, eventInfoA2.getStreamId()); + Assert.assertEquals(dtMs, eventInfoA2.getDtMs()); + Assert.assertNotNull(eventInfoA2.getAttrs()); + Assert.assertTrue(eventInfoA2.getAttrs().isEmpty()); + Assert.assertEquals(2, eventInfoA2.getMsgCnt()); + int totalSize = 0; + List tgtA2 = eventInfoA2.getBodyList(); + for (String item : tgtA2) { + Assert.assertTrue(bodyListA2.contains(item)); + totalSize += item.length(); + } + Assert.assertEquals(totalSize, eventInfoA2.getBodySize()); + // case A3 auditVer + long auditVer = 1000L; + HttpEventInfo eventInfoA3 = + new HttpEventInfo(groupId, streamId, dtMs, auditVer, body); + Assert.assertNotNull(eventInfoA3.getAttrs()); + Assert.assertEquals(1, eventInfoA3.getAttrs().size()); + // case A4 abnormal setting + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(null, streamId, dtMs, body)); + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(" ", streamId, dtMs, body)); + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(groupId, null, dtMs, body)); + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(groupId, " ", dtMs, body)); + String bodyA401 = null; + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(groupId, streamId, dtMs, bodyA401)); + String bodyA402 = ""; + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(groupId, streamId, dtMs, bodyA402)); + String bodyA403 = " "; + Assert.assertThrows(ProxyEventException.class, + () -> new HttpEventInfo(groupId, streamId, dtMs, bodyA403)); + } +}