Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10387][Audit] Audit SDK supports obtaining Audit-Proxy capabilities through InLong Manager #10398

Merged
merged 12 commits into from
Jun 14, 2024
Merged
4 changes: 4 additions & 0 deletions inlong-audit/audit-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,29 @@
* limitations under the License.
*/

package org.apache.inlong.audit.entities;
package org.apache.inlong.audit.entity;

public enum AuditComponent {

AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), COMMON_AUDIT("Common");
private final String component;

/**
* Constructor for the enum.
*
* @param component the name of the component
*/

AuditComponent(String component) {
this.component = component;
}

/**
* Returns the name of the component.
*
* @return the name of the component
*/

public String getComponent() {
return component;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.inlong.audit.entities;
package org.apache.inlong.audit.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -24,6 +24,23 @@
@AllArgsConstructor
public class AuditProxy {

/**
* The host of the audit proxy.
*/
private String host;

/**
* The port of the audit proxy.
*/
private int port;

/**
* Returns a string representation of the audit proxy.
*
* @return a string representation of the audit proxy
*/
@Override
public String toString() {
return String.format("%s:%d", host, port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.entity;

import com.google.gson.Gson;
import lombok.Data;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

/**
* Class representing a common response.
*/
@Data
public class CommonResponse<T> {

/**
* Gson instance for JSON serialization and deserialization.
*/
private static final Gson gson = new Gson();

/**
* Error message of the response.
*/
private String errMsg;

/**
* Success status of the response.
*/
private boolean success;

/**
* Data of the response.
*/
private List<T> data;

/**
* Converts a JSON string to a CommonResponse object.
*
* @param json the JSON string
* @param clazz the class of the data
* @return a CommonResponse object
*/
public static CommonResponse fromJson(String json, Class clazz) {
Type objectType = type(CommonResponse.class, clazz);
return gson.fromJson(json, objectType);
}

/**
* Returns a parameterized type.
*
* @param raw the raw type
* @param args the actual type arguments
* @return a parameterized type
*/
private static ParameterizedType type(final Class raw, final Type... args) {
return new ParameterizedType() {

@Override
public Type getRawType() {
return raw;
}

@Override
public Type[] getActualTypeArguments() {
return args;
}

@Override
public Type getOwnerType() {
return null;
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.audit.utils;

import org.apache.inlong.common.util.BasicAuth;

import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;

import java.util.HashMap;
import java.util.Map;

public class HttpUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpUtils.class);
private static final String PARAM_COMPONENT = "component";
private static HttpClient httpClient = null;

static {
try {
SSLContext sslContext = SSLContextBuilder.create()
.loadTrustMaterial(new TrustSelfSignedStrategy())
.build();

httpClient = HttpClientBuilder.create()
.setSSLContext(sslContext)
.build();
} catch (Exception e) {
LOGGER.error("Error initializing SSL context or HTTP client", e);
}
}

public static Map<String, String> getAuthHeader(String secretId, String secretKey) {
Map<String, String> header = new HashMap<>();
try {
header.put(BasicAuth.BASIC_AUTH_HEADER,
BasicAuth.genBasicAuthCredential(secretId, secretKey));
} catch (Exception e) {
LOGGER.error("Get auth header error", e);
}
return header;
}

public static String httpGet(String component, String url, String secretId, String secretKey, int timeoutMs) {
if (httpClient == null) {
LOGGER.error("httpClient is null");
return null;
}
try {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(timeoutMs)
.setConnectionRequestTimeout(timeoutMs)
.setSocketTimeout(timeoutMs)
.build();
URIBuilder uriBuilder = new URIBuilder(url);
uriBuilder.addParameter(PARAM_COMPONENT, component);
String finalUrl = uriBuilder.build().toString();

HttpGet request = new HttpGet(finalUrl);
request.setConfig(requestConfig);

Map<String, String> authHeaders = getAuthHeader(secretId, secretKey);
for (Map.Entry<String, String> entry : authHeaders.entrySet()) {
request.addHeader(entry.getKey(), entry.getValue());
}

try (CloseableHttpResponse response = (CloseableHttpResponse) httpClient.execute(request)) {
String responseStr = EntityUtils.toString(response.getEntity());
LOGGER.info("Http response: {}", responseStr);
if (responseStr != null && !responseStr.isEmpty()
&& response.getStatusLine().getStatusCode() == 200) {
return responseStr;
}
}
} catch (Exception e) {
LOGGER.error("Send get request has exception", e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.apache.inlong.audit.entities.AuditComponent.COMMON_AUDIT;
import static org.apache.inlong.audit.entity.AuditComponent.COMMON_AUDIT;

public class Heartbeat {

Expand Down
48 changes: 42 additions & 6 deletions inlong-audit/audit-sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,59 @@ which can ensure that each module is reconciled in accordance with the unified a

## Usage
### Configure Audit Proxy Addresses
The Audit SDK will summarize the results according to the cycle
- The Audit SDK will summarize the results according to the cycle
and send them to the ip:port list set by the interface.
- There are two ways to set the address of the Audit Proxy, configure the address directly or get the address from the manager. Please choose one of the methods.
#### Configure Audit Proxy Addresses by fixed ip:port
If the ip:port of the AuditProxy is fixed, then this interface needs to be called once.
If the AuditProxy changes in real time, then the business program needs to call this interface periodically to update
If the AuditProxy changes in real time, then the business program needs to call this interface periodically to update.
```java
HashSet<String> ipPortList=new HashSet<>();
HashSet<String> ipPortList = new HashSet<>();
ipPortList.add("0.0.0.0:54041");
AuditOperator.getInstance().setAuditProxy(ipPortList);
```
dockerzhang marked this conversation as resolved.
Show resolved Hide resolved
#### Configure Audit Proxy Addresses by InLong Manager
By configuring the InLong Manager's address, module information, and manager certification information,
The Audit SDK will automatically fetch the Manager to obtain the address of the Audit Proxy.
```java
String host = "127.0.0.1:8083"; // The manager address
String secretId = "*****"; // Secret id
String secretKey = "******"; // Secret key
AuditOperator.getInstance().setAuditProxy(AuditComponent,host,secretId,secretKey);
```
- Explain of AuditComponent
```java
public enum AuditComponent {

AGENT("Agent"), DATAPROXY("DataProxy"), SORT("Sort"), COMMON_AUDIT("Common");
private final String component;

/**
* Constructor for the enum.
*
* @param component the name of the component
*/

AuditComponent(String component) {
this.component = component;
}

/**
* Returns the name of the component.
*
* @return the name of the component
*/

public String getComponent() {
return component;
}
}
```

### Add Audit Data
Call the add method for statistics, where the auditID parameter uniquely identifies an audit object,
inlongGroupID,inlongStreamID,logTime are audit dimensions, count is the number of items, size is the size, and logTime
is milliseconds.

#### Example for Agent to Add Audit Data
```java
AuditOperator.getInstance().add(auditID, auditTag, inlongGroupID, inlongStreamID, logTime,
Expand All @@ -39,7 +77,6 @@ The scenario of supplementary recording of agent data, so the version number par
count, size, auditVersion);
```
The scenario of supplementary recording of DataProxy data, so the version number parameter needs to be passed in.

#### Example for Sort Flink to Add Audit Data
```java
AuditReporterImpl auditReporter=new AuditReporterImpl();
Expand All @@ -58,7 +95,6 @@ The scenario of supplementary recording of DataProxy data, so the version number
logTime, count, size, auditVersion)
```
In order to ensure the accuracy of auditing, each operator needs to create an auditAuditReporterImpl instance.

- Explain of AuditDimensions

| parameter | description |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import static org.apache.inlong.audit.entity.AuditType.POSTGRES;
import static org.apache.inlong.audit.entity.AuditType.SDK;
import static org.apache.inlong.audit.entity.AuditType.STARROCKS;
import static org.apache.inlong.audit.entity.AuditType.TUBE;
import static org.apache.inlong.audit.entity.AuditType.TUBEMQ;
import static org.apache.inlong.audit.entity.FlowType.INPUT;
import static org.apache.inlong.audit.entity.FlowType.OUTPUT;

Expand Down Expand Up @@ -88,11 +88,11 @@ public enum AuditIdEnum {
SORT_POSTGRES_INPUT(27, INPUT, POSTGRES, "Received Audit Metrics for Sort Postgres"),
SORT_POSTGRES_OUTPUT(28, OUTPUT, POSTGRES, "Sent Audit Metrics for Sort Postgres"),

SORT_BINLOG_INPUT(29, INPUT, BINLOG, "Received Audit Metrics for Sort Binlog"),
SORT_BINLOG_OUTPUT(30, OUTPUT, BINLOG, "Sent Audit Metrics for Sort Binlog"),
SORT_BINLOG_INPUT(35, INPUT, BINLOG, "Received Audit Metrics for Sort Binlog"),
SORT_BINLOG_OUTPUT(36, OUTPUT, BINLOG, "Sent Audit Metrics for Sort Binlog"),

SORT_TUBE_INPUT(33, INPUT, TUBE, "Received Audit Metrics for Sort Tube"),
SORT_TUBE_OUTPUT(34, OUTPUT, TUBE, "Sent Audit Metrics for Sort Tube"),
SORT_TUBE_INPUT(33, INPUT, TUBEMQ, "Received Audit Metrics for Sort TubeMQ"),
SORT_TUBE_OUTPUT(34, OUTPUT, TUBEMQ, "Sent Audit Metrics for Sort TubeMQ"),

SORT_MYSQL_INPUT(35, INPUT, MYSQL, "Received Audit Metrics for Sort MySQL"),
SORT_MYSQL_OUTPUT(36, OUTPUT, MYSQL, "Sent Audit Metrics for Sort MySQL"),
Expand Down
Loading
Loading