Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reader side connection close race #2074

Merged
merged 1 commit into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions source/adios2/toolkit/sst/cp/cp_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ struct _SstStream
long DiscardPriorTimestep; /* timesteps numerically less than this will be
discarded with prejudice */
long LastDPNotifiedTimestep;
int FailureContactRank;

/* reader side marshal info */
FFSContext ReaderFFSContext;
Expand Down
37 changes: 26 additions & 11 deletions source/adios2/toolkit/sst/cp/cp_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "cp_internal.h"

#define gettid() pthread_self()
#define MUTEX_DEBUG
#ifdef MUTEX_DEBUG
#define STREAM_MUTEX_LOCK(Stream) \
{ \
Expand Down Expand Up @@ -241,16 +240,31 @@ extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn,

if (Stream->Status == Established)
{
/*
* tag our reader instance as failed.
* If any instance is failed, we should remove all, but that requires a
* global operation, so prep.
*/
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, peer likely failed\n");
Stream->Status = PeerFailed;
STREAM_CONDITION_SIGNAL(Stream);
if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommMin) &&
(Stream->Rank != 0))
{
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, but might be part of shutdown "
"Don't change stream status.\n");
/* if this happens and *is* a failure, we'll get the status from
* rank 0 later */
}
else
{
/*
* tag our reader instance as failed, IFF this came from someone we
* should have gotten a CLOSE from. I.E. a reverse peer
*/
CP_verbose(Stream, "Reader-side Rank received a "
"connection-close event during normal "
"operations, peer likely failed\n");
if (FailedPeerRank == Stream->FailureContactRank)
{
Stream->Status = PeerFailed;
STREAM_CONDITION_SIGNAL(Stream);
}
}
CP_verbose(
Stream,
"The close was for connection to writer peer %d, notifying DP\n",
Expand Down Expand Up @@ -730,6 +744,7 @@ extern void CP_PeerSetupHandler(CManager cm, CMConnection conn, void *Msg_v,
{
Stream->ConnectionsToWriter[Msg->WriterRank].CMconn = conn;
CMConnection_add_reference(conn);
Stream->FailureContactRank == Msg->WriterRank;
}
CMconn_register_close_handler(conn, ReaderConnCloseHandler, (void *)Stream);
STREAM_CONDITION_SIGNAL(Stream);
Expand Down
83 changes: 65 additions & 18 deletions source/adios2/toolkit/sst/dp/evpath_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ static void SendSpeculativePreloadMsgs(CP_Services Svcs,
Evpath_WSR_Stream WSR_Stream,
TimestepList TS);

// reader-side routine, called by the main thread
static DP_RS_Stream EvpathInitReader(CP_Services Svcs, void *CP_Stream,
void **ReaderContactInfoPtr,
struct _SstParams *Params,
Expand Down Expand Up @@ -301,6 +302,7 @@ static DP_RS_Stream EvpathInitReader(CP_Services Svcs, void *CP_Stream,
return Stream;
}

// reader-side routine, called by the main thread
static void EvpathDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
Evpath_RS_Stream RS_Stream = (Evpath_RS_Stream)RS_Stream_v;
Expand All @@ -315,6 +317,7 @@ static void EvpathDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
free(RS_Stream);
}

// writer side routine, called by the
static void MarkReadRequest(TimestepList TS, DP_WSR_Stream Reader,
int RequestingRank)
{
Expand All @@ -329,6 +332,7 @@ static void MarkReadRequest(TimestepList TS, DP_WSR_Stream Reader,
}
}

// writer side routine, called by the network handler thread
static void EvpathReadRequestHandler(CManager cm, CMConnection conn,
void *msg_v, void *client_Data,
attr_list attrs)
Expand Down Expand Up @@ -427,6 +431,7 @@ typedef struct _EvpathCompletionHandle
struct _EvpathCompletionHandle *Next;
} * EvpathCompletionHandle;

// reader-side routine called by the network handler thread
static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs)
{
Expand Down Expand Up @@ -473,6 +478,7 @@ static void EvpathReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
TAU_STOP_FUNC();
}

