diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/README.md b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/README.md index d85a8632f32..bb490d87438 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/README.md +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/README.md @@ -39,3 +39,79 @@ chmod +x ./build.sh After the build process finished, you can import the package (`import inlong_dataproxy`) in your python project to use InLong dataproxy. > **Note**: When the C++ SDK or the version of Python you're using is updated, you'll need to rebuild it by re-executing the `build.sh` script + +## Config Parameters + +Refer to `demo/config_example.json`. + +| name | default value | description | +|:-------------------------|:-------------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------| +| thread_num | 10 | number of network sending threads | +| inlong_group_ids | "" | the list of inlong_group_id, seperated by commas, such as "b_inlong_group_test_01, b_inlong_group_test_02" | +| enable_groupId_isolation | false | whether different groupid data using different buffer pools inside the sdk | +| buffer_num_per_groupId | 5 | number of buffer pools of each groupid | +| enable_pack | true | whether multiple messages are packed while sending to dataproxy | +| pack_size | 4096 | byte, pack messages and send to dataproxy when the data in buffer pool exceeds this value | +| ext_pack_size | 16384 | byte, maximum length of a message | +| enable_zip | true | whether zip data while sending to dataproxy | +| min_ziplen | 512 | byte, minimum zip len | +| enable_retry | true | whether do resend while failed to send data | +| retry_ms | 3000 | millisecond, resend interval | +| retry_num | 3 | maximum resend times | +| max_active_proxy | 3 | maximum number of established connections with dataproxy | +| max_buf_pool | 50 `*`1024`*` 1024 | byte, the size of buffer pool | +| log_num | 10 | maximum number of log files | +| log_size | 10 | MB, maximum size of one log file | +| log_level | 2 | log level: trace(4)>debug(3)>info(2)>warn(1)>error(0) | +| log_file_type | 2 | type of log output: 2->file, 1->console | +| log_path | ./logs/ | log path | +| proxy_update_interval | 10 | interval of requesting and updating dataproxy lists from manager | +| manager_url | "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList" | the url of manager openapi | +| need_auth | false | whether need authentication while interacting with manager | +| auth_id | "" | authenticate id if need authentication | +| auth_key | "" | authenticate key if need authentication | + +## Usage + +Follow these steps to use the DataProxy Python SDK: + +1. **Initialize the SDK**: Before using the SDK, you need to initialize it by calling `init_api(config_file)`. The `config_file` parameter is the path to your configuration file. It is recommended to use an absolute path. Please note that this function only needs to be called once per process. + + Example: + ```python + inlong_api = inlong_dataproxy.InLongApi() + inlong_api.init_api("path/to/your/config_file.json") + ``` + +2. **Send data**: To send data, use the `send(inlong_group_id, inlong_stream_id, msg, msg_len, call_back_func = null)` function. The parameters are as follows: + - `inlong_group_id`: The group ID associated with the data. + - `inlong_stream_id`: The stream ID associated with the data. + - `msg`: The data message to be sent. + - `msg_len`: The length of the data message. + - `call_back_func` (optional): A callback function that will be called if your data fails to send. + + Example: + ```python + inlong_api.send("your_inlong_group_id", "your_inlong_stream_id", "your_message", len("your_message"), call_back_func = your_callback_function) + ``` + +3. **Close the SDK**: Once you have no more data to send, close the SDK by calling `close_api(max_waitms)`. The `max_waitms` parameter is the maximum time interval (in milliseconds) to wait for data in memory to be sent. + + Example: + ```python + inlong_api.close_api(1000) + ``` + +4. **Function return values**: The functions mentioned above return 0 if they are successful, and a non-zero value indicates failure. Make sure to check the return values to ensure proper execution. + +## Demo + +You can refer to the `/demo/send_demo.py` file. To run this demo, you first need to ensure that the SDK has been built and installed properly. Then, follow these steps: + +1. Navigate to the `demo` directory in your terminal or command prompt. +2. Modify the configuration settings in `config_example.json` as needed to match your specific use case. +3. Execute the following command, replacing `[inlong_group_id]` and `[inlong_stream_id]` with the appropriate IDs: + + ```bash + python send_demo.py config_example.json [inlong_group_id] [inlong_stream_id] + ``` \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/config_example.json b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/config_example.json new file mode 100644 index 00000000000..252c8808a33 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/config_example.json @@ -0,0 +1,30 @@ +{ + "init-param": { + "inlong_group_ids": "test_pulsar_group", + "manager_url": "http://127.0.0.1:8083/inlong/manager/openapi/dataproxy/getIpList", + "manager_update_interval": 2, + "manager_url_timeout": 5, + "msg_type": 7, + "max_proxy_num": 8, + "per_groupid_thread_nums": 1, + "dispatch_interval_zip": 8, + "dispatch_interval_send": 10, + "recv_buf_size": 10240000, + "send_buf_size": 10240000, + "enable_pack": true, + "pack_size": 409600, + "pack_timeout": 3000, + "ext_pack_size": 409600, + "enable_zip": true, + "min_zip_len": 512, + "tcp_detection_interval": 60000, + "tcp_idle_time": 600000, + "log_num": 10, + "log_size": 104857600, + "log_level": 3, + "log_path": "./", + "need_auth": false, + "auth_id": "", + "auth_key": "" + } +} diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py new file mode 100644 index 00000000000..e8c33bc23ae --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-python/demo/send_demo.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys + +import inlong_dataproxy + +# user defined callback function +def callback_func(inlong_group_id, inlong_stream_id, msg, msg_len, report_time, ip): + print("******this is call back, print info******") + print("inlong_group_id:", inlong_group_id, ", inlong_stream_id:", inlong_stream_id) + print("msg_len:", msg_len, ", msg content:", msg) + print("report_time:", report_time, ", client ip:", ip) + print("******call back end******") + return 0 + +def main(): + if len(sys.argv) < 2: + print("USAGE: python send_demo.py config_example.json [inlong_group_id] [inlong_stream_id]") + return + + inlong_api = inlong_dataproxy.InLongApi() + + # step1. init api + init_status = inlong_api.init_api(sys.argv[1]) + if init_status: + print("init error, error code is: " + init_status) + return + + print("---->start sdk successfully") + + count = 5 + inlong_group_id = "test_pulsar_group" + inlong_stream_id = "test_pulsar_stream" + + if (len(sys.argv) == 4): + inlong_group_id = sys.argv[2] + inlong_stream_id = sys.argv[3] + + print("inlong_group_id:", inlong_group_id, ", inlong_stream_id:", inlong_stream_id) + + msg = "python sdk|1234" + + # step2. send message + print("---->start tc_api_send") + for i in range(count): + send_status = inlong_api.send(inlong_group_id, inlong_stream_id, msg, len(msg), callback_func) + if send_status: + print("tc_api_send error, error code is: " + send_status) + + # step3. close api + close_status = inlong_api.close_api(10000) + if close_status: + print("close sdk error, error code is: " + close_status) + else: + print("---->close sdk successfully") + +if __name__ == "__main__": + main() \ No newline at end of file