[INLONG-10532][SDK] Add dataproxy python SDK sample and the corresponding document (#10546)

Co-authored-by: jameswyli <[email protected]>
yfsn666 and jameswyli authored Jul 2, 2024
1 parent 8de559e commit bda8f4c
Showing 3 changed files with 181 additions and 0 deletions.
76 changes: 76 additions & 0 deletions inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/README.md
@@ -39,3 +39,79 @@ chmod +x ./build.sh
After the build process has finished, you can import the package (`import inlong_dataproxy`) in your Python project to use InLong dataproxy.

> **Note**: When the C++ SDK or the version of Python you're using is updated, you'll need to rebuild the package by re-executing the `build.sh` script.

## Config Parameters

Refer to `demo/config_example.json`.

| name | default value | description |
|:-------------------------|:-------------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------|
| thread_num | 10 | number of network sending threads |
| inlong_group_ids | "" | comma-separated list of inlong_group_id values, such as "b_inlong_group_test_01, b_inlong_group_test_02" |
| enable_groupId_isolation | false | whether data of different group IDs uses separate buffer pools inside the SDK |
| buffer_num_per_groupId | 5 | number of buffer pools for each group ID |
| enable_pack | true | whether multiple messages are packed together when sent to dataproxy |
| pack_size | 4096 | bytes; messages are packed and sent to dataproxy when the data in the buffer pool exceeds this value |
| ext_pack_size | 16384 | bytes; maximum length of a message |
| enable_zip | true | whether data is compressed when sent to dataproxy |
| min_ziplen | 512 | bytes; minimum length for compression |
| enable_retry | true | whether to resend data after a failed send |
| retry_ms | 3000 | milliseconds; resend interval |
| retry_num | 3 | maximum number of resend attempts |
| max_active_proxy | 3 | maximum number of established connections to dataproxy |
| max_buf_pool | `50 * 1024 * 1024` | bytes; size of the buffer pool |
| log_num | 10 | maximum number of log files |
| log_size | 10 | MB; maximum size of one log file |
| log_level | 2 | log level: trace(4) > debug(3) > info(2) > warn(1) > error(0) |
| log_file_type | 2 | type of log output: 2 -> file, 1 -> console |
| log_path | ./logs/ | log path |
| proxy_update_interval | 10 | interval for requesting and updating the dataproxy list from the manager |
| manager_url | "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList" | URL of the manager OpenAPI |
| need_auth | false | whether authentication is required when interacting with the manager |
| auth_id | "" | authentication ID, if authentication is required |
| auth_key | "" | authentication key, if authentication is required |
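
The sample config wraps its parameters in a top-level `init-param` object. The following is a minimal sketch of generating such a file from Python, assuming that layout and using only keys that appear both in the table above and in `demo/config_example.json`. `write_config` is a hypothetical helper for illustration, not part of the SDK:

```python
import json
import os
import tempfile


def write_config(path, **overrides):
    """Write a DataProxy SDK config file (hypothetical helper, not part of the SDK)."""
    # Keys and default values are taken from the table above.
    params = {
        "inlong_group_ids": "",
        "manager_url": "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList",
        "enable_pack": True,
        "pack_size": 4096,
        "need_auth": False,
        "auth_id": "",
        "auth_key": "",
    }
    params.update(overrides)
    # Assumption: parameters live under "init-param", as in the sample file.
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"init-param": params}, f, indent=2)
    # Return an absolute path, which is convenient to pass on to the SDK.
    return os.path.abspath(path)


config_path = write_config(
    os.path.join(tempfile.mkdtemp(), "config.json"),
    inlong_group_ids="test_pulsar_group",
)
```

Any other parameter can be passed as a keyword argument in the same way; whether the SDK accepts a given key should be checked against the sample file.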

## Usage

Follow these steps to use the DataProxy Python SDK:

1. **Initialize the SDK**: Before using the SDK, you need to initialize it by calling `init_api(config_file)`. The `config_file` parameter is the path to your configuration file. It is recommended to use an absolute path. Please note that this function only needs to be called once per process.

Example:
```python
inlong_api = inlong_dataproxy.InLongApi()
inlong_api.init_api("path/to/your/config_file.json")
```

2. **Send data**: To send data, use the `send(inlong_group_id, inlong_stream_id, msg, msg_len, call_back_func=None)` function. The parameters are as follows:
- `inlong_group_id`: The group ID associated with the data.
- `inlong_stream_id`: The stream ID associated with the data.
- `msg`: The data message to be sent.
- `msg_len`: The length of the data message.
- `call_back_func` (optional): A callback function that will be called if your data fails to send.

Example:
```python
inlong_api.send("your_inlong_group_id", "your_inlong_stream_id", "your_message", len("your_message"), call_back_func = your_callback_function)
```

3. **Close the SDK**: Once you have no more data to send, close the SDK by calling `close_api(max_waitms)`. The `max_waitms` parameter is the maximum time interval (in milliseconds) to wait for data in memory to be sent.

Example:
```python
inlong_api.close_api(1000)
```

4. **Function return values**: Each of the functions above returns 0 on success and a non-zero value on failure. Check the return value of every call to make sure it executed properly.
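
The four steps above can be combined into one flow. The following is a minimal sketch, assuming only the `init_api`, `send`, and `close_api` calls described above and the callback signature used in `demo/send_demo.py`. The names `make_failure_recorder`, `check`, and `send_all` are hypothetical and not part of the SDK:

```python
def make_failure_recorder(records):
    """Build a callback that records each message the SDK reports as failed."""
    def on_failure(inlong_group_id, inlong_stream_id, msg, msg_len, report_time, ip):
        records.append((inlong_group_id, inlong_stream_id, msg))
        return 0
    return on_failure


def check(step, status):
    """Raise if an SDK call returned a non-zero status."""
    if status != 0:
        raise RuntimeError(f"{step} failed with status {status}")


def send_all(api, config_file, group_id, stream_id, messages, max_waitms=1000):
    """Initialize the SDK, send every message, and always close the SDK."""
    failed = []
    callback = make_failure_recorder(failed)
    check("init_api", api.init_api(config_file))
    try:
        for msg in messages:
            check("send", api.send(group_id, stream_id, msg, len(msg), callback))
    finally:
        # Give buffered data up to max_waitms milliseconds to be sent.
        close_status = api.close_api(max_waitms)
    check("close_api", close_status)
    return failed
```

With the real SDK, `api` would be an `inlong_dataproxy.InLongApi()` instance. Closing in a `finally` block is a design choice of this sketch, so that the SDK is closed even when a send fails.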

## Demo

You can refer to the `/demo/send_demo.py` file. To run this demo, you first need to ensure that the SDK has been built and installed properly. Then, follow these steps:

1. Navigate to the `demo` directory in your terminal or command prompt.
2. Modify the configuration settings in `config_example.json` as needed to match your specific use case.
3. Execute the following command, replacing `[inlong_group_id]` and `[inlong_stream_id]` with the appropriate IDs:

```bash
python send_demo.py config_example.json [inlong_group_id] [inlong_stream_id]
```
@@ -0,0 +1,30 @@
{
    "init-param": {
        "inlong_group_ids": "test_pulsar_group",
        "manager_url": "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList",
        "manager_update_interval": 2,
        "manager_url_timeout": 5,
        "msg_type": 7,
        "max_proxy_num": 8,
        "per_groupid_thread_nums": 1,
        "dispatch_interval_zip": 8,
        "dispatch_interval_send": 10,
        "recv_buf_size": 10240000,
        "send_buf_size": 10240000,
        "enable_pack": true,
        "pack_size": 409600,
        "pack_timeout": 3000,
        "ext_pack_size": 409600,
        "enable_zip": true,
        "min_zip_len": 512,
        "tcp_detection_interval": 60000,
        "tcp_idle_time": 600000,
        "log_num": 10,
        "log_size": 104857600,
        "log_level": 3,
        "log_path": "./",
        "need_auth": false,
        "auth_id": "",
        "auth_key": ""
    }
}
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys

import inlong_dataproxy

# user defined callback function
def callback_func(inlong_group_id, inlong_stream_id, msg, msg_len, report_time, ip):
    print("******this is call back, print info******")
    print("inlong_group_id:", inlong_group_id, ", inlong_stream_id:", inlong_stream_id)
    print("msg_len:", msg_len, ", msg content:", msg)
    print("report_time:", report_time, ", client ip:", ip)
    print("******call back end******")
    return 0

def main():
    if len(sys.argv) < 2:
        print("USAGE: python send_demo.py config_example.json [inlong_group_id] [inlong_stream_id]")
        return

    inlong_api = inlong_dataproxy.InLongApi()

    # step1. init api
    init_status = inlong_api.init_api(sys.argv[1])
    if init_status:
        # the status is a number, so convert it before concatenating
        print("init error, error code is: " + str(init_status))
        return

    print("---->start sdk successfully")

    count = 5
    inlong_group_id = "test_pulsar_group"
    inlong_stream_id = "test_pulsar_stream"

    # optional command-line overrides for the default IDs
    if len(sys.argv) == 4:
        inlong_group_id = sys.argv[2]
        inlong_stream_id = sys.argv[3]

    print("inlong_group_id:", inlong_group_id, ", inlong_stream_id:", inlong_stream_id)

    msg = "python sdk|1234"

    # step2. send message
    print("---->start tc_api_send")
    for i in range(count):
        send_status = inlong_api.send(inlong_group_id, inlong_stream_id, msg, len(msg), callback_func)
        if send_status:
            print("tc_api_send error, error code is: " + str(send_status))

    # step3. close api
    close_status = inlong_api.close_api(10000)
    if close_status:
        print("close sdk error, error code is: " + str(close_status))
    else:
        print("---->close sdk successfully")

if __name__ == "__main__":
    main()
