Skip to content

Commit

Permalink
[INLONG-11689][SDK] Optimize user reporting information management (#…
Browse files Browse the repository at this point in the history
…11690)

* [INLONG-11689][SDK] Optimize user reporting information management

* [INLONG-11689][SDK] Optimize user reporting information management

---------

Co-authored-by: gosonzhang <[email protected]>
  • Loading branch information
gosonzhang and gosonzhang authored Jan 20, 2025
1 parent 3d754fa commit 21cbf72
Show file tree
Hide file tree
Showing 7 changed files with 651 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.common;

import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;
import org.apache.inlong.sdk.dataproxy.utils.LogCounter;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Report Event information class
*
* Used to encapsulate the data information reported by the caller, including
* grouId, streamId, dt, attributes, and body, auditVerison, msgUUID, etc.
* This class performs field value validity checks on the reported data and
* throws ProxyEventException for data that does not meet the reporting requirements, including
* mandatory fields that are empty, attribute sets that contain reserved words, attribute delimiters,
* empty message bodies, etc.
* Since the TCP and HTTP reports supported by the SDK differ only in the
* message body type, this class uses a Generics definition
*/
public abstract class EventInfo<T> {

protected static final Logger logger = LoggerFactory.getLogger(EventInfo.class);
protected static final LogCounter exceptCnt = new LogCounter(10, 100000, 60 * 1000L);

private final String groupId;
private final String streamId;
private final long dtMs;
private final Map<String, String> attrs = new HashMap<>();
protected int msgCnt = 0;
protected int bodySize = 0;
protected final List<T> bodyList = new ArrayList<>();

protected EventInfo(String groupId, String streamId, long dtMs, Long auditId, String msgUUID,
Map<String, String> attrs, boolean isSingle, List<T> bodyList) throws ProxyEventException {
// groupId
if (StringUtils.isBlank(groupId)) {
throw new ProxyEventException("groupId is blank!");
}
this.groupId = groupId.trim();
// streamId
if (StringUtils.isBlank(streamId)) {
throw new ProxyEventException("streamId is blank!");
}
this.streamId = streamId.trim();
// dtMs
this.dtMs = dtMs <= 0L ? System.currentTimeMillis() : dtMs;
// attrs
if (attrs != null && !attrs.isEmpty()) {
for (Map.Entry<String, String> entry : attrs.entrySet()) {
if (StringUtils.isBlank(entry.getKey())) {
continue;
}
innSetAttr(entry.getKey().trim(), entry.getValue());
}
}
if (auditId != null && auditId != -1L) {
this.attrs.put(AttributeConstants.AUDIT_VERSION, String.valueOf(auditId));
}
if (StringUtils.isNotBlank(msgUUID)) {
this.attrs.put(AttributeConstants.MSG_UUID, msgUUID.trim());
}
// body
setBodyList(isSingle, bodyList);
}

public String getGroupId() {
return groupId;
}

public String getStreamId() {
return streamId;
}

public long getDtMs() {
return dtMs;
}

public Map<String, String> getAttrs() {
return attrs;
}

public int getMsgCnt() {
return msgCnt;
}

public int getBodySize() {
return bodySize;
}

protected abstract void setBodyList(boolean isSingle, List<T> bodyList) throws ProxyEventException;

protected void innSetAttr(String key, String value) throws ProxyEventException {
if (ProxyUtils.SdkReservedWords.contains(key)) {
throw new ProxyEventException("Attribute key(" + key + ") is reserved word!");
}
if (key.contains(AttributeConstants.SEPARATOR)
|| key.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
if (exceptCnt.shouldPrint()) {
logger.warn(String.format("Attribute key(%s) include reserved word(%s or %s)",
key, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
}
throw new ProxyEventException("Attribute key(" + key + ") include reserved word("
+ AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+ AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
}
String valValue = value;
if (valValue != null) {
valValue = valValue.trim();
if (valValue.contains(AttributeConstants.SEPARATOR)
|| valValue.contains(AttributeConstants.KEY_VALUE_SEPARATOR)) {
if (exceptCnt.shouldPrint()) {
logger.warn(String.format("Attribute value(%s) include reserved word(%s or %s)",
valValue, AttributeConstants.KEY_VALUE_SEPARATOR, AttributeConstants.KEY_VALUE_SEPARATOR));
}
throw new ProxyEventException("Attribute value(" + valValue + ") include reserved word("
+ AttributeConstants.KEY_VALUE_SEPARATOR + " or "
+ AttributeConstants.KEY_VALUE_SEPARATOR + ")!");
}
}
this.attrs.put(key, valValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.exception;

/**
* Proxy Event Exception
*
* This exception is used specifically when an unacceptable situation when constructing an event.
* If this exception is thrown, the caller needs to solve the specified problem or discard the illegal message.
*/
public class ProxyEventException extends Exception {

public ProxyEventException() {
}

public ProxyEventException(String message) {
super(message);
}

public ProxyEventException(String message, Throwable cause) {
super(message, cause);
}

public ProxyEventException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender.http;

import org.apache.inlong.sdk.dataproxy.common.EventInfo;
import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;

import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;

/**
* HTTP Event Information class
*
* Used to encapsulate the data information reported by HTTP
*/
public class HttpEventInfo extends EventInfo<String> {

public HttpEventInfo(String groupId, String streamId,
long dtMs, String body) throws ProxyEventException {
super(groupId, streamId, dtMs, null, null, null, true, Collections.singletonList(body));
}

public HttpEventInfo(String groupId, String streamId,
long dtMs, long auditId, String body) throws ProxyEventException {
super(groupId, streamId, dtMs, auditId, null, null, true, Collections.singletonList(body));
}

public HttpEventInfo(String groupId, String streamId,
long dtMs, List<String> bodyList) throws ProxyEventException {
super(groupId, streamId, dtMs, null, null, null, false, bodyList);
}

public HttpEventInfo(String groupId, String streamId,
long dtMs, long auditId, List<String> bodyList) throws ProxyEventException {
super(groupId, streamId, dtMs, auditId, null, null, false, bodyList);
}

public List<String> getBodyList() {
return bodyList;
}

@Override
protected void setBodyList(boolean isSingle, List<String> bodyList) throws ProxyEventException {
String tmpValue;
if (isSingle) {
if (StringUtils.isBlank(bodyList.get(0))) {
throw new ProxyEventException("body is null or empty!");
}
tmpValue = bodyList.get(0).trim();
this.bodyList.add(tmpValue);
this.bodySize = tmpValue.length();
this.msgCnt = 1;
} else {
if (bodyList == null || bodyList.isEmpty()) {
throw new ProxyEventException("bodyList is null or empty!");
}
for (String body : bodyList) {
if (StringUtils.isBlank(body)) {
continue;
}
tmpValue = body.trim();
this.bodyList.add(tmpValue.trim());
this.bodySize += tmpValue.length();
this.msgCnt++;
}
if (this.bodyList.isEmpty()) {
throw new ProxyEventException("bodyList no valid content!");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.sender.tcp;

import org.apache.inlong.sdk.dataproxy.common.EventInfo;
import org.apache.inlong.sdk.dataproxy.exception.ProxyEventException;

import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* HTTP Event Information class
*
* Used to encapsulate the data information reported by TCP
*/
public class TcpEventInfo extends EventInfo<byte[]> {

public TcpEventInfo(String groupId, String streamId, long dtMs,
Map<String, String> attrs, byte[] body) throws ProxyEventException {
super(groupId, streamId, dtMs, null, null, attrs, true, Collections.singletonList(body));
}

public TcpEventInfo(String groupId, String streamId, long dtMs, long auditId,
Map<String, String> attrs, byte[] body) throws ProxyEventException {
super(groupId, streamId, dtMs, auditId, null, attrs, true, Collections.singletonList(body));
}

public TcpEventInfo(String groupId, String streamId, long dtMs, String msgUUID,
Map<String, String> attrs, byte[] body) throws ProxyEventException {
super(groupId, streamId, dtMs, null, msgUUID, attrs, true, Collections.singletonList(body));
}

public TcpEventInfo(String groupId, String streamId, long dtMs, long auditId, String msgUUID,
Map<String, String> attrs, byte[] body) throws ProxyEventException {
super(groupId, streamId, dtMs, auditId, msgUUID, attrs, true, Collections.singletonList(body));
}

public TcpEventInfo(String groupId, String streamId,
long dtMs, Map<String, String> attrs, List<byte[]> bodyList) throws ProxyEventException {
super(groupId, streamId, dtMs, null, null, attrs, false, bodyList);
}

public TcpEventInfo(String groupId, String streamId, long dtMs,
long auditId, Map<String, String> attrs, List<byte[]> bodyList) throws ProxyEventException {
super(groupId, streamId, dtMs, auditId, null, attrs, false, bodyList);
}

public TcpEventInfo(String groupId, String streamId, long dtMs,
String msgUUID, Map<String, String> attrs, List<byte[]> bodyList) throws ProxyEventException {
super(groupId, streamId, dtMs, null, msgUUID, attrs, false, bodyList);
}

public TcpEventInfo(String groupId, String streamId, long dtMs,
long auditId, String msgUUID, Map<String, String> attrs, List<byte[]> bodyList) throws ProxyEventException {
super(groupId, streamId, dtMs, auditId, msgUUID, attrs, false, bodyList);
}

public List<byte[]> getBodyList() {
return bodyList;
}

public void setAttr(String key, String value) throws ProxyEventException {
if (StringUtils.isBlank(key)) {
throw new ProxyEventException("Key is blank!");
}
innSetAttr(key.trim(), value);
}

@Override
protected void setBodyList(boolean isSingle, List<byte[]> bodyList) throws ProxyEventException {
if (isSingle) {
if (bodyList.get(0) == null || bodyList.get(0).length == 0) {
throw new ProxyEventException("body is null or empty!");
}
this.bodyList.add(bodyList.get(0));
this.bodySize = bodyList.get(0).length;
this.msgCnt = 1;
} else {
if (bodyList == null || bodyList.isEmpty()) {
throw new ProxyEventException("bodyList is null or empty!");
}
for (byte[] body : bodyList) {
if (body == null || body.length == 0) {
continue;
}
this.bodyList.add(body);
this.bodySize += body.length;
this.msgCnt++;
}
if (this.bodyList.isEmpty()) {
throw new ProxyEventException("bodyList no valid content!");
}
}
}
}
Loading

0 comments on commit 21cbf72

Please sign in to comment.