Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[p2pstore] fix memory leaking in cgo. #61

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions mooncake-p2p-store/src/p2pstore/transfer_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,29 @@

package p2pstore

/*
* All memory pointed to by the "char *" parameters will not be used
* after the C function returns.
* This means that the caller can free the memory pointed to by "char *"
* parameters, after the call is completed.
* All the C functions used here follow this convention.
*/

//#cgo LDFLAGS: -L../../../build/mooncake-transfer-engine/src -L../../../thirdparties/lib -ltransfer_engine -lstdc++ -lnuma -lglog -libverbs -ljsoncpp -letcd-cpp-api -lprotobuf -lgrpc++ -lgrpc
//#include "../../../mooncake-transfer-engine/include/transfer_engine_c.h"
import "C"

import (
"unsafe"
"net"
"strconv"
"unsafe"
)

type BatchID int64

type TransferEngine struct {
engine C.transfer_engine_t
xport C.transport_t
xport C.transport_t
}

func parseServerName(serverName string) (host string, port int) {
Expand All @@ -45,35 +53,45 @@ func parseServerName(serverName string) (host string, port int) {
return host, port
}

const (
rdmaCStr = C.CString("rdma")
)

func NewTransferEngine(metadata_uri string, local_server_name string, nic_priority_matrix string) (*TransferEngine, error) {
// For simplifiy, local_server_name must be a valid IP address or hostname
connectable_name, rpc_port := parseServerName(local_server_name)

native_engine := C.createTransferEngine(C.CString(metadata_uri),
C.CString(local_server_name),
C.CString(connectable_name),
C.uint64_t(rpc_port))
metadataUri := C.CString(metadata_uri)
localServerName := C.CString(local_server_name)
connectableName := C.CString(connectable_name)
nicPriorityMatrix := C.CString(nic_priority_matrix)
defer C.free(unsafe.Pointer(metadataUri))
defer C.free(unsafe.Pointer(localServerName))
defer C.free(unsafe.Pointer(connectableName))
defer C.free(unsafe.Pointer(nicPriorityMatrix))

native_engine := C.createTransferEngine(metadataUri, localServerName, connectableName, C.uint64_t(rpc_port))
if native_engine == nil {
return nil, ErrTransferEngine
}

var args [2]unsafe.Pointer
args[0] = unsafe.Pointer(C.CString(nic_priority_matrix))
args[0] = unsafe.Pointer(nicPriorityMatrix)
args[1] = nil
xport := C.installTransport(native_engine, C.CString("rdma"), &args[0])
xport := C.installTransport(native_engine, rdmaCStr, &args[0])
if xport == nil {
C.destroyTransferEngine(native_engine)
return nil, ErrTransferEngine
}

return &TransferEngine{
engine: native_engine,
xport:xport,
xport: xport,
}, nil
}

func (engine *TransferEngine) Close() error {
ret := C.uninstallTransport(engine.engine, C.CString("rdma"))
ret := C.uninstallTransport(engine.engine, rdmaCStr)
if ret < 0 {
return ErrTransferEngine
}
Expand All @@ -83,7 +101,9 @@ func (engine *TransferEngine) Close() error {
}

func (engine *TransferEngine) registerLocalMemory(addr uintptr, length uint64, location string) error {
ret := C.registerLocalMemory(engine.engine, unsafe.Pointer(addr), C.size_t(length), C.CString(location), 1)
locationCStr := C.CString(location)
defer C.free(unsafe.Pointer(locationCStr))
ret := C.registerLocalMemory(engine.engine, unsafe.Pointer(addr), C.size_t(length), locationCStr, 1)
if ret < 0 {
return ErrTransferEngine
}
Expand Down Expand Up @@ -163,7 +183,10 @@ func (engine *TransferEngine) freeBatchID(batchID BatchID) error {
}

func (engine *TransferEngine) openSegment(name string) (int64, error) {
ret := C.openSegment(engine.engine, C.CString(name))
nameCStr := C.CString(name)
defer C.free(unsafe.Pointer(nameCStr))

ret := C.openSegment(engine.engine, nameCStr)
if ret < 0 {
return -1, ErrTransferEngine
}
Expand All @@ -184,4 +207,4 @@ func (engine *TransferEngine) syncSegmentCache() error {
return ErrTransferEngine
}
return nil
}
}
10 changes: 9 additions & 1 deletion mooncake-transfer-engine/include/transfer_engine_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ typedef struct segment_desc segment_desc_t;
typedef void *transfer_engine_t;
typedef void *transport_t;

/*
* All memory pointed to by the "char *" parameters will not be used
* after the C function returns.
* This means that the caller can free the memory pointed to by "char *"
* parameters, after the call is completed.
* All the C functions here follow this convention.
*/

transfer_engine_t createTransferEngine(const char *metadata_conn_string,
const char *local_server_name,
const char *ip_or_host_name,
Expand Down Expand Up @@ -129,4 +137,4 @@ int syncSegmentCache(transfer_engine_t engine);
}
#endif // __cplusplus

#endif // TRANSFER_ENGINE_C
#endif // TRANSFER_ENGINE_C