diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c index 6e092c67d5..83d55c6833 100644 --- a/source/adios2/toolkit/sst/cp/cp_common.c +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -65,7 +65,7 @@ void CP_validateParams(SstStream Stream, SstParams Params, int Writer) if (Params->ControlTransport == NULL) { /* determine reasonable default, now "sockets" */ - Params->ControlTransport = strdup("sockets"); + Params->ControlTransport = strdup("zplenet"); } else { diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h index 595c96f72d..7ce04d9173 100644 --- a/source/adios2/toolkit/sst/cp/cp_internal.h +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -80,7 +80,6 @@ typedef struct _WS_ReaderInfo long OldestUnreleasedTimestep; struct _SentTimestepRec *SentTimestepList; void *DP_WSR_Stream; - void *RS_StreamID; int ReaderCohortSize; int *Peers; CP_PeerConnection *Connections; diff --git a/source/adios2/toolkit/sst/cp/cp_reader.c b/source/adios2/toolkit/sst/cp/cp_reader.c index a5efd21328..7c65138ea4 100644 --- a/source/adios2/toolkit/sst/cp/cp_reader.c +++ b/source/adios2/toolkit/sst/cp/cp_reader.c @@ -20,6 +20,38 @@ #include "adios2/toolkit/profiling/taustubs/taustubs.h" #include "cp_internal.h" +static int locked = 0; +#define gettid() pthread_self() +#define MUTEX_DEBUG +#ifdef MUTEX_DEBUG +#define PTHREAD_MUTEX_LOCK(lock) \ + printf("(PID %lx, TID %lx) CP_READER Trying lock line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ + pthread_mutex_lock(lock); \ + locked++; \ + printf("(PID %lx) CP_READER Got lock\n", (long)getpid()); +#define PTHREAD_MUTEX_UNLOCK(lock) \ + printf("(PID %lx, TID %lx) CP_READER UNlocking line %d\n", (long)getpid(), \ + (long)gettid(), __LINE__); \ + locked--; \ + pthread_mutex_unlock(lock); +#define SST_ASSERT_LOCKED() assert(locked) +#define SST_ASSERT_UNLOCKED() /* gotta lock to really do this */ +#else +#define PTHREAD_MUTEX_LOCK(lock) \ + 
{ \ + pthread_mutex_lock(lock); \ + locked++; \ + } +#define PTHREAD_MUTEX_UNLOCK(lock) \ + { \ + locked--; \ + pthread_mutex_unlock(lock); \ + } +#define SST_ASSERT_LOCKED() assert(locked) +#define SST_ASSERT_UNLOCKED() /* gotta lock to really do this */ +#endif + static char *readContactInfoFile(const char *Name, SstStream Stream, int Timeout) { @@ -175,10 +207,10 @@ extern void ReaderConnCloseHandler(CManager cm, CMConnection ClosedConn, CP_verbose(Stream, "Reader-side Rank received a " "connection-close event during normal " "operations, peer likely failed\n"); - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Stream->Status = PeerFailed; pthread_cond_signal(&Stream->DataCondition); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose( Stream, "The close was for connection to writer peer %d, notifying DP\n", @@ -245,7 +277,9 @@ static int HasAllPeers(SstStream Stream) int i, StillWaiting = 0; if (!Stream->ConnectionsToWriter) { - CP_verbose(Stream, "Waiting for first Peer notification\n"); + CP_verbose(Stream, + "(PID %lx, TID %lx) Waiting for first Peer notification\n", + (long)gettid(), (long)getpid()); return 0; } i = 0; @@ -358,6 +392,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) SMPI_Comm_rank(Stream->mpiComm, &Stream->Rank); SMPI_Comm_size(Stream->mpiComm, &Stream->CohortSize); + printf("READER main program thread PID %lx, TID %lx in reader open\n", + (long)gettid(), (long)getpid()); CP_validateParams(Stream, Params, 0 /* reader */); Stream->ConfigParams = Params; @@ -383,6 +419,11 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) pointers = (struct _CP_DP_PairInfo **)ParticipateInReaderInitDataExchange( Stream, dpInfo, &data_block); + extern int CMtrace_val[]; + int tmp = CMtrace_val[3]; + int tmp2 = CMtrace_val[5]; + printf("READER (%p) main program thread PID %lx, TID %lx in reader open\n", + Stream, (long)gettid(), 
(long)getpid()); if (Stream->Rank == 0) { struct _CombinedWriterInfo WriterData; @@ -426,6 +467,8 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) ReaderRegister.WriterResponseCondition, &response); + // CMtrace_val[3] = 1; + // CMtrace_val[5] = 1; if (CMwrite(rank0_to_rank0_conn, Stream->CPInfo->ReaderRegisterFormat, &ReaderRegister) != 1) { @@ -519,14 +562,14 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) getPeerArrays(Stream->CohortSize, Stream->Rank, Stream->WriterCohortSize, &Stream->Peers, NULL); - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); while (!HasAllPeers(Stream)) { /* wait until we get the timestep metadata or something else changes */ pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); } - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); } else { @@ -571,13 +614,16 @@ SstStream SstReaderOpen(const char *Name, SstParams Params, MPI_Comm comm) Stream->DP_Interface->provideWriterDataToReader( &Svcs, Stream->DP_Stream, ReturnData->WriterCohortSize, Stream->ConnectionsToWriter, ReturnData->DP_WriterInfo); - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Stream->Status = Established; - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Sending Reader Activate messages to writer\n"); memset(&Msg, 0, sizeof(Msg)); + SST_ASSERT_UNLOCKED(); sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderActivateFormat, &Msg, &Msg.WSR_Stream); + CMtrace_val[3] = tmp; + CMtrace_val[5] = tmp2; CP_verbose(Stream, "Finish opening Stream \"%s\", starting with Step number %d\n", Filename, ReturnData->StartingStepNumber); @@ -609,9 +655,12 @@ extern void CP_PeerSetupHandler(CManager cm, CMConnection conn, void *Msg_v, SstStream Stream; struct _PeerSetupMsg *Msg = (struct _PeerSetupMsg *)Msg_v; Stream = (SstStream)Msg->RS_Stream; - 
pthread_mutex_lock(&Stream->DataLock); + CP_verbose(Stream, "Received peer setup from rank %d, conn %p\n", + Msg->WriterRank, conn); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); if (!Stream->ConnectionsToWriter) { + CP_verbose(Stream, "Allocating connections to writer\n"); Stream->ConnectionsToWriter = calloc(sizeof(CP_PeerConnection), Msg->WriterCohortSize); } @@ -624,7 +673,7 @@ extern void CP_PeerSetupHandler(CManager cm, CMConnection conn, void *Msg_v, } CMconn_register_close_handler(conn, ReaderConnCloseHandler, (void *)Stream); pthread_cond_signal(&Stream->DataCondition); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); TAU_STOP_FUNC(); } @@ -638,7 +687,7 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, memset(&Msg, 0, sizeof(Msg)); Msg.Timestep = tsm->Timestep; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); /* * before discarding, install any precious metadata from this message */ @@ -646,7 +695,7 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, { FFSMarshalInstallPreciousMetadata(Stream, tsm); } - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); /* * send each writer rank a release for this timestep (actually goes to @@ -663,6 +712,7 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, "Sending ReleaseTimestep message for PRIOR DISCARD " "timestep %d, one to each writer\n", tsm->Timestep); + SST_ASSERT_UNLOCKED(); sendOneToEachWriterRank(Stream, Stream->CPInfo->ReleaseTimestepFormat, &Msg, &Msg.WSR_Stream); @@ -674,11 +724,13 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, "ignoring in PRIOR DISCARD\n", tsm->Timestep); } + if (tsm == NULL) + printf("READER RETURN_BUFFER, tsm == %p, line %d\n", tsm, __LINE__); CMreturn_buffer(Stream->CPInfo->cm, tsm); return; } - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); struct _TimestepMetadataList *New = malloc(sizeof(struct _RequestQueue)); 
New->MetadataMsg = tsm; New->Next = NULL; @@ -701,7 +753,7 @@ void queueTimestepMetadataMsgAndNotify(SstStream Stream, tsm->Timestep); pthread_cond_signal(&Stream->DataCondition); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommMin) && (Stream->ConfigParams->AlwaysProvideLatestTimestep)) { @@ -736,7 +788,7 @@ void CP_TimestepMetadataHandler(CManager cm, CMConnection conn, void *Msg_v, "Received a message that timestep %d has been discarded\n", Msg->Timestep); - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); /* * before discarding, install any precious metadata from this * message @@ -745,7 +797,7 @@ void CP_TimestepMetadataHandler(CManager cm, CMConnection conn, void *Msg_v, { FFSMarshalInstallPreciousMetadata(Stream, Msg); } - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); return; } @@ -791,6 +843,10 @@ void CP_WriterResponseHandler(CManager cm, CMConnection conn, void *Msg_v, // Msg->CP_WriterInfo[i]->WriterID); // } + printf("READER network handler thread PID %lx, TID %lx in writer response " + "handler\n", + (long)getpid(), (long)gettid()); + /* arrange for this message data to stay around */ CMtake_buffer(cm, Msg); @@ -817,12 +873,12 @@ extern void CP_WriterCloseHandler(CManager cm, CMConnection conn, void *Msg_v, "Timestep %d was the final timestep.\n", Msg->FinalTimestep); - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Stream->FinalTimestep = Msg->FinalTimestep; Stream->Status = PeerClosed; /* wake anyone that might be waiting */ pthread_cond_signal(&Stream->DataCondition); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); TAU_STOP_FUNC(); } @@ -838,22 +894,22 @@ extern void CP_CommPatternLockedHandler(CManager cm, CMConnection conn, "Received a CommPatternLocked message, beginning with Timestep %d.\n", Msg->Timestep); - 
pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Stream->CommPatternLocked = 1; Stream->CommPatternLockedTimestep = Msg->Timestep; - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); } static long MaxQueuedMetadata(SstStream Stream) { struct _TimestepMetadataList *Next; long MaxTimestep = -1; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Next = Stream->Timesteps; if (Next == NULL) { CP_verbose(Stream, "MaxQueued Timestep returning -1\n"); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); return -1; } while (Next) @@ -864,7 +920,7 @@ static long MaxQueuedMetadata(SstStream Stream) } Next = Next->Next; } - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "MaxQueued Timestep returning %ld\n", MaxTimestep); return MaxTimestep; } @@ -873,12 +929,12 @@ static long NextQueuedMetadata(SstStream Stream) { struct _TimestepMetadataList *Next; long MinTimestep = LONG_MAX; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Next = Stream->Timesteps; if (Next == NULL) { CP_verbose(Stream, "NextQueued Timestep returning -1\n"); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); return -1; } while (Next) @@ -889,7 +945,7 @@ static long NextQueuedMetadata(SstStream Stream) } Next = Next->Next; } - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "NextQueued Timestep returning %ld\n", MinTimestep); return MinTimestep; } @@ -898,10 +954,10 @@ static void triggerDataCondition(CManager cm, void *vStream) { SstStream Stream = (SstStream)vStream; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); /* wake the sleeping main thread for timeout */ pthread_cond_signal(&Stream->DataCondition); - pthread_mutex_unlock(&Stream->DataLock); + 
PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); } static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) @@ -912,7 +968,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) int timeout_int_usec = ((timeout_secs - floorf(timeout_secs)) * 1000000); CMTaskHandle TimeoutTask = NULL; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); gettimeofday(&start, NULL); Next = Stream->Timesteps; CP_verbose( @@ -921,7 +977,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) timeout_secs, start.tv_sec, start.tv_usec); if (Next) { - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Returning from wait with timeout, NO TIMEOUT\n"); } end.tv_sec = start.tv_sec + timeout_int_sec; @@ -939,7 +995,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) // special case if (timeout_secs == 0.0) { - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose( Stream, "Returning from wait With no data after zero timeout poll\n"); @@ -955,14 +1011,14 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) if (Next) { CMremove_task(TimeoutTask); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Returning from wait with timeout, NO TIMEOUT\n"); return; } if (Stream->Status != Established) { - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Returning from wait with timeout, STREAM NO " "LONGER ESTABLISHED\n"); return; @@ -972,7 +1028,7 @@ static void waitForMetadataWithTimeout(SstStream Stream, float timeout_secs) now.tv_sec, now.tv_usec, end.tv_sec, end.tv_usec); if (timercmp(&now, &end, >)) { - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Returning from wait after timing out\n"); 
return; } @@ -986,7 +1042,7 @@ static void releasePriorTimesteps(SstStream Stream, long Latest) { struct _TimestepMetadataList *Next, *Last; TSMetadataList FoundTS = NULL; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); CP_verbose(Stream, "Releasing any timestep earlier than %d\n", Latest); Next = Stream->Timesteps; Last = NULL; @@ -1020,10 +1076,7 @@ static void releasePriorTimesteps(SstStream Stream, long Latest) "Sending ReleaseTimestep message for RELEASE " "PRIOR timestep %d, one to each writer\n", This->MetadataMsg->Timestep); - sendOneToEachWriterRank(Stream, - Stream->CPInfo->ReleaseTimestepFormat, &Msg, - &Msg.WSR_Stream); - CMreturn_buffer(Stream->CPInfo->cm, This->MetadataMsg); + if (Last == NULL) { Stream->Timesteps = Next; @@ -1033,6 +1086,16 @@ static void releasePriorTimesteps(SstStream Stream, long Latest) Last->Next = Next; } - free(This); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); + SST_ASSERT_UNLOCKED(); + sendOneToEachWriterRank(Stream, + Stream->CPInfo->ReleaseTimestepFormat, &Msg, + &Msg.WSR_Stream); + if (This->MetadataMsg == NULL) + printf("READER RETURN_BUFFER, metadatamsg == %p, line %d\n", + This->MetadataMsg, __LINE__); + CMreturn_buffer(Stream->CPInfo->cm, This->MetadataMsg); + free(This); /* free the unlinked node only after its MetadataMsg has been returned; freeing earlier made the reads above a use-after-free */ + PTHREAD_MUTEX_LOCK(&Stream->DataLock); } else { @@ -1040,7 +1103,7 @@ static void releasePriorTimesteps(SstStream Stream, long Latest) Next = Next->Next; } } - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); } static void FreeTimestep(SstStream Stream, long Timestep) @@ -1050,9 +1113,13 @@ static void FreeTimestep(SstStream Stream, long Timestep) */ struct _TimestepMetadataList *List = Stream->Timesteps; + SST_ASSERT_LOCKED(); if (Stream->Timesteps->MetadataMsg->Timestep == Timestep) { Stream->Timesteps = List->Next; + if (List->MetadataMsg == NULL) + printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, line %d\n", + List->MetadataMsg, __LINE__); CMreturn_buffer(Stream->CPInfo->cm, 
List->MetadataMsg); free(List); } @@ -1065,6 +1132,10 @@ static void FreeTimestep(SstStream Stream, long Timestep) if (List->MetadataMsg->Timestep == Timestep) { last->Next = List->Next; + if (List->MetadataMsg == NULL) + printf("READER RETURN_BUFFER, List->MEtadataMsg == %p, " + "line %d\n", + List->MetadataMsg, __LINE__); CMreturn_buffer(Stream->CPInfo->cm, List->MetadataMsg); free(List); break; @@ -1078,7 +1149,7 @@ static void FreeTimestep(SstStream Stream, long Timestep) static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep) { TSMetadataList FoundTS = NULL; - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); CP_verbose(Stream, "Wait for next metadata after last timestep %d\n", LastTimestep); while (1) @@ -1126,7 +1197,7 @@ static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep) } if (FoundTS) { - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Returning metadata for Timestep %d\n", FoundTS->MetadataMsg->Timestep); Stream->CurrentWorkingTimestep = FoundTS->MetadataMsg->Timestep; @@ -1137,7 +1208,7 @@ static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep) ((Stream->FinalTimestep != INT_MAX) && (Stream->FinalTimestep >= LastTimestep))) { - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); CP_verbose(Stream, "Stream Final Timestep is %d, last timestep was %d\n", Stream->FinalTimestep, LastTimestep); @@ -1167,13 +1238,14 @@ static TSMetadataList waitForNextMetadata(SstStream Stream, long LastTimestep) CP_verbose(Stream, "Waiting for metadata for a Timestep later than TS %d\n", LastTimestep); - CP_verbose(Stream, "(PID %x) Stream status is %s\n", getpid(), + CP_verbose(Stream, "(PID %lx, TID %lx) Stream status is %s\n", + (long)getpid(), (long)gettid(), SSTStreamStatusStr[Stream->Status]); /* wait until we get the timestep metadata or something else changes */ 
pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); } /* NOTREACHED */ - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); } extern SstFullMetadata SstGetCurMetadata(SstStream Stream) @@ -1195,6 +1267,7 @@ extern void *SstReadRemoteMemory(SstStream Stream, int Rank, long Timestep, static void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg, void **WS_StreamPtr) { + SST_ASSERT_UNLOCKED(); if (s->WriterConfigParams->CPCommPattern == SstCPCommPeer) { int i = 0; @@ -1239,6 +1312,7 @@ extern void SstReaderDefinitionLock(SstStream Stream, long EffectiveTimestep) memset(&Msg, 0, sizeof(Msg)); Msg.Timestep = EffectiveTimestep; + SST_ASSERT_UNLOCKED(); sendOneToEachWriterRank(Stream, Stream->CPInfo->LockReaderDefinitionsFormat, &Msg, &Msg.WSR_Stream); } @@ -1252,9 +1326,9 @@ extern void SstReleaseStep(SstStream Stream) if ((Stream->WriterConfigParams->CPCommPattern == SstCPCommPeer) || (Stream->Rank == 0)) { - pthread_mutex_lock(&Stream->DataLock); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); FreeTimestep(Stream, Timestep); - pthread_mutex_unlock(&Stream->DataLock); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); } SMPI_Barrier(Stream->mpiComm); @@ -1270,6 +1344,7 @@ extern void SstReleaseStep(SstStream Stream) Stream, "Sending ReleaseTimestep message for timestep %d, one to each writer\n", Timestep); + SST_ASSERT_UNLOCKED(); sendOneToEachWriterRank(Stream, Stream->CPInfo->ReleaseTimestepFormat, &Msg, &Msg.WSR_Stream); @@ -1810,6 +1885,7 @@ extern void SstReaderClose(SstStream Stream) gettimeofday(&CloseTime, NULL); timersub(&CloseTime, &Stream->ValidStartTime, &Diff); memset(&Msg, 0, sizeof(Msg)); + SST_ASSERT_UNLOCKED(); sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderCloseFormat, &Msg, &Msg.WSR_Stream); if (Stream->Stats) diff --git a/source/adios2/toolkit/sst/cp/cp_writer.c b/source/adios2/toolkit/sst/cp/cp_writer.c index 4ca07dd28f..5120938737 100644 --- a/source/adios2/toolkit/sst/cp/cp_writer.c +++ 
b/source/adios2/toolkit/sst/cp/cp_writer.c @@ -19,6 +19,7 @@ #include "adios2/toolkit/profiling/taustubs/taustubs.h" #include "cp_internal.h" +#define gettid() pthread_self() extern void CP_verbose(SstStream Stream, char *Format, ...); static void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg, @@ -27,17 +28,21 @@ static void CP_PeerFailCloseWSReader(WS_ReaderInfo CP_WSR_Stream, enum StreamStatus NewState); static int locked = 0; +#define MUTEX_DEBUG #ifdef MUTEX_DEBUG #define PTHREAD_MUTEX_LOCK(lock) \ - printf("Trying lock line %d\n", __LINE__); \ + printf("(PID %lx, TID %lx) CP_WRITER Trying lock line %d\n", \ + (long)getpid(), (long)gettid(), __LINE__); \ pthread_mutex_lock(lock); \ locked++; \ - printf("Got lock\n"); + printf("(PID %lx) Got lock\n", (long)getpid()); #define PTHREAD_MUTEX_UNLOCK(lock) \ - printf("UNlocking line %d\n", __LINE__); \ + printf("(PID %lx, TID %lx) CP_WRITER UNlocking line %d\n", (long)getpid(), \ + (long)gettid(), __LINE__); \ locked--; \ pthread_mutex_unlock(lock); #define SST_ASSERT_LOCKED() assert(locked) +#define SST_ASSERT_UNLOCKED() /* gotta lock to really do this */ #else #define PTHREAD_MUTEX_LOCK(lock) \ { \ @@ -50,7 +55,7 @@ static int locked = 0; pthread_mutex_unlock(lock); \ } #define SST_ASSERT_LOCKED() assert(locked) -#define SST_ASSERT_UNLOCKED() assert(unlocked) +#define SST_ASSERT_UNLOCKED() /* gotta lock to really do this */ #endif static char *buildContactInfo(SstStream Stream, attr_list DPAttrs) @@ -439,11 +444,17 @@ static void SendPeerSetupMsg(WS_ReaderInfo reader, int reversePeer, int myRank) setup.RS_Stream = reader->Connections[reversePeer].RemoteStreamID; setup.WriterRank = myRank; setup.WriterCohortSize = Stream->CohortSize; + printf("Sending Peer setup message to rank %d remote stream (%p) \n", + reversePeer, setup.RS_Stream); + SST_ASSERT_UNLOCKED(); if (CMwrite(conn, Stream->CPInfo->PeerSetupFormat, &setup) != 1) { CP_verbose(Stream, "Message failed to send to reader in sendPeerSetup in " 
"reader open\n"); + printf("FAILED TO Send Peer setup message to rank %d remote stream " + "(%p) \n", + reversePeer, setup.RS_Stream); } } @@ -513,6 +524,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, * failure awareness. */ + sleep(1); getPeerArrays(WriterSize, WriterRank, ReaderSize, &reader->Peers, &reverseArray); @@ -546,6 +558,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, CMconn_register_close_handler(reader->Connections[peer].CMconn, WriterConnCloseHandler, (void *)reader); + SST_ASSERT_UNLOCKED(); if (i == 0) { /* failure awareness for reader rank */ @@ -596,6 +609,7 @@ static int initWSReader(WS_ReaderInfo reader, int ReaderSize, WriterConnCloseHandler, (void *)reader); /* failure awareness for reader rank */ + SST_ASSERT_UNLOCKED(); CP_verbose(reader->ParentStream, "Sending peer setup to rank %d\n", peer); SendPeerSetupMsg(reader, peer, reader->ParentStream->Rank); @@ -844,6 +858,7 @@ WS_ReaderInfo WriterParticipateInReaderOpen(SstStream Stream) (struct _CP_WriterInitInfo *)pointers[i]->CP_Info; response.DP_WriterInfo[i] = pointers[i]->DP_Info; } + SST_ASSERT_UNLOCKED(); if (CMwrite(conn, Stream->CPInfo->WriterResponseFormat, &response) != 1) { CP_verbose(Stream, @@ -872,6 +887,7 @@ void sendOneToWSRCohort(WS_ReaderInfo CP_WSR_Stream, CMFormat f, void *Msg, SstStream s = CP_WSR_Stream->ParentStream; int j = 0; + SST_ASSERT_UNLOCKED(); if (s->ConfigParams->CPCommPattern == SstCPCommPeer) { while (CP_WSR_Stream->Peers[j] != -1) @@ -1016,6 +1032,7 @@ static void SendTimestepEntryToSingleReader(SstStream Stream, "READER %p, reference count is now %d\n", Entry->Timestep, rank, CP_WSR_Stream, Entry->ReferenceCount); AddTSToSentList(Stream, CP_WSR_Stream, Entry->Timestep); + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); if (Stream->DP_Interface->readerRegisterTimestep) { (Stream->DP_Interface->readerRegisterTimestep)( @@ -1023,8 +1040,8 @@ static void SendTimestepEntryToSingleReader(SstStream Stream, 
CP_WSR_Stream->PreloadMode); } + SST_ASSERT_UNLOCKED(); Entry->Msg->PreloadMode = CP_WSR_Stream->PreloadMode; - PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); sendOneToWSRCohort(CP_WSR_Stream, Stream->CPInfo->DeliverTimestepMetadataFormat, Entry->Msg, &Entry->Msg->RS_Stream); @@ -1049,7 +1066,9 @@ static void waitForReaderResponseAndSendQueued(WS_ReaderInfo Reader) while (Reader->ReaderStatus != Established) { /* NEED TO HANDLE FAILURE HERE */ - CP_verbose(Stream, "Waiting for Reader ready on WSR %p.\n", Reader); + CP_verbose(Stream, + "(PID %lx, TID %lx) Waiting for Reader ready on WSR %p.\n", + (long)getpid(), (long)gettid(), Reader); pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); } @@ -1080,8 +1099,15 @@ static void waitForReaderResponseAndSendQueued(WS_ReaderInfo Reader) CPTimestepList List = Stream->QueuedTimesteps; while (List) { + CP_verbose( + Stream, + "In send queued, trying to send TS %ld, examining TS %ld\n", TS, + List->Timestep); if (Reader->ReaderStatus != Established) - continue; /* do nothing if we've fallen out of established */ + { + break; /* break out of while if we've fallen out of established + */ + } if (List->Timestep == TS) { FFSFormatList SavedFormats = List->Msg->Formats; @@ -1092,7 +1118,7 @@ static void waitForReaderResponseAndSendQueued(WS_ReaderInfo Reader) "and not precious\n", List->Timestep, TS); List = List->Next; - continue; /* do nothing timestep is expired, but not + continue; /* skip timestep is expired, but not precious */ } if (TS == Reader->StartingTimestep) @@ -1134,6 +1160,8 @@ SstStream SstWriterOpen(const char *Name, SstParams Params, MPI_Comm comm) SMPI_Comm_rank(Stream->mpiComm, &Stream->Rank); SMPI_Comm_size(Stream->mpiComm, &Stream->CohortSize); + printf("WRITER main program thread PID is %lx, TID %lx in writer open\n", + (long)getpid(), (long)gettid()); Stream->DP_Interface = SelectDP(&Svcs, Stream, Stream->ConfigParams); if (!Stream->DP_Interface) @@ -1236,6 +1264,7 @@ void 
sendOneToEachReaderRank(SstStream s, CMFormat f, void *Msg, { for (int i = 0; i < s->ReaderCount; i++) { + SST_ASSERT_UNLOCKED(); WS_ReaderInfo CP_WSR_Stream = s->Readers[i]; if (CP_WSR_Stream->ReaderStatus == Established) { @@ -1259,6 +1288,9 @@ static void CloseWSRStream(CManager cm, void *WSR_Stream_v) SstStream ParentStream = CP_WSR_Stream->ParentStream; PTHREAD_MUTEX_LOCK(&ParentStream->DataLock); + CP_verbose(ParentStream, + "Delayed task Moving Reader stream %p to status %s\n", + CP_WSR_Stream, SSTStreamStatusStr[PeerClosed]); CP_PeerFailCloseWSReader(CP_WSR_Stream, PeerClosed); PTHREAD_MUTEX_UNLOCK(&ParentStream->DataLock); } @@ -1333,6 +1365,7 @@ void SstWriterClose(SstStream Stream) struct _WriterCloseMsg Msg; struct timeval CloseTime, Diff; Msg.FinalTimestep = Stream->LastProvidedTimestep; + SST_ASSERT_UNLOCKED(); sendOneToEachReaderRank(Stream, Stream->CPInfo->WriterCloseFormat, &Msg, &Msg.RS_Stream); @@ -1619,9 +1652,12 @@ static void ActOnTSLockStatus(SstStream Stream) Stream->Readers[i]->ReaderSelectionLockTimestep); } Msg.Timestep = Stream->Readers[i]->ReaderSelectionLockTimestep; + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); + SST_ASSERT_UNLOCKED(); sendOneToWSRCohort(Stream->Readers[i], Stream->CPInfo->CommPatternLockedFormat, &Msg, &Msg.RS_Stream); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); Stream->Readers[i]->ReaderDefinitionsLocked = 2; Stream->Readers[i]->PreloadMode = SstPreloadLearned; } @@ -1867,9 +1903,11 @@ extern void SstInternalProvideTimestep( PTHREAD_MUTEX_LOCK(&Stream->DataLock); Stream->WriterTimestep = Timestep; + PTHREAD_MUTEX_UNLOCK(&Stream->DataLock); Stream->DP_Interface->provideTimestep(&Svcs, Stream->DP_Stream, Data, LocalMetadata, Timestep, &DP_TimestepInfo); + PTHREAD_MUTEX_LOCK(&Stream->DataLock); /* Md is the local contribution to MetaData */ Md.Formats = Formats; @@ -2029,6 +2067,7 @@ extern void SstInternalProvideTimestep( "timestep %d, one to each reader\n", Timestep); + SST_ASSERT_UNLOCKED(); 
sendOneToEachReaderRank(Stream, Stream->CPInfo->DeliverTimestepMetadataFormat, Msg, &Msg->RS_Stream); @@ -2199,6 +2238,9 @@ void CP_ReaderRegisterHandler(CManager cm, CMConnection conn, void *Msg_v, // } Stream = Msg->WriterFile; + printf("WRITER network handler PID is %lx, TID %lx in reader register " + "handler\n", + (long)getpid(), (long)gettid()); /* arrange for this message data to stay around */ CMtake_buffer(cm, Msg); @@ -2256,6 +2298,7 @@ extern void CP_ReleaseTimestepHandler(CManager cm, CMConnection conn, /* decrement the reference count for the released timestep */ PTHREAD_MUTEX_LOCK(&ParentStream->DataLock); + CP_verbose(ParentStream, "Got the lock in release timestep\n"); Reader->LastReleasedTimestep = Msg->Timestep; if ((ParentStream->Rank == 0) && (ParentStream->ConfigParams->CPCommPattern == SstCPCommMin)) @@ -2268,10 +2311,13 @@ extern void CP_ReleaseTimestepHandler(CManager cm, CMConnection conn, ParentStream->ReleaseList[ParentStream->ReleaseCount].Reader = Reader; ParentStream->ReleaseCount++; } + CP_verbose(ParentStream, "Doing dereference sent\n"); DerefSentTimestep(ParentStream, Reader, Msg->Timestep); + CP_verbose(ParentStream, "Doing QueueMaint\n"); QueueMaintenance(ParentStream); Reader->OldestUnreleasedTimestep = Msg->Timestep + 1; pthread_cond_signal(&ParentStream->DataCondition); + CP_verbose(ParentStream, "Releasing the lock in release timestep\n"); PTHREAD_MUTEX_UNLOCK(&ParentStream->DataLock); TAU_STOP_FUNC(); } diff --git a/source/adios2/toolkit/sst/dp/evpath_dp.c b/source/adios2/toolkit/sst/dp/evpath_dp.c index 6c32c83314..8cfa5e221d 100644 --- a/source/adios2/toolkit/sst/dp/evpath_dp.c +++ b/source/adios2/toolkit/sst/dp/evpath_dp.c @@ -1011,9 +1011,18 @@ static void SendSpeculativePreloadMsgs(CP_Services Svcs, { if (!WSR_Stream->ReaderContactInfo[i].Conn) { + extern int CMtrace_val[]; attr_list List = attr_list_from_string( WSR_Stream->ReaderContactInfo[i].ContactString); + printf("Doing speculative preload, attr list :"); + 
dump_attr_list(List); + int tmp = CMtrace_val[5]; + int tmp2 = CMtrace_val[3]; + CMtrace_val[5] = 1; + CMtrace_val[3] = 1; CMConnection Conn = CMget_conn(cm, List); + CMtrace_val[5] = tmp; + CMtrace_val[3] = tmp2; free_attr_list(List); if (!Conn) { @@ -1024,9 +1033,12 @@ static void SendSpeculativePreloadMsgs(CP_Services Svcs, i); return; } + Svcs->verbose(WS_Stream->CP_Stream, + "Got a connection to reader rank %d \n", i); WSR_Stream->ReaderContactInfo[i].Conn = Conn; } PreloadMsg.RS_Stream = WSR_Stream->ReaderContactInfo[i].RS_Stream; + Svcs->verbose(WS_Stream->CP_Stream, "Doing the preload write \n"); CMwrite(WSR_Stream->ReaderContactInfo[i].Conn, WS_Stream->PreloadFormat, &PreloadMsg); } diff --git a/testing/adios2/engine/staging-common/run_test.py.gen.in b/testing/adios2/engine/staging-common/run_test.py.gen.in index 056693143d..b0ebb0fd69 100755 --- a/testing/adios2/engine/staging-common/run_test.py.gen.in +++ b/testing/adios2/engine/staging-common/run_test.py.gen.in @@ -52,8 +52,10 @@ def clean_server_kill(writer): touch('DieTest') print("TestDriver: Waiting for writer after touching file") + sys.stdout.flush() writer.wait() print("TestDriver: Writer complete") + sys.stdout.flush() os.remove('DieTest') @@ -159,7 +161,9 @@ def do_kill_writer_test(writer_cmd, reader_cmd, interval): def do_kill_readers_test(writer_cmd, reader_cmd, duration, interval): return_code = 0 - writer = subprocess.Popen(writer_cmd) + my_env = os.environ.copy() + my_env["SstVerbose"] = "1" + writer = subprocess.Popen(writer_cmd, env=my_env) start = time.time() timeout = time.time() + duration readers = [] diff --git a/thirdparty/EVPath/EVPath/cm.c b/thirdparty/EVPath/EVPath/cm.c index 3c0ba89d3e..f125ade809 100644 --- a/thirdparty/EVPath/EVPath/cm.c +++ b/thirdparty/EVPath/EVPath/cm.c @@ -108,7 +108,8 @@ struct CMtrans_services_s CMstatic_trans_svcs = {INT_CMmalloc, INT_CMrealloc, IN INT_CMConnection_dereference, INT_CMConnection_add_reference, INT_CMConnection_failed, - CMwake_server_thread + 
CMwake_server_thread, + INT_CMCondition_signal }; static void INT_CMControlList_close(CMControlList cl, CManager cm); static int CMcontrol_list_poll(CMControlList cl); @@ -1480,6 +1481,7 @@ INT_CMget_ip_config_diagnostics(CManager cm) /* we will await his respone */ conn->handshake_condition = INT_CMCondition_get(cm, conn); } + printf("(PID %ld) Sending a handshake write, waitcondition %d\n", (long) getpid(), conn->handshake_condition); actual = conn->trans->writev_func(&CMstatic_trans_svcs, conn->transport_data, &tmp_vec[0], 1, NULL); @@ -1493,17 +1495,54 @@ INT_CMget_ip_config_diagnostics(CManager cm) } if ((conn->remote_format_server_ID == 0) && reliable) { CMtrace_out(conn->cm, CMLowLevelVerbose, "CM - waiting for handshake response\n"); + printf("(PID %ld) Waiting for handshake, waitcondition %d\n", (long) getpid(), conn->handshake_condition); INT_CMCondition_wait(cm, conn->handshake_condition); } + printf("(PID %ld) Falling out of handshake wait\n", (long) getpid()); } +static +void +timeout_conn(CManager cm, void *client_data) +{ + printf("TIMEOUT CONNECTION in CM.c, signalling %ld\n", (long) client_data); + INT_CMCondition_fail(cm, (long) client_data); +} + static CMConnection try_conn_init(CManager cm, transport_entry trans, attr_list attrs) { - CMConnection conn; - conn = trans->initiate_conn(cm, &CMstatic_trans_svcs, - trans, attrs); + CMConnection conn = NULL; + if (trans->initiate_conn) { + conn = trans->initiate_conn(cm, &CMstatic_trans_svcs, + trans, attrs); + } else if (trans->initiate_conn_nonblocking) { + int result; + long wait_condition = INT_CMCondition_get(cm, NULL); + CMTaskHandle task = INT_CMadd_delayed_task(cm, 5, 0, timeout_conn, + (void*)wait_condition); + if (CMtrace_on(cm, CMConnectionVerbose)) { + char *attr_str = attr_list_to_string(attrs); + CMtrace_out(cm, CMConnectionVerbose, + "CM - Try to establish connection %p - %s, wait condition %ld\n", (void*)conn, + attr_str, wait_condition); + INT_CMfree(attr_str); + } + void *client_data 
= trans->initiate_conn_nonblocking(cm, &CMstatic_trans_svcs, + trans, attrs, wait_condition); + // Upon wake, condition will have either been signaled (return 1) or failed (return 0) + result = INT_CMCondition_wait(cm, wait_condition); + CMtrace_out(cm, CMConnectionVerbose, + "CM - CMConnection wait returned, result %d\n", result); + if (result == 1) { + INT_CMremove_task(task); + } + conn = trans->finalize_conn_nonblocking(cm, &CMstatic_trans_svcs, + trans, client_data, result); + } else { + assert(0); + } if (conn != NULL) { if (CMtrace_on(conn->cm, CMConnectionVerbose)) { char *attr_str = attr_list_to_string(attrs); @@ -2175,6 +2214,7 @@ INT_CMget_ip_config_diagnostics(CManager cm) remote_CManager_ID = ((int *) base)[1]; } + printf("(PID %ld) Got a handshake message\n", (long) getpid()); CMtrace_out(conn->cm, CMLowLevelVerbose, "CM - Received CONN handshake message\n"); if ((remote_CManager_ID & 0x80000000) == 0x80000000) { /* the other fellow already has our ID */ @@ -2199,9 +2239,11 @@ INT_CMget_ip_config_diagnostics(CManager cm) } } if (do_send) { + printf("(PID %ld) Responding to handshake\n", (long) getpid()); CMtrace_out(conn->cm, CMLowLevelVerbose, "CM - Sending CONN handshake message\n"); send_and_maybe_wait_for_handshake(conn->cm, conn); } else { + printf("(PID %ld) Don't need to respond to handshake\n", (long) getpid()); CMtrace_out(conn->cm, CMLowLevelVerbose, "CM - *NOT* Sending CONN handshake message\n"); } } diff --git a/thirdparty/EVPath/EVPath/cm_control.c b/thirdparty/EVPath/EVPath/cm_control.c index a2813cd312..72f0f5426b 100644 --- a/thirdparty/EVPath/EVPath/cm_control.c +++ b/thirdparty/EVPath/EVPath/cm_control.c @@ -120,6 +120,21 @@ CMconn_fail_conditions(CMConnection conn) } } +extern void +INT_CMCondition_fail(CManager cm, int condition) +{ + CMControlList cl = cm->control_list; + CMCondition cond = CMCondition_find(cl, condition); + if (!cond) return; + cond->failed = 1; + CMCondition_trigger(cm, cond, cm->control_list); + + if 
(cl->cond_polling) { + /* wake the server thread in case we're not him */ + CMwake_server_thread(cm); + } +} + void CMCondition_destroy(CMControlList cl, int condition) { diff --git a/thirdparty/EVPath/EVPath/cm_internal.h b/thirdparty/EVPath/EVPath/cm_internal.h index 829e03136d..351f400655 100644 --- a/thirdparty/EVPath/EVPath/cm_internal.h +++ b/thirdparty/EVPath/EVPath/cm_internal.h @@ -413,6 +413,7 @@ void INT_CMCondition_set_client_data(CManager cm, int condition, void *client_data); void *INT_CMCondition_get_client_data(CManager cm, int condition); int INT_CMCondition_wait(CManager cm, int condition); +extern void INT_CMCondition_fail(CManager cm, int condition); extern attr_list INT_CMget_contact_list(CManager cm); extern void INT_CMregister_non_CM_message_handler(int header, CMNonCMHandler handler); extern void *INT_CMtake_buffer(CManager cm, void *data); diff --git a/thirdparty/EVPath/EVPath/cm_transport.c b/thirdparty/EVPath/EVPath/cm_transport.c index 420bf7668f..ba6f179bcb 100644 --- a/thirdparty/EVPath/EVPath/cm_transport.c +++ b/thirdparty/EVPath/EVPath/cm_transport.c @@ -161,6 +161,10 @@ load_transport(CManager cm, const char *trans_name, int quiet) lt_dlsym(handle, "non_blocking_listen"); transport->initiate_conn = (CMConnection(*)()) lt_dlsym(handle, "initiate_conn"); + transport->initiate_conn_nonblocking = (CMTransport_NBconn_func) + lt_dlsym(handle, "initiate_conn_nonblocking"); + transport->finalize_conn_nonblocking = (CMConnection(*)()) + lt_dlsym(handle, "finalize_conn_nonblocking"); transport->self_check = (int (*)()) lt_dlsym(handle, "self_check"); transport->connection_eq = (int (*)()) lt_dlsym(handle, "connection_eq"); diff --git a/thirdparty/EVPath/EVPath/cm_transport.h b/thirdparty/EVPath/EVPath/cm_transport.h index 451925f357..69d9a331af 100644 --- a/thirdparty/EVPath/EVPath/cm_transport.h +++ b/thirdparty/EVPath/EVPath/cm_transport.h @@ -32,6 +32,7 @@ typedef void *(*CMTransport_malloc_func)(int); typedef void 
*(*CMTransport_realloc_func)(void*, int); typedef void (*CMTransport_free_func)(void*); typedef void (*CMTransport_wake_comm_thread_func)(CManager cm); +typedef void (*CMTransport_condition_signal_func)(CManager cm, int condition); typedef void (*select_list_func)(void *, void*); @@ -109,6 +110,7 @@ typedef struct CMtrans_services_s { CMTransport_connection_close connection_addref; CMTransport_connection_close connection_fail; CMTransport_wake_comm_thread_func wake_comm_thread; + CMTransport_condition_signal_func condition_signal; } *CMtrans_services; #define DROP_CM_LOCK(svc, cm) (svc)->drop_CM_lock((cm), __FILE__, __LINE__) #define ACQUIRE_CM_LOCK(svc, cm) (svc)->acquire_CM_lock((cm), __FILE__, __LINE__) @@ -145,6 +147,18 @@ typedef CMConnection (*CMTransport_conn_func)(CManager cm, transport_entry trans, attr_list attrs); +typedef CMConnection (*CMTransport_NBconn_func)(CManager cm, + CMtrans_services svc, + transport_entry trans, + attr_list attrs, + int condition); + +typedef CMConnection (*CMTransport_NBconn_final_func)(CManager cm, + CMtrans_services svc, + transport_entry trans, + void *client_data, + int result); + typedef int (*CMTransport_self_check_func)(CManager cm, CMtrans_services svc, transport_entry trans, @@ -182,6 +196,8 @@ struct _transport_item { CMTransport_func transport_init; CMTransport_listen_func listen; CMTransport_conn_func initiate_conn; + CMTransport_NBconn_func initiate_conn_nonblocking; + CMTransport_NBconn_final_func finalize_conn_nonblocking; CMTransport_self_check_func self_check; CMTransport_connection_eq_func connection_eq; CMTransport_shutdown_conn_func shutdown_conn; @@ -277,6 +293,18 @@ IP_get_diagnostics(CManager cm, CMTransport_trace trace_out); * establishing a periodic task that will check for data, etc.) * Generally, when data is available on a connection, a call to * trans->data_available() should be performed. 
+ * - void *initiate_conn_nonblocking(CManager cm, CMtrans_services svc, + * transport_entry trans, attr_list attrs, int condition); + * This routine should start a connection attempt to the host/process + * specified by the attrs parameters and return without waiting for + * the attempt to complete. The return value is transport-private + * data describing the pending attempt; CM passes it back unchanged + * as 'client_data' to finalize_conn_nonblocking(). When the peer + * connects, the transport should call svc->condition_signal() on + * 'condition'; CM fails the condition itself if its own timeout + * expires first. CM then calls finalize_conn_nonblocking() with + * result 1 (signaled) or 0 (failed), which returns the new + * CMConnection or NULL. * - int self_check(CManager cm, CMtrans_services svc, * transport_entry trans, attr_list attrs); * Because only the individual CMtransports can fully interpret diff --git a/thirdparty/EVPath/EVPath/cmenet.c b/thirdparty/EVPath/EVPath/cmenet.c index a4af8c995e..de0a147faf 100644 --- a/thirdparty/EVPath/EVPath/cmenet.c +++ b/thirdparty/EVPath/EVPath/cmenet.c @@ -11,6 +11,8 @@ #include #include #include +#include +#define gettid() pthread_self() #ifdef USE_ZPL_ENET #define ENET_IMPLEMENTATION @@ -121,6 +123,7 @@ typedef struct enet_client_data { CMTaskHandle periodic_handle; pthread_mutex_t enet_lock; int enet_locked; + struct enet_connection_data *pending_connections; } *enet_client_data_ptr; typedef struct enet_connection_data { @@ -138,6 +141,9 @@ typedef struct enet_connection_data { ENetPacket *packet; enet_client_data_ptr ecd; CMConnection conn; + attr_list conn_attr_list; + int connect_condition; + struct enet_connection_data *next_pending; } *enet_conn_data_ptr; static atom_t CM_PEER_IP = -1; @@ -167,15 +173,16 @@ INTERFACE_NAME(non_blocking_listen)(CManager cm, CMtrans_services svc, static void IntENET_lock(enet_client_data_ptr ecd, char *file, int line) { -// if (file) printf("Trying ENET Lock at %s, line %d\n", file, 
line); + if (file) printf("(PID %lx, TID %lx) Trying ENET Lock at %s, line %d\n", (long) getpid(), (long)gettid(), file, line); pthread_mutex_lock(&ecd->enet_lock); + if (file) printf("GOT ENET Lock at %s, line %d\n", file, line); ecd->enet_locked++; } static void IntENET_unlock(enet_client_data_ptr ecd, char *file, int line) { -// if (file) printf("ENET Unlock at %s, line %d\n", file, line); + if (file) printf("(PID %lx, TID %lx) ENET Unlock at %s, line %d\n", (long) getpid(), (long) gettid(), file, line); ecd->enet_locked--; pthread_mutex_unlock(&ecd->enet_lock); } @@ -275,12 +282,12 @@ enet_service_network(CManager cm, void *void_trans) while (ecd->server) { IntENET_lock(ecd, NULL, 0); int ret = enet_host_service (ecd->server, & event, 0); - IntENET_unlock(ecd, NULL, 0); if (enet_host_service_warn_interval && (enet_time_get() > (ecd->last_host_service_zero_return + enet_host_service_warn_interval))) { fprintf(stderr, "WARNING, time between zero return for enet_host_service = %d msecs\n", enet_time_get() - ecd->last_host_service_zero_return); } + IntENET_unlock(ecd, NULL, 0); if (ret <= 0) { break; } @@ -288,7 +295,27 @@ enet_service_network(CManager cm, void *void_trans) case ENET_EVENT_TYPE_NONE: break; case ENET_EVENT_TYPE_CONNECT: { - void *enet_connection_data; + enet_conn_data_ptr enet_connection_data = NULL; + if (event.peer->data) { + enet_conn_data_ptr last = NULL; + enet_connection_data = ecd->pending_connections; + while (enet_connection_data) { + if (enet_connection_data->peer == event.peer) { + if (last) { + last->next_pending = enet_connection_data->next_pending; + } else { + ecd->pending_connections = enet_connection_data->next_pending; + } + enet_connection_data->next_pending = NULL; + break; + } + last = enet_connection_data; /* remember the predecessor so a non-head match is unlinked from it instead of truncating the list */ enet_connection_data = enet_connection_data->next_pending; + } + } + if (enet_connection_data) { + svc->condition_signal(cm, enet_connection_data->connect_condition); + break; + } #ifndef USE_IPV6 struct in_addr addr; addr.s_addr = 
event.peer->address.host; @@ -308,14 +335,17 @@ enet_service_network(CManager cm, void *void_trans) svc->trace_out(cm, "That was IPV4 address %s\n", inet_ntoa(addr)); } #endif - enet_connection_data = enet_accept_conn(ecd, trans, &event.peer->address); + printf("(PID %lx, TID %lx) ACCEPTED CONNECTION, NEW PEER HAS OUTGOING SESSION ID %d\n", (long)getpid(), (long)getpid(), event.peer->outgoingSessionID); /* Store any relevant client information here. */ svc->trace_out(cm, "ENET ======== Assigning peer %p has data %p\n", event.peer, enet_connection_data); + enet_peer_timeout(event.peer, 0, 0, 200); event.peer->data = enet_connection_data; ((enet_conn_data_ptr)enet_connection_data)->peer = event.peer; - +// IntENET_lock(ecd, NULL, 0); +// enet_host_flush(ecd->server); +// IntENET_unlock(ecd, NULL, 0); break; } case ENET_EVENT_TYPE_RECEIVE: { @@ -344,17 +374,21 @@ enet_service_network(CManager cm, void *void_trans) #endif case ENET_EVENT_TYPE_DISCONNECT: { enet_conn_data_ptr enet_conn_data = (enet_conn_data_ptr) event.peer->data; - svc->trace_out(cm, "Got a disconnect on connection %p\n", - event.peer->data); + svc->trace_out(cm, "Got a disconnect on connection %p\n", event.peer->data); enet_conn_data = (enet_conn_data_ptr) event.peer->data; enet_conn_data->read_buffer_len = -1; - svc->connection_fail(enet_conn_data->conn); + if (enet_conn_data->conn) { + svc->connection_fail(enet_conn_data->conn); + } + break; + } + default: + printf("UNKNOWN EVENT TYPE! 
%d\n", event.type); + break; } - } } ecd->last_host_service_zero_return = enet_time_get(); - IntENET_unlock(ecd, NULL, 0); } static @@ -488,13 +522,18 @@ enet_accept_conn(enet_client_data_ptr ecd, transport_entry trans, enet_conn_data->remote_contact_port); #endif + printf("IN ENET_accept_conn, creating connection: "); + dump_attr_list(conn_attr_list); + printf("\n"); free_attr_list(conn_attr_list); /* * try flushing connection verify message here to make * sure it's established */ - enet_host_flush(ecd->server); +// ENETlock(ecd); +// enet_host_flush(ecd->server); +// ENETunlock(ecd); return enet_conn_data; } @@ -507,14 +546,18 @@ extern void INTERFACE_NAME(shutdown_conn)(CMtrans_services svc, enet_conn_data_ptr scd) { +// printf("(PID %lx) shutting down connection CONNECTION, old PEER HAS OUTGOING SESSION ID %d\n", (long)getpid(), scd->peer->outgoingSessionID); +// ENETlock(scd->ecd); +// enet_peer_disconnect_later(scd->peer, 0) ; +// ENETunlock(scd->ecd); svc->connection_deref(scd->conn); if (scd->remote_host) free(scd->remote_host); free(scd); } -static int -initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, +static void * +enet_initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, attr_list attrs, enet_conn_data_ptr enet_conn_data, attr_list conn_attr_list) { @@ -527,7 +570,11 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, #endif struct in_addr sin_addr; (void)conn_attr_list; - int timeout = 5000; /* connection time out default 5 seconds */ + int timeout = 200; + + if (!(CM_LOCKED(svc, ecd->cm))) { + printf("Enet service network, CManager not locked in enet_initiate_conn\n"); + } if (!query_attr(attrs, CM_ENET_HOSTNAME, /* type pointer */ NULL, /* value pointer */ (attr_value *)(long) & host_name)) { @@ -573,7 +620,6 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, /* ENET connection, host_name is the machine name */ ENetAddress address; - ENetEvent event; ENetPeer 
*peer; sin_addr.s_addr = htonl(host_ip); @@ -587,7 +633,7 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, int_port_num); #else char straddr[INET6_ADDRSTRLEN]; - inet_ntop(AF_INET6, &event.peer->address.host, straddr, + inet_ntop(AF_INET6, &address.host, straddr, sizeof(straddr)); svc->trace_out(cm, "Attempting ENET RUDP connection, USING host=\"%s\", IP = %s, port %d", host_name == 0 ? "(unknown)" : host_name, @@ -600,6 +646,7 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, svc->trace_out(cm, "Attempting ENET RUDP connection, USING IP = %s, port %d", inet_ntoa(sin_addr), int_port_num); + printf("(PID %lx, TID %lx) CALLING ENET_HOST_CONNECT on ADDRESS %s, port %d\n", (long)getpid(), (long)gettid(), inet_ntoa(sin_addr), int_port_num); #else char straddr[INET6_ADDRSTRLEN]; ((enet_uint32 *)&host_ipv6.s6_addr)[0] = 0; @@ -608,6 +655,7 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, ((enet_uint32 *)&host_ipv6.s6_addr)[3] = htonl(host_ip); inet_ntop(AF_INET6, &host_ipv6, straddr, sizeof(straddr)); + printf("(PID %lx, TID %lx) CALLING ENET_HOST_CONNECT on ADDRESS %s, port %d\n", (long)getpid(), (long)gettid(), straddr, int_port_num); svc->trace_out(cm, "Attempting ENET RUDP connection, USING host=\"%s\", IP = %s, port %d", host_name == 0 ? "(unknown)" : host_name, @@ -636,129 +684,23 @@ initiate_conn(CManager cm, CMtrans_services svc, transport_entry trans, exit (EXIT_FAILURE); } - enet_peer_timeout(peer, 0, 0, 5000); + enet_peer_timeout(peer, 0, 0, 200); ENETunlock(ecd); peer->data = enet_conn_data; - svc->trace_out(cm, "ENET ======== On init Assigning peer %p has data %p\n", peer, enet_conn_data); - - /* Wait up to 'timeout' milliseconds for the connection attempt to succeed. 
*/ - int finished = 0; - int got_connection = 0; - enet_uint32 end = enet_time_get() + timeout; - while (!finished) { - ENETlock(ecd); - int ret = enet_host_service (ecd->server, & event, 100); - ENETunlock(ecd); - enet_uint32 now = enet_time_get(); - if (enet_host_service_warn_interval && - (enet_time_get() > (ecd->last_host_service_zero_return + enet_host_service_warn_interval))) { - fprintf(stderr, "WARNING, time between zero return for enet_host_service = %d msecs\n", - enet_time_get() - ecd->last_host_service_zero_return); - } - if (now > end) { - finished = 1; - } - if (ret <= 0) { - ecd->last_host_service_zero_return = enet_time_get(); - continue; - } - switch(event.type) { - case ENET_EVENT_TYPE_CONNECT: { - if (event.peer != peer) { - enet_conn_data_ptr enet_connection_data; -#ifndef USE_IPV6 - struct in_addr addr; - addr.s_addr = event.peer->address.host; - svc->trace_out(cm, "A new client connected from %s:%u.\n", - inet_ntoa(addr), - event.peer->address.port); -#else - char straddr[INET6_ADDRSTRLEN]; - inet_ntop(AF_INET6, &event.peer->address.host, straddr, - sizeof(straddr)); - svc->trace_out(cm, "A new client connected from %s:%u.\n", - &straddr[0], - event.peer->address.port); - enet_peer_timeout(event.peer, 0, 0, 5000); -#endif - enet_connection_data = (enet_conn_data_ptr) enet_accept_conn(ecd, trans, &event.peer->address); - - /* Store any relevant client information here. 
*/ - svc->trace_out(cm, "ENET ======== Assigning peer %p has data %p\n", event.peer, enet_connection_data); - event.peer->data = enet_connection_data; - ((enet_conn_data_ptr)enet_connection_data)->peer = event.peer; - ENETlock(ecd); - enet_host_flush (ecd->server); - ENETunlock(ecd); - } else { - ENETlock(ecd); - enet_host_flush (ecd->server); - ENETunlock(ecd); - svc->trace_out(cm, "Connection to %s:%d succeeded.\n", inet_ntoa(sin_addr), address.port); - finished = 1; - got_connection = 1; - } - break; - } - case ENET_EVENT_TYPE_NONE: - break; -#ifdef USE_ZPL_ENET - case ENET_EVENT_TYPE_DISCONNECT_TIMEOUT: -#endif - case ENET_EVENT_TYPE_DISCONNECT: - if (event.peer == peer) { - enet_peer_reset (peer); - - svc->trace_out(cm, "Connection to %s:%d failed type was %d.\n", inet_ntoa(sin_addr), address.port, event.type); - return 0; - } else { - enet_conn_data_ptr enet_conn_data = (enet_conn_data_ptr) event.peer->data; - svc->trace_out(cm, "Got a disconnect on connection %p\n", - event.peer->data); - - enet_conn_data = (enet_conn_data_ptr) event.peer->data; - enet_conn_data->read_buffer_len = -1; - svc->connection_fail(enet_conn_data->conn); - } - break; - case ENET_EVENT_TYPE_RECEIVE: { - enet_conn_data_ptr econn_d = (enet_conn_data_ptr) event.peer->data; - queued_data entry = (queued_data) malloc(sizeof(*entry)); - entry->next = NULL; - entry->econn_d = econn_d; - entry->packet = event.packet; - /* add at the end */ - if (econn_d->ecd->pending_data == NULL) { - econn_d->ecd->pending_data = entry; - } else { - queued_data last = econn_d->ecd->pending_data; - while (last->next != NULL) { - last = last->next; - } - last->next = entry; - } - break; - } - } - } - - if (!got_connection) { - svc->trace_out(cm, "--> Connection failed because of timeout"); - return 0; - } - svc->trace_out(cm, "--> Connection established\n"); enet_conn_data->remote_host = host_name == NULL ? 
NULL : strdup(host_name); #ifndef USE_IPV6 enet_conn_data->remote_IP = htonl(host_ip); #else memcpy(&enet_conn_data->remote_IP, &host_ipv6, sizeof(enet_conn_data->remote_IP)); - enet_conn_data->remote_IPv4 = htonl(host_ip); + enet_conn_data->remote_IPv4 = htonl(((enet_uint32 *)&host_ipv6.s6_addr)[3]); #endif enet_conn_data->remote_contact_port = int_port_num; enet_conn_data->ecd = ecd; enet_conn_data->peer = peer; peer->data = enet_conn_data; - return 1; + svc->trace_out(cm, "ENET ======== On init Assigning peer %p has data %p moving to wait phase\n", peer, enet_conn_data); + + return enet_conn_data; } /* @@ -769,22 +711,70 @@ extern "C" #else extern #endif -CMConnection -INTERFACE_NAME(initiate_conn)(CManager cm, CMtrans_services svc, - transport_entry trans, attr_list attrs) +void * +INTERFACE_NAME(initiate_conn_nonblocking)(CManager cm, CMtrans_services svc, + transport_entry trans, attr_list attrs, int connect_condition) { enet_conn_data_ptr enet_conn_data = create_enet_conn_data(svc); attr_list conn_attr_list = create_attr_list(); + enet_conn_data_ptr ret; + + enet_conn_data->conn_attr_list = conn_attr_list; + enet_conn_data->connect_condition = connect_condition; + ret = enet_initiate_conn(cm, svc, trans, attrs, enet_conn_data, conn_attr_list); + if (ret) { + enet_client_data_ptr ecd = (enet_client_data_ptr) trans->trans_data; + ret->next_pending = ecd->pending_connections; + ecd->pending_connections = ret; + } + return (void*)ret; +} + +/* + * Initiate a ENET RUDP connection with another CM. 
+ */ +#ifdef __cplusplus +extern "C" +#else +extern +#endif +CMConnection +INTERFACE_NAME(finalize_conn_nonblocking)(CManager cm, CMtrans_services svc, + transport_entry trans, void *client_data, int result) +{ + enet_client_data_ptr ecd = (enet_client_data_ptr) trans->trans_data; + enet_conn_data_ptr final_conn_data = (enet_conn_data_ptr) client_data; + enet_conn_data_ptr last = NULL, enet_conn_data = ecd->pending_connections; CMConnection conn; + attr_list conn_attr_list = final_conn_data ? final_conn_data->conn_attr_list : NULL; + if (!final_conn_data) return NULL; /* initiation returned no pending data; nothing here to unlink or free */ + if (!result) { + while (enet_conn_data) { + if (enet_conn_data == final_conn_data) { + if (last) { + last->next_pending = enet_conn_data->next_pending; + } else { + ecd->pending_connections = enet_conn_data->next_pending; + } + enet_conn_data->next_pending = NULL; + printf("REMOVE PENDING\n"); + break; + } + last = enet_conn_data; + enet_conn_data = enet_conn_data->next_pending; + } - if (!initiate_conn(cm, svc, trans, attrs, enet_conn_data, conn_attr_list)) - return NULL; + free_attr_list(conn_attr_list); + free(final_conn_data); /* free the abandoned attempt itself; the loop cursor is NULL when it was no longer on the pending list */ + return NULL; + } add_attr(conn_attr_list, CM_PEER_LISTEN_PORT, Attr_Int4, - (attr_value) (long)enet_conn_data->remote_contact_port); - conn = svc->connection_create(trans, enet_conn_data, conn_attr_list); - enet_conn_data->conn = conn; + (attr_value) (long)final_conn_data->remote_contact_port); + conn = svc->connection_create(trans, final_conn_data, conn_attr_list); + final_conn_data->conn = conn; free_attr_list(conn_attr_list); + final_conn_data->conn_attr_list = NULL; svc->connection_addref(conn); /* one ref count went to CM, the other to the user */ @@ -873,28 +863,29 @@ INTERFACE_NAME(connection_eq)(CManager cm, CMtrans_services svc, (void) trans; if (!query_attr(attrs, CM_ENET_HOSTNAME, /* type pointer */ NULL, /* value pointer */ (attr_value *)(long) & host_name)) { - svc->trace_out(cm, "CMEnet transport found no CM_ENET_HOST attribute"); + printf("CMEnet transport found no CM_ENET_HOST attribute\n"); } if 
(!query_attr(attrs, CM_ENET_PORT, /* type pointer */ NULL, /* value pointer */ (attr_value *)(long) & int_port_num)) { - svc->trace_out(cm, "Conn Eq CMenet transport found no CM_ENET_PORT attribute"); + printf("Conn Eq CMenet transport found no CM_ENET_PORT attribute, RETURNING FALSE\n"); return 0; } if (!query_attr(attrs, CM_ENET_ADDR, /* type pointer */ NULL, /* value pointer */ (attr_value *)(long) & requested_IP)) { - svc->trace_out(cm, "CMENET transport found no CM_ENET_ADDR attribute"); + printf("CMENET transport found no CM_ENET_ADDR attribute\n"); } if (requested_IP == -1) { check_host(host_name, (void *) &requested_IP); requested_IP = ntohl(requested_IP); struct in_addr addr; addr.s_addr = htonl(requested_IP); - svc->trace_out(cm, "IP translation for hostname %s is %s", host_name, + printf("IP translation for hostname %s is %s\n", host_name, inet_ntoa(addr)); } /* requested IP is in host byte order */ + printf("(PID %lx) CONN EQ PEER HAS OUTGOING SESSION ID %d\n", (long)getpid(), ecd->peer->outgoingSessionID); if (ecd->peer->state != ENET_PEER_STATE_CONNECTED) { - svc->trace_out(cm, "ENET Conn_eq returning FALSE, peer not connected"); + printf("ENET Conn_eq returning FALSE, peer not connected, state %d\n", ecd->peer->state); return 0; } struct in_addr addr1, addr2; @@ -904,7 +895,7 @@ INTERFACE_NAME(connection_eq)(CManager cm, CMtrans_services svc, addr1.s_addr = htonl(ecd->remote_IP); #endif addr2.s_addr = htonl(requested_IP); - svc->trace_out(cm, "ENET Conn_eq comparing IP/ports %s/%d and %s/%d", + printf("ENET Conn_eq comparing IP/ports %s/%d and %s/%d\n", inet_ntoa(addr1), ecd->remote_contact_port, inet_ntoa(addr2), int_port_num); #ifdef USE_IPV6 @@ -913,10 +904,10 @@ INTERFACE_NAME(connection_eq)(CManager cm, CMtrans_services svc, if ((ecd->remote_IP == requested_IP) && /* both in host byte order */ #endif (ecd->remote_contact_port == int_port_num)) { - svc->trace_out(cm, "ENET Conn_eq returning TRUE"); + printf("ENET Conn_eq returning TRUE\n"); 
return 1; } - svc->trace_out(cm, "ENET Conn_eq returning FALSE"); + printf("ENET Conn_eq returning FALSE\n"); return 0; } @@ -994,6 +985,9 @@ INTERFACE_NAME(non_blocking_listen)(CManager cm, CMtrans_services svc, int attr_port_num = 0; u_short port_num = 0; + if (!(CM_LOCKED(svc, cm))) { + printf("ENET non_blocking listen, CManager not locked\n"); + } /* * Check to see if a bind to a specific port was requested */ @@ -1023,7 +1017,31 @@ INTERFACE_NAME(non_blocking_listen)(CManager cm, CMtrans_services svc, return NULL; } } - if (port_num != 0) { + attr_port_num = -1; + if (attr_port_num == -1) { + /* Bind the server to the default localhost. */ + /* A specific host address can be specified by */ + /* enet_address_set_host (& address, "x.x.x.x"); */ + + address.port = 0; + + svc->trace_out(cm, "CMEnet trying to bind selected port %d", port_num); + ENETlock(ecd); + server = enet_host_create (& address /* the address to bind the server host to */, + MAX_CLIENTS, /* max 4095 connections */ + 1 /* allow up to 2 channels to be used, 0 and 1 */, + 0 /* assume any amount of incoming bandwidth */, + 0 /* assume any amount of outgoing bandwidth */); + address.port = server->address.port; + printf("LISTENING ON PORT %d\n", address.port); + ENETunlock(ecd); + if (server == NULL) { + fprintf (stderr, + "An error occurred while trying to create an ENet server host.\n"); + return NULL; + } + ecd->server = server; + } else if (port_num != 0) { /* Bind the server to the default localhost. 
*/ /* A specific host address can be specified by */ /* enet_address_set_host (& address, "x.x.x.x"); */ @@ -1174,17 +1192,21 @@ INTERFACE_NAME(writev_func)(CMtrans_services svc, enet_conn_data_ptr ecd, { int i; int length = 0; - static time_t last_flush_call = 0; +// static time_t last_flush_call = 0; (void) attrs; for (i = 0; i < iovcnt; i++) { length += iov[i].iov_len; } + printf("(PID %lx, TID %lx) writev PEER %p HAS OUTGOING SESSION ID %d\n", (long)getpid(), (long)gettid(), ecd->peer, ecd->peer->outgoingSessionID); svc->trace_out(ecd->ecd->cm, "CMENET vector write of %d bytes on peer %p", length, ecd->peer); /* Create a reliable packet of the right size */ + if (!(CM_LOCKED(svc, ecd->ecd->cm))) { + printf("ENET writev, CManager not locked\n"); + } ENETlock(ecd->ecd); ENetPacket * packet = enet_packet_create (NULL, length, ENET_PACKET_FLAG_RELIABLE); @@ -1208,18 +1230,18 @@ INTERFACE_NAME(writev_func)(CMtrans_services svc, enet_conn_data_ptr ecd, wake_enet_server_thread(ecd->ecd); - ENETlock(ecd->ecd); - if (last_flush_call == 0) { - enet_host_flush(ecd->ecd->server); - last_flush_call = time(NULL); - } else { - time_t now = time(NULL); - if (now > last_flush_call) { - last_flush_call = now; - enet_host_flush(ecd->ecd->server); - } - } - ENETunlock(ecd->ecd); + /* ENETlock(ecd->ecd); */ + /* if (last_flush_call == 0) { */ + /* enet_host_flush(ecd->ecd->server); */ + /* last_flush_call = time(NULL); */ + /* } else { */ + /* time_t now = time(NULL); */ + /* if (now > last_flush_call) { */ + /* last_flush_call = now; */ + /* enet_host_flush(ecd->ecd->server); */ + /* } */ + /* } */ + /* ENETunlock(ecd->ecd); */ return iovcnt; } @@ -1301,6 +1323,7 @@ INTERFACE_NAME(initialize)(CManager cm, CMtrans_services svc, fprintf(stderr, "DEBUG: Setting enet_host_service_warn_interval to %d\n", enet_host_service_warn_interval); } enet_data = (enet_client_data_ptr) svc->malloc_func(sizeof(struct enet_client_data)); + memset(enet_data, 0, sizeof(struct enet_client_data)); 
pthread_mutex_init(&enet_data->enet_lock, NULL); enet_data->enet_locked = 0; enet_data->cm = cm; @@ -1338,7 +1361,9 @@ extern transport_entry cmenet_add_static_transport(CManager cm, CMtrans_services transport->cm = cm; transport->transport_init = (CMTransport_func)INTERFACE_NAME(initialize); transport->listen = (CMTransport_listen_func)INTERFACE_NAME(non_blocking_listen); - transport->initiate_conn = (CMTransport_conn_func)INTERFACE_NAME(initiate_conn); + transport->initiate_conn = NULL; + transport->initiate_conn_nonblocking = (CMTransport_NBconn_func)INTERFACE_NAME(initiate_conn_nonblocking); + transport->finalize_conn_nonblocking = (CMTransport_NBconn_final_func)INTERFACE_NAME(finalize_conn_nonblocking); transport->self_check = (CMTransport_self_check_func)INTERFACE_NAME(self_check); transport->connection_eq = (CMTransport_connection_eq_func)INTERFACE_NAME(connection_eq); transport->shutdown_conn = (CMTransport_shutdown_conn_func)INTERFACE_NAME(shutdown_conn); diff --git a/thirdparty/EVPath/EVPath/evp.c b/thirdparty/EVPath/EVPath/evp.c index b08a6ba456..61a85c28c0 100644 --- a/thirdparty/EVPath/EVPath/evp.c +++ b/thirdparty/EVPath/EVPath/evp.c @@ -2096,6 +2096,7 @@ do_bridge_action(CManager cm, int s) stone = stone_struct(evp, s); if (stone->is_frozen || (stone->is_draining == 2)) return 0; + if (stone->is_outputting) return 0; stone->is_outputting = 1; for (a=0 ; a < stone->proto_action_count && stone->is_frozen == 0 && (stone->is_draining != 2); a++) { if (stone->proto_actions[a].action_type == Action_Bridge) { diff --git a/thirdparty/EVPath/EVPath/zpl-enet/include/enet.h b/thirdparty/EVPath/EVPath/zpl-enet/include/enet.h index bc1df0edbb..14d3242d10 100644 --- a/thirdparty/EVPath/EVPath/zpl-enet/include/enet.h +++ b/thirdparty/EVPath/EVPath/zpl-enet/include/enet.h @@ -1,5 +1,6 @@ /* * THIS FILE HAS BEEN MODIFIED SO THAT ALL API FUNCTIONS ARE STATIC FOR INCLUDE-ONLY USAGE. 
+ * ALSO ADDED FALLBACK non-atomic ENET_ATOMIC_READ and ENET_ATOMIC_CAS * Greg Eisenhauer, Georgia Tech College of Computing. Tue Nov 5 10:59:29 EST 2019 */ @@ -1195,7 +1196,18 @@ extern "C" { #undef AT_HAVE_ATOMICS #endif /* defined(_MSC_VER) */ - +#ifndef ENET_ATOMIC_READ +// On a compiler that supports none of the above? Do without atomics... +#define ENET_ATOMIC_READ(variable) (*(uint64_t*)(variable)) +static uint64_t ENET_SIMPLE_CAS(uint64_t *ptr, uint64_t oldvalue, uint64_t newvalue) +{ + uint64_t temp = *ptr; + if(*ptr == oldvalue) + *ptr = newvalue; + return temp; +} +#define ENET_ATOMIC_CAS(ptr, oldvalue, newvalue) ENET_SIMPLE_CAS((ptr), (oldvalue), (newvalue)) +#endif // =======================================================================// // ! @@ -2428,6 +2440,7 @@ extern "C" { (peer->outgoingPeerID < ENET_PROTOCOL_MAXIMUM_PEER_ID && sessionID != peer->incomingSessionID) ) { + printf("============================= POSSIBLE SESSION ID DISCARD, %d, %d\n", sessionID, peer->incomingSessionID); return 0; } }