Skip to content

Commit

Permalink
iolayer_performs() -> iolayer_invoke()
Browse files Browse the repository at this point in the history
Signed-off-by: spriteray <[email protected]>
  • Loading branch information
spriteray committed Oct 26, 2020
1 parent d08bfc5 commit b59ab1e
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 125 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
OS = $(shell uname)

APP = libevlite
VERSION = 9.8.7
VERSION = 9.8.8
PREFIX = /usr/local

# 主版本号
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
- 停止对外提供接入服务, 不再接受新的连接;
- 停止所有连接的接收服务, 不再回调ioservice_t::process()

- ##### 提交任务到网络层 iolayer_perform(), iolayer_performs()
- ##### 提交任务到网络层 iolayer_invoke(), iolayer_perform()

- ##### 销毁网络层 iolayer_destroy()

10 changes: 5 additions & 5 deletions examples/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ int32_t IIOService::broadcast( const sids_t & ids, const char * buffer, size_t n
return iolayer_broadcast( m_IOLayer, const_cast<sid_t *>( &(*start) ), count, buffer, nbytes );
}

int32_t IIOService::invoke( void * task, taskcloner_t clone, taskexecutor_t execute )
{
return iolayer_invoke( m_IOLayer, task, clone, execute );
}

int32_t IIOService::perform( sid_t sid, int32_t type, void * task, taskrecycler_t recycle )
{
int32_t rc = iolayer_perform(
Expand All @@ -370,11 +375,6 @@ int32_t IIOService::perform( sid_t sid, int32_t type, void * task, taskrecycler_
return rc;
}

int32_t IIOService::perform( void * task, taskcloner_t clone, taskexecutor_t perform )
{
return iolayer_performs( m_IOLayer, task, clone, perform );
}

int32_t IIOService::shutdown( sid_t id )
{
return iolayer_shutdown( m_IOLayer, id );
Expand Down
5 changes: 2 additions & 3 deletions examples/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,8 @@ public :
int32_t shutdown( const sids_t & ids );

// 提交任务到网络层
int32_t perform( sid_t sid,
int32_t type, void * task, taskrecycler_t recycle );
int32_t perform( void * task, taskcloner_t clone, taskexecutor_t perform );
int32_t invoke( void * task, taskcloner_t clone, taskexecutor_t execute );
int32_t perform( sid_t sid, int32_t type, void * task, taskrecycler_t recycle );

private :
// 监听上下文
Expand Down
128 changes: 62 additions & 66 deletions include/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,57 +44,6 @@ extern "C"
typedef uint64_t sid_t;
typedef void * iolayer_t;

// 任务回收函数
// 参数1: 类型
// 参数2: 任务
typedef void (*taskrecycler_t)( int32_t, void * );

// 任务克隆函数
// 参数1: 任务
typedef void * (*taskcloner_t)( void * );

// 任务处理函数
// 参数1: 网络线程上下文参数
// 参数2: 任务
typedef void (*taskexecutor_t)( void *, void * );

// 重新关联函数,返回新的描述符
// 参数1: 上次关联的描述符
// 参数2: 描述符相关的私有数据
typedef int32_t (*reattacher_t)( int32_t, void * );

// 数据改造方法(不建议原地改造)
// 参数1: 上下文参数
// 参数2: 欲发送或者广播的消息内容
// 参数3: 指向消息长度的指针, 返回改造后的数据包长度
typedef char * (*transformer_t)( void *, const char * , size_t * );

// 新会话创建成功后的回调
// 参数1: 上下文参数
// 参数2: 网络线程上下文参数
// 参数3: 新会话ID
// 参数4: 会话的IP地址
// 参数5: 会话的端口号
typedef int32_t (*acceptor_t)( void *, void *, sid_t, const char * , uint16_t );

// 连接结果的回调
// 参数1: 上下文参数
// 参数2: 网络线程上下文参数
// 参数3: 连接结果
// 参数4: 连接的远程服务器的地址
// 参数5: 连接的远程服务器的端口
// 参数6: 连接成功后返回的会话ID
typedef int32_t (*connector_t)( void *, void *, int32_t, const char *, uint16_t, sid_t );

// 关联成功后的回调
// 参数1: 上下文参数
// 参数2: 网络线程上下文参数
// 参数3: 关联结果
// 参数3: 描述符
// 参数4: 描述符相关私有数据
// 参数5: 会话ID
typedef int32_t (*associator_t)( void *, void *, int32_t, int32_t, void *, sid_t );

// IO服务
// start() - 网络就绪的回调
// process() - 收到数据包的回调
Expand Down Expand Up @@ -141,32 +90,67 @@ iolayer_t iolayer_create( uint8_t nthreads, uint32_t nclients, uint8_t immediate
// 确保长度和网络线程个数相等, 毕竟多个网络线程是对等的
int32_t iolayer_set_iocontext( iolayer_t self, void ** contexts, uint8_t count );

// 网络层设置数据包改造方法, 该网络层的统一的数据包改造方法(在listen(), connect(), associate()之前调用)
// 数据改造方法(不建议原地改造)
// 参数1: 上下文参数
// 参数2: 欲发送或者广播的消息内容
// 参数3: 指向消息长度的指针, 返回改造后的数据包长度
typedef char * (*transformer_t)( void *, const char * , size_t * );
// 网络层设置数据包改造方法,
// 该网络层的统一的数据包改造方法(在listen(), connect(), associate()之前调用)
// self -
// transform - 数据包改造方法(不建议原地改造)
// context - 上下文参数
int32_t iolayer_set_transform( iolayer_t self, transformer_t transform, void * context );

// 服务器开启
// 新会话创建成功后的回调
// 参数1: 上下文参数
// 参数2: 网络线程上下文参数
// 参数3: 新会话ID
// 参数4: 会话的IP地址
// 参数5: 会话的端口号
typedef int32_t (*acceptor_t)( void *, void *, sid_t, const char * , uint16_t );
// 开启服务端
// type - 网络类型: NETWORK_TCP or NETWORK_KCP
// host - 绑定的地址
// port - 监听的端口号
// callback - 新会话创建成功后的回调,会被多个网络线程调用
// callback - 新会话创建成功后的回调(参考acceptor_t的定义),会被多个网络线程调用
// context - 上下文参数
int32_t iolayer_listen( iolayer_t self,
const char * host, uint16_t port, acceptor_t callback, void * context );

// 客户端开启
// 连接结果的回调
// 参数1: 上下文参数
// 参数2: 网络线程上下文参数
// 参数3: 连接结果
// 参数4: 连接的远程服务器的地址
// 参数5: 连接的远程服务器的端口
// 参数6: 连接成功后返回的会话ID
typedef int32_t (*connector_t)( void *, void *, int32_t, const char *, uint16_t, sid_t );
// 开启客户端
// host - 远程服务器的地址
// port - 远程服务器的端口
// callback - 连接结果的回调
// callback - 连接结果的回调(参考connector_t的定义)
// context - 上下文参数
int32_t iolayer_connect( iolayer_t self,
const char * host, uint16_t port, connector_t callback, void * context );

// 重新关联函数,返回新的描述符
// 参数1: 上次关联的描述符
// 参数2: 描述符相关的私有数据
typedef int32_t (*reattacher_t)( int32_t, void * );
// 关联成功后的回调
// 参数1: 上下文参数
// 参数2: 网络线程上下文参数
// 参数3: 关联结果
// 参数3: 描述符
// 参数4: 描述符相关私有数据
// 参数5: 会话ID
typedef int32_t (*associator_t)( void *, void *, int32_t, int32_t, void *, sid_t );
// 描述符关联会话ID
// fd - 描述符
// privdata - 描述符相关的私有数据
// reattach - 重新关联函数,返回新的描述符
// callback - 关联成功后的回调
// reattach - 重新关联函数(参考reattacher_t定义),返回新的描述符
// callback - 关联成功后的回调(参考associator_t的定义)
// context - 上下文参数
int32_t iolayer_associate( iolayer_t self,
int32_t fd, void * privdata, reattacher_t reattach, associator_t callback, void * context );
Expand Down Expand Up @@ -199,19 +183,31 @@ int32_t iolayer_broadcast2( iolayer_t self, const char * buf, size_t nbytes );
int32_t iolayer_shutdown( iolayer_t self, sid_t id );
int32_t iolayer_shutdowns( iolayer_t self, sid_t * ids, uint32_t count );

// 提交任务到网络层(会话ID所在网络线程)
// 任务复制函数
// 参数: 任务
typedef void * (*taskcloner_t)( void * );
// 任务处理函数
// 参数1: 网络线程上下文参数
// 参数2: 任务
typedef void (*taskexecutor_t)( void *, void * );
// 提交任务到网络层
// task - 任务
// clone - 任务复制函数(参考taskcloner_t的定义), 如果为NULL, 随机选择一个网络线程投递
// execute - 任务处理函数(参考taskexecutor_t的定义)
int32_t iolayer_invoke( iolayer_t self, void * task, taskcloner_t clone, taskexecutor_t execute );

// 任务回收函数
// 参数1: 类型
// 参数2: 任务
typedef void (*taskrecycler_t)( int32_t, void * );
// 分派任务给指定的会话
// 通过ioservice_t::perform()回调执行该任务
// id - 会话ID
// type - 任务类型
// task - 任务数据
// recycle - 任务回收函数
// recycle - 任务回收函数(参考taskrecycler_t的定义)
int32_t iolayer_perform( iolayer_t self, sid_t id, int32_t type, void * task, taskrecycler_t recycle );

// 提交任务到网络层
// task - 任务
// clone - 任务复制函数, 如果为NULL, 随机选择一个网络线程投递
// execute - 任务处理函数
int32_t iolayer_performs( iolayer_t self, void * task, taskcloner_t clone, taskexecutor_t execute );

// 停止网络服务
// 行为定义:
// 1. 停止对外提供接入服务, 不再接受新的连接;
Expand Down
4 changes: 2 additions & 2 deletions libevlite.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@
isa = XCBuildConfiguration;
buildSettings = {
DYLIB_COMPATIBILITY_VERSION = 9;
DYLIB_CURRENT_VERSION = 9.8.7;
DYLIB_CURRENT_VERSION = 9.8.8;
EXECUTABLE_PREFIX = "";
GCC_PREPROCESSOR_DEFINITIONS = (
"DEBUG=1",
Expand All @@ -635,7 +635,7 @@
isa = XCBuildConfiguration;
buildSettings = {
DYLIB_COMPATIBILITY_VERSION = 9;
DYLIB_CURRENT_VERSION = 9.8.7;
DYLIB_CURRENT_VERSION = 9.8.8;
EXECUTABLE_PREFIX = "";
PRODUCT_NAME = "$(TARGET_NAME)";
};
Expand Down
78 changes: 39 additions & 39 deletions src/iolayer.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ static int32_t _assign_direct( struct iolayer * self, uint8_t index, evsets_t se
static ssize_t _send_direct( struct iolayer * self, struct session_manager * manager, struct task_send * task );
static int32_t _broadcast_direct( struct iolayer * self, uint8_t index, struct session_manager * manager, struct message * msg );
static int32_t _broadcast2_direct( struct iolayer * self, struct session_manager * manager, struct message * msg );
static void _invoke_direct( struct iolayer * self, uint8_t index, struct task_invoke * task );
static int32_t _perform_direct( struct iolayer * self, struct session_manager * manager, struct task_perform * task );
static void _performs_direct( struct iolayer * self, uint8_t index, struct task_performs * task );
static int32_t _shutdown_direct( struct session_manager * manager, sid_t id );
static int32_t _shutdowns_direct( uint8_t index, struct session_manager * manager, struct sidlist * ids );

Expand Down Expand Up @@ -613,52 +613,30 @@ int32_t iolayer_broadcast2( iolayer_t self, const char * buf, size_t nbytes )
return 0;
}

int32_t iolayer_perform( iolayer_t self, sid_t id, int32_t type, void * task, taskrecycler_t recycle )
{
uint8_t index = SID_INDEX(id);
struct iolayer * layer = (struct iolayer *)self;

if ( unlikely(index >= layer->nthreads) )
{
syslog(LOG_WARNING, "%s(SID=%ld) failed, the Session's index[%u] is invalid .", __FUNCTION__, id, index );
return -1;
}

struct task_perform ptask = { id, type, task, recycle };

if ( pthread_self() == iothreads_get_id( layer->threads, index ) )
{
return _perform_direct( layer, _get_manager(layer, index), &ptask );
}

// 跨线程提交发送任务
return iothreads_post( layer->threads, index, eIOTaskType_Perform, (void *)&ptask, sizeof(ptask) );
}

int32_t iolayer_performs( iolayer_t self, void * task, taskcloner_t clone, taskexecutor_t execute )
int32_t iolayer_invoke( iolayer_t self, void * task, taskcloner_t clone, taskexecutor_t execute )
{
assert( self != NULL && "Illegal IOLayer" );
assert( execute != NULL && "Illegal specified Execute-Function" );

uint8_t i = 0;
pthread_t threadid = pthread_self();
struct task_performs tasklist[ 256 ]; // 栈中分配更快
struct task_invoke tasklist[ 256 ]; // 栈中分配更快
struct iolayer * layer = (struct iolayer *)self;

if ( clone == NULL )
{
uint8_t index = milliseconds() % layer->nthreads;
struct task_performs inner_task = { task, execute };
struct task_invoke inner_task = { task, execute };

if ( threadid == iothreads_get_id( layer->threads, index ) )
{
// 本线程内直接执行
_performs_direct( layer, index, &inner_task );
_invoke_direct( layer, index, &inner_task );
}
else
{
// 跨线程提交执行任务
iothreads_post( layer->threads, index, eIOTaskType_Performs, &inner_task, sizeof(struct task_performs) );
iothreads_post( layer->threads, index, eIOTaskType_Invoke, &inner_task, sizeof(struct task_invoke) );
}
}
else
Expand All @@ -674,19 +652,41 @@ int32_t iolayer_performs( iolayer_t self, void * task, taskcloner_t clone, taske
if ( threadid == iothreads_get_id( layer->threads, i ) )
{
// 本线程内直接广播
_performs_direct( layer, i, &(tasklist[i]) );
_invoke_direct( layer, i, &(tasklist[i]) );
}
else
{
// 跨线程提交广播任务
iothreads_post( layer->threads, i, eIOTaskType_Performs, &(tasklist[i]), sizeof(struct task_performs) );
iothreads_post( layer->threads, i, eIOTaskType_Invoke, &(tasklist[i]), sizeof(struct task_invoke) );
}
}
}

return 0;
}

int32_t iolayer_perform( iolayer_t self, sid_t id, int32_t type, void * task, taskrecycler_t recycle )
{
uint8_t index = SID_INDEX(id);
struct iolayer * layer = (struct iolayer *)self;

if ( unlikely(index >= layer->nthreads) )
{
syslog(LOG_WARNING, "%s(SID=%ld) failed, the Session's index[%u] is invalid .", __FUNCTION__, id, index );
return -1;
}

struct task_perform ptask = { id, type, task, recycle };

if ( pthread_self() == iothreads_get_id( layer->threads, index ) )
{
return _perform_direct( layer, _get_manager(layer, index), &ptask );
}

// 跨线程提交发送任务
return iothreads_post( layer->threads, index, eIOTaskType_Perform, (void *)&ptask, sizeof(ptask) );
}

int32_t iolayer_shutdown( iolayer_t self, sid_t id )
{
uint8_t index = SID_INDEX(id);
Expand Down Expand Up @@ -1251,6 +1251,11 @@ int32_t _broadcast2_direct( struct iolayer * self, struct session_manager * mana
return count;
}

void _invoke_direct( struct iolayer * self, uint8_t index, struct task_invoke * task )
{
task->perform( iothreads_get_context( self->threads, index ), task->task );
}

int32_t _perform_direct( struct iolayer * self, struct session_manager * manager, struct task_perform * task )
{
int32_t rc = 0;
Expand All @@ -1277,11 +1282,6 @@ int32_t _perform_direct( struct iolayer * self, struct session_manager * manager
return rc;
}

void _performs_direct( struct iolayer * self, uint8_t index, struct task_performs * task )
{
task->perform( iothreads_get_context( self->threads, index ), task->task );
}

int32_t _shutdown_direct( struct session_manager * manager, sid_t id )
{
struct session * session = session_manager_get( manager, id );
Expand Down Expand Up @@ -1385,13 +1385,13 @@ void _concrete_processor( void * context, uint8_t index, int16_t type, void * ta
break;

// 逻辑任务
case eIOTaskType_Perform :
_perform_direct( layer, manager, (struct task_perform *)task );
case eIOTaskType_Invoke :
_invoke_direct( layer, index, (struct task_invoke *)task );
break;

// 逻辑任务
case eIOTaskType_Performs :
_performs_direct( layer, index, (struct task_performs *)task );
case eIOTaskType_Perform :
_perform_direct( layer, manager, (struct task_perform *)task );
break;
}
}
Loading

0 comments on commit b59ab1e

Please sign in to comment.