// reader-side routine, called from the main program
static int HandleRequestWithPreloaded(CP_Services Svcs,
Evpath_RS_Stream RS_Stream, int Rank,
long Timestep, size_t Offset,
Expand All @@ -499,6 +505,7 @@ static int HandleRequestWithPreloaded(CP_Services Svcs,
return 1;
}

// reader-side routine, called from the main program
static void DiscardPriorPreloaded(CP_Services Svcs, Evpath_RS_Stream RS_Stream,
long Timestep)
{
Expand Down Expand Up @@ -535,13 +542,13 @@ static void DiscardPriorPreloaded(CP_Services Svcs, Evpath_RS_Stream RS_Stream,
pthread_mutex_unlock(&RS_Stream->DataLock);
}

// reader-side routine, called from the network handler thread
static void EvpathPreloadHandler(CManager cm, CMConnection conn, void *msg_v,
void *client_Data, attr_list attrs)
{
EvpathPreloadMsg PreloadMsg = (EvpathPreloadMsg)msg_v;
Evpath_RS_Stream RS_Stream = PreloadMsg->RS_Stream;
CP_Services Svcs = (CP_Services)client_Data;
EvpathCompletionHandle Handle = NULL;
RSTimestepList Entry = calloc(1, sizeof(*Entry));

Svcs->verbose(
Expand All @@ -565,6 +572,7 @@ static void EvpathPreloadHandler(CManager cm, CMConnection conn, void *msg_v,
return;
}

// writer-side routine, called from the main program
static DP_WS_Stream EvpathInitWriter(CP_Services Svcs, void *CP_Stream,
struct _SstParams *Params,
attr_list DPAttrs)
Expand Down Expand Up @@ -606,6 +614,7 @@ static DP_WS_Stream EvpathInitWriter(CP_Services Svcs, void *CP_Stream,
return (void *)Stream;
}

// writer-side routine, called from the main program
static void EvpathDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
Evpath_WS_Stream WS_Stream = (Evpath_WS_Stream)WS_Stream_v;
Expand All @@ -630,6 +639,7 @@ static void EvpathDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
free(WS_Stream);
}

// writer-side routine, called from the main program
static DP_WSR_Stream EvpathInitWriterPerReader(CP_Services Svcs,
DP_WS_Stream WS_Stream_v,
int readerCohortSize,
Expand Down Expand Up @@ -694,13 +704,15 @@ static DP_WSR_Stream EvpathInitWriterPerReader(CP_Services Svcs,
return WSR_Stream;
}

// writer-side routine, called from the main program
static void EvpathDestroyWriterPerReader(CP_Services Svcs,
DP_WSR_Stream WSR_Stream_v)
{
Evpath_WSR_Stream WSR_Stream = (Evpath_WSR_Stream)WSR_Stream_v;
free(WSR_Stream);
}

// reader-side routine, called from the main program
static void EvpathProvideWriterDataToReader(CP_Services Svcs,
DP_RS_Stream RS_Stream_v,
int writerCohortSize,
Expand Down Expand Up @@ -743,6 +755,7 @@ static void AddRequestToList(CP_Services Svcs, Evpath_RS_Stream Stream,
pthread_mutex_unlock(&Stream->DataLock);
}

// reader-side routine, called from the main program
static void RemoveRequestFromList(CP_Services Svcs, Evpath_RS_Stream Stream,
EvpathCompletionHandle Handle)
{
Expand Down Expand Up @@ -773,19 +786,23 @@ static void RemoveRequestFromList(CP_Services Svcs, Evpath_RS_Stream Stream,
pthread_mutex_unlock(&Stream->DataLock);
}

// reader-side routine, called from the network handler (close handler)
static void FailRequestsToRank(CP_Services Svcs, CManager cm,
Evpath_RS_Stream Stream, int FailedRank)
{
EvpathCompletionHandle Tmp;
Svcs->verbose(Stream->CP_Stream, "Fail all pending requests on stream %p\n",
int FailedSomethingToRank = 0;
Svcs->verbose(Stream->CP_Stream,
"Fail pending requests to rank %d on stream %p\n", FailedRank,
Stream);
pthread_mutex_lock(&Stream->DataLock);
Tmp = Stream->PendingReadRequests;
while (Tmp != NULL)
{
if (Tmp->Failed != 1)
if ((Tmp->Failed != 1) && (Tmp->Rank == FailedRank))
{
Tmp->Failed = 1;
FailedSomethingToRank = 1;
Svcs->verbose(Tmp->CPStream,
"Found a pending remote memory read "
"to writer rank %d, marking as "
Expand All @@ -797,9 +814,36 @@ static void FailRequestsToRank(CP_Services Svcs, CManager cm,
}
Tmp = Tmp->Next;
}
if (FailedSomethingToRank)
{
Tmp = Stream->PendingReadRequests;
Svcs->verbose(Stream->CP_Stream,
"We were waiting for requests on rank %d, fail *all* "
"pending requests on stream %p\n",
FailedRank, Stream);

while (Tmp != NULL)
{
if (Tmp->Failed != 1)
{
Tmp->Failed = 1;
FailedSomethingToRank = 1;
Svcs->verbose(Tmp->CPStream,
"Found a pending remote memory read "
"to writer rank %d, marking as "
"failed and signalling condition %d\n",
Tmp->Rank, Tmp->CMcondition);
CMCondition_signal(cm, Tmp->CMcondition);
Svcs->verbose(Tmp->CPStream, "Did the signal of condition %d\n",
Tmp->Rank, Tmp->CMcondition);
}
Tmp = Tmp->Next;
}
}
pthread_mutex_unlock(&Stream->DataLock);
Svcs->verbose(Stream->CP_Stream,
"Done Failing requests to writer from stream %p\n", Stream);
"Done Failing requests to writer %d from stream %p\n",
FailedRank, Stream);
}

typedef struct _EvpathPerTimestepInfo
Expand All @@ -808,6 +852,7 @@ typedef struct _EvpathPerTimestepInfo
int CheckInt;
} * EvpathPerTimestepInfo;

// reader-side routine, called from the main program
static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
int Rank, long Timestep, size_t Offset,
size_t Length, void *Buffer,
Expand All @@ -817,7 +862,8 @@ static void *EvpathReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
Stream_v; /* DP_RS_Stream is the return from InitReader */
CManager cm = Svcs->getCManager(Stream->CP_Stream);
EvpathCompletionHandle ret = malloc(sizeof(struct _EvpathCompletionHandle));
EvpathPerTimestepInfo TimestepInfo = (EvpathPerTimestepInfo)DP_TimestepInfo;
// EvpathPerTimestepInfo TimestepInfo =
// (EvpathPerTimestepInfo)DP_TimestepInfo;
struct _EvpathReadRequestMsg ReadRequestMsg;

int HadPreload;
Expand Down Expand Up @@ -1065,8 +1111,9 @@ static void EvpathReaderReleaseTimestep(CP_Services Svcs,
Evpath_WSR_Stream WSR_Stream = (Evpath_WSR_Stream)Stream_v;
Evpath_WS_Stream WS_Stream =
WSR_Stream->WS_Stream; /* pointer to writer struct */
TimestepList tmp = WS_Stream->Timesteps;
TimestepList tmp;

tmp = WS_Stream->Timesteps;
if ((!WSR_Stream->ReaderRequests) &&
(Timestep >= WSR_Stream->ReadPatternLockTimestep))
{
Expand Down Expand Up @@ -1128,7 +1175,7 @@ static void EvpathProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v,
{
Evpath_WS_Stream Stream = (Evpath_WS_Stream)Stream_v;
TimestepList Entry = malloc(sizeof(struct _TimestepEntry));
struct _EvpathPerTimestepInfo *Info = NULL;
// struct _EvpathPerTimestepInfo *Info = NULL;
// malloc(sizeof(struct _EvpathPerTimestepInfo));

// This section exercised the CP's ability to distribute DP per timestep
Expand Down Expand Up @@ -1244,17 +1291,17 @@ static FMStructDescRec EvpathWriterContactStructs[] = {
sizeof(struct _EvpathWriterContactInfo), NULL},
{NULL, NULL, 0, NULL}};

static FMField EvpathTimestepInfoList[] = {
{"CheckString", "string", sizeof(char *),
FMOffset(EvpathPerTimestepInfo, CheckString)},
{"CheckInt", "integer", sizeof(void *),
FMOffset(EvpathPerTimestepInfo, CheckInt)},
{NULL, NULL, 0, 0}};

static FMStructDescRec EvpathTimestepInfoStructs[] = {
{"EvpathTimestepInfo", EvpathTimestepInfoList,
sizeof(struct _EvpathPerTimestepInfo), NULL},
{NULL, NULL, 0, NULL}};
// static FMField EvpathTimestepInfoList[] = {
// {"CheckString", "string", sizeof(char *),
// FMOffset(EvpathPerTimestepInfo, CheckString)},
// {"CheckInt", "integer", sizeof(void *),
// FMOffset(EvpathPerTimestepInfo, CheckInt)},
// {NULL, NULL, 0, 0}};

// static FMStructDescRec EvpathTimestepInfoStructs[] = {
// {"EvpathTimestepInfo", EvpathTimestepInfoList,
// sizeof(struct _EvpathPerTimestepInfo), NULL},
// {NULL, NULL, 0, NULL}};

static struct _CP_DP_Interface evpathDPInterface;

Expand Down