Skip to content

Commit

Permalink
优化部分接口的逻辑(涉及到connect, perform, assocate)
Browse files Browse the repository at this point in the history
1. iolayer_performs()当clone参数为空时,随机选择io线程
2. iolayer_connect()在网络线程中直接连接,不再投递
3. iolayer_associate()在网络线程中直接连接,不再投递

Signed-off-by: spriteray <[email protected]>
  • Loading branch information
spriteray committed Oct 26, 2020
1 parent c2b01e2 commit d08bfc5
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 77 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

- 设置会话的超时时间 iolayer_set_timeout()
- 设置会话的保活时间 iolayer_set_keepalive()
- 设置会话的地址和端口 iolayer_set_endpoint()
- 设置会话的IO服务逻辑 iolayer_set_service()
- 设置会话的读事件常驻事件集 iolayer_set_persist()
- 设置会话的发送队列长度限制 iolayer_set_sndqlimit()
Expand Down
5 changes: 1 addition & 4 deletions examples/io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@ void IIOSession::setSendqueueLimit( uint32_t limit )
void IIOSession::setEndpoint( const std::string & host, uint16_t port )
{
assert( m_Sid != 0 && m_Layer != NULL );

m_Host = host;
m_Port = port;
iolayer_set_endpoint( m_Layer, m_Sid, host.c_str(), port );
m_Host = host; m_Port = port;
}

int32_t IIOSession::send( const std::string & buffer )
Expand Down
5 changes: 2 additions & 3 deletions include/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ int32_t iolayer_associate( iolayer_t self,
// 建议在ioservice_t::onStart()中调用
int32_t iolayer_set_timeout( iolayer_t self, sid_t id, int32_t seconds );
int32_t iolayer_set_keepalive( iolayer_t self, sid_t id, int32_t seconds );
int32_t iolayer_set_endpoint( iolayer_t self, sid_t id, const char * host, uint16_t port );
int32_t iolayer_set_service( iolayer_t self, sid_t id, ioservice_t * service, void * context );
// 设置读事件常驻事件库( 默认为0; 激活后, 极端的情况下能提高IO性能40%左右 )
int32_t iolayer_set_persist( iolayer_t self, sid_t id, int32_t onoff );
Expand Down Expand Up @@ -207,9 +206,9 @@ int32_t iolayer_shutdowns( iolayer_t self, sid_t * ids, uint32_t count );
// recycle - 任务回收函数
int32_t iolayer_perform( iolayer_t self, sid_t id, int32_t type, void * task, taskrecycler_t recycle );

// 提交任务到网络层(广播所有网络线程)
// 提交任务到网络层
// task - 任务
// clone - 任务复制函数
// clone - 任务复制函数, 如果为NULL, 随机选择一个网络线程投递
// execute - 任务处理函数
int32_t iolayer_performs( iolayer_t self, void * task, taskcloner_t clone, taskexecutor_t execute );

Expand Down
120 changes: 62 additions & 58 deletions src/iolayer.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,21 @@ int32_t iolayer_connect( iolayer_t self, const char * host, uint16_t port, conne
connector->port = port;
connector->context = context;
connector->cb = callback;
connector->index = DISPATCH_POLICY( layer,
__sync_fetch_and_add(&layer->roundrobin, 1) );
connector->host = strdup( host );

iothreads_post( layer->threads, connector->index, eIOTaskType_Connect, connector, 0 );
// 就地投递给本网络线程
int8_t index = iothreads_get_index( layer->threads );
if ( index >= 0 )
{
connector->index = index;
_connect_direct( iothreads_get_sets(layer->threads, index), connector );
}
else
{
connector->index = DISPATCH_POLICY(
layer, __sync_fetch_and_add(&layer->roundrobin, 1) );
iothreads_post( layer->threads, connector->index, eIOTaskType_Connect, connector, 0 );
}

return 0;
}
Expand Down Expand Up @@ -281,10 +291,22 @@ int32_t iolayer_associate( iolayer_t self, int32_t fd, void * privdata, reattach
associater->context = context;
associater->parent = layer;
associater->privdata = privdata;
associater->index = DISPATCH_POLICY( layer,
__sync_fetch_and_add(&layer->roundrobin, 1) );
// 提交到网络层
iothreads_post( layer->threads, associater->index, eIOTaskType_Associate, associater, 0 );

// 就地投递给本网络线程
int8_t index = iothreads_get_index( layer->threads );
if ( index >= 0 )
{
associater->index = index;
_associate_direct( iothreads_get_sets(layer->threads, index), associater );
}
else
{
// 随机找一个io线程
associater->index = DISPATCH_POLICY(
layer, __sync_fetch_and_add(&layer->roundrobin, 1) );
// 提交到网络层
iothreads_post( layer->threads, associater->index, eIOTaskType_Associate, associater, 0 );
}

return 0;
}
Expand Down Expand Up @@ -466,43 +488,6 @@ int32_t iolayer_set_keepalive( iolayer_t self, sid_t id, int32_t seconds )
return 0;
}

int32_t iolayer_set_endpoint( iolayer_t self, sid_t id, const char * host, uint16_t port )
{
// NOT Thread-Safe
uint8_t index = SID_INDEX(id);
struct iolayer * layer = (struct iolayer *)self;

// 参数检查
assert( layer != NULL && "Illegal IOLayer" );
assert( layer->threads != NULL && "Illegal IOThreadGroup" );
assert( "iolayer_set_endpoint() must be in the specified thread"
&& pthread_equal(iothreads_get_id(layer->threads, index), pthread_self()) != 0 );

if ( index >= layer->nthreads )
{
syslog(LOG_WARNING, "%s(SID=%ld) failed, the Session's index[%u] is invalid .", __FUNCTION__, id, index );
return -1;
}

struct session_manager * manager = _get_manager( layer, index );
if ( manager == NULL )
{
syslog(LOG_WARNING, "%s(SID=%ld) failed, the Session's manager[%u] is invalid .", __FUNCTION__, id, index );
return -2;
}

struct session * session = session_manager_get( manager, id );
if ( session == NULL )
{
syslog(LOG_WARNING, "%s(SID=%ld) failed, the Session is invalid .", __FUNCTION__, id );
return -3;
}

session_copy_endpoint( session, host, port );

return 0;
}

int32_t iolayer_set_service( iolayer_t self, sid_t id, ioservice_t * service, void * context )
{
// NOT Thread-Safe
Expand Down Expand Up @@ -652,31 +637,50 @@ int32_t iolayer_perform( iolayer_t self, sid_t id, int32_t type, void * task, ta

int32_t iolayer_performs( iolayer_t self, void * task, taskcloner_t clone, taskexecutor_t execute )
{
uint8_t i = 0;
assert( self != NULL && "Illegal IOLayer" );
assert( execute != NULL && "Illegal specified Execute-Function" );

uint8_t i = 0;
pthread_t threadid = pthread_self();
struct task_performs tasklist[ 256 ]; // 栈中分配更快
struct iolayer * layer = (struct iolayer *)self;

// clone task
for ( i = 0; i < layer->nthreads; ++i )
if ( clone == NULL )
{
tasklist[i].clone = clone;
tasklist[i].perform = execute;
tasklist[i].task = i == 0 ? task : clone( task );
}
uint8_t index = milliseconds() % layer->nthreads;
struct task_performs inner_task = { task, execute };

for ( i = 0; i < layer->nthreads; ++i )
{
if ( threadid == iothreads_get_id( layer->threads, i ) )
if ( threadid == iothreads_get_id( layer->threads, index ) )
{
// 本线程内直接广播
_performs_direct( layer, i, &(tasklist[i]) );
// 本线程内直接执行
_performs_direct( layer, index, &inner_task );
}
else
{
// 跨线程提交广播任务
iothreads_post( layer->threads, i, eIOTaskType_Performs, &(tasklist[i]), sizeof(struct task_performs) );
// 跨线程提交执行任务
iothreads_post( layer->threads, index, eIOTaskType_Performs, &inner_task, sizeof(struct task_performs) );
}
}
else
{
for ( i = 0; i < layer->nthreads; ++i )
{
tasklist[i].perform = execute;
tasklist[i].task = i == 0 ? task : clone( task );
}

for ( i = 0; i < layer->nthreads; ++i )
{
if ( threadid == iothreads_get_id( layer->threads, i ) )
{
// 本线程内直接广播
_performs_direct( layer, i, &(tasklist[i]) );
}
else
{
// 跨线程提交广播任务
iothreads_post( layer->threads, i, eIOTaskType_Performs, &(tasklist[i]), sizeof(struct task_performs) );
}
}
}

Expand Down
1 change: 0 additions & 1 deletion src/iolayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ struct task_perform
struct task_performs
{
void * task;
taskcloner_t clone;
taskexecutor_t perform;
};

Expand Down
17 changes: 7 additions & 10 deletions src/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,22 +136,20 @@ int32_t _del_session( struct session * self )
// 会话停止(删除网络事件以及关闭描述符)
void _stop( struct session * self )
{
evsets_t sets = self->evsets;

// 删除网络事件
if ( self->status&SESSION_READING )
{
evsets_del( sets, self->evread );
evsets_del( self->evsets, self->evread );
self->status &= ~SESSION_READING;
}
if ( self->status&SESSION_WRITING )
{
evsets_del( sets, self->evwrite );
evsets_del( self->evsets, self->evwrite );
self->status &= ~SESSION_WRITING;
}
if ( self->status&SESSION_KEEPALIVING )
{
evsets_del( sets, self->evkeepalive );
evsets_del( self->evsets, self->evkeepalive );
self->status &= ~SESSION_KEEPALIVING;
}

Expand Down Expand Up @@ -410,7 +408,6 @@ ssize_t session_sendmessage( struct session * self, struct message * message )
void session_add_event( struct session * self, int16_t ev )
{
int8_t status = self->status;
evsets_t sets = self->evsets;

// 注册读事件
// 不在等待读事件的正常会话
Expand All @@ -419,8 +416,8 @@ void session_add_event( struct session * self, int16_t ev )
{
event_set( self->evread, self->fd, ev|self->setting.persist_mode );
event_set_callback( self->evread, channel_on_read, self );
evsets_add( sets, self->evread, self->setting.timeout_msecs );

evsets_add( self->evsets, self->evread, self->setting.timeout_msecs );
// 修改读状态
self->status |= SESSION_READING;
}

Expand All @@ -439,8 +436,8 @@ void session_add_event( struct session * self, int16_t ev )

event_set( self->evwrite, self->fd, ev );
event_set_callback( self->evwrite, channel_on_write, self );
evsets_add( sets, self->evwrite, wait_for_shutdown );

evsets_add( self->evsets, self->evwrite, wait_for_shutdown );
// 修改写状态
self->status |= SESSION_WRITING;
}
}
Expand Down
1 change: 1 addition & 0 deletions src/threads-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct iothreads
pthread_mutex_t lock;
};

int8_t iothreads_get_index( iothreads_t self );
struct acceptorlist * iothreads_get_acceptlist( iothreads_t self, uint8_t index );

#endif
17 changes: 17 additions & 0 deletions src/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ struct acceptorlist * iothreads_get_acceptlist( iothreads_t self, uint8_t index
return &(iothreads->threads[index].acceptorlist);
}

int8_t iothreads_get_index( iothreads_t self )
{
int8_t i = 0;
pthread_t tid = pthread_self();
struct iothreads * iothreads = (struct iothreads *)(self);

for ( i = 0; i < iothreads->nthreads; ++i )
{
if ( iothreads->threads[i].id == tid )
{
return i;
}
}

return -1;
}

pthread_t iothreads_get_id( iothreads_t self, uint8_t index )
{
struct iothreads * iothreads = (struct iothreads *)(self);
Expand Down

0 comments on commit d08bfc5

Please sign in to comment.