Skip to content

Commit

Permalink
Rendezvous after receiving "exit" notification
Browse files Browse the repository at this point in the history
  • Loading branch information
MaskRay committed Dec 13, 2018
1 parent 0fac577 commit b95393e
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/messages/initialize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ void *Indexer(void *arg_) {
std::string name = "indexer" + std::to_string(idx);
set_thread_name(name.c_str());
pipeline::Indexer_Main(h->manager, h->vfs, h->project, h->wfiles);
ThreadLeave();
return nullptr;
}
} // namespace
Expand Down Expand Up @@ -365,7 +366,6 @@ void MessageHandler::shutdown(EmptyParam &, ReplyOnce &reply) {
}

void MessageHandler::exit(EmptyParam &) {
// FIXME cancel threads
::exit(0);
pipeline::quit = true;
}
} // namespace ccls
51 changes: 41 additions & 10 deletions src/pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ using namespace llvm;
#endif

namespace ccls {
#if defined(__unix__) || defined(__APPLE__)
extern std::mutex thread_mtx;
extern std::condition_variable no_pending_threads;
extern int pending_threads;

void WaitAll() {
std::unique_lock lock(thread_mtx);
no_pending_threads.wait(lock, [] { return !pending_threads; });
}
#else
void WaitAll() {}
#endif

namespace {
struct PublishDiagnosticParam {
DocumentUri uri;
Expand Down Expand Up @@ -77,6 +90,7 @@ void StandaloneInitialize(MessageHandler &, const std::string &root);

namespace pipeline {

std::atomic<bool> quit;
std::atomic<int64_t> loaded_ts = ATOMIC_VAR_INIT(0),
pending_index_requests = ATOMIC_VAR_INIT(0);
int64_t tick = 0;
Expand Down Expand Up @@ -361,6 +375,14 @@ bool Indexer_Parse(SemaManager *completion, WorkingFiles *wfiles,
return true;
}

void Quit(SemaManager &manager) {
quit = true;
manager.Quit();
indexer_waiter->cv.notify_all();
stdout_waiter->cv.notify_one();
WaitAll();
}

} // namespace

void Init() {
Expand All @@ -380,7 +402,8 @@ void Indexer_Main(SemaManager *manager, VFS *vfs, Project *project,
GroupMatch matcher(g_config->index.whitelist, g_config->index.blacklist);
while (true)
if (!Indexer_Parse(manager, wfiles, project, vfs, matcher))
indexer_waiter->Wait(index_request);
if (indexer_waiter->Wait(quit, index_request))
break;
}

void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) {
Expand Down Expand Up @@ -419,6 +442,7 @@ void Main_OnIndexed(DB *db, WorkingFiles *wfiles, IndexUpdate *update) {
}

void LaunchStdin() {
ThreadEnter();
std::thread([]() {
set_thread_name("stdin");
std::string str;
Expand Down Expand Up @@ -469,12 +493,13 @@ void LaunchStdin() {
if (method == "exit")
break;
}
})
.detach();
}).detach();
ThreadLeave();
}

void LaunchStdout() {
std::thread([=]() {
ThreadEnter();
std::thread([]() {
set_thread_name("stdout");

while (true) {
Expand All @@ -483,10 +508,11 @@ void LaunchStdout() {
llvm::outs() << "Content-Length: " << s.size() << "\r\n\r\n" << s;
llvm::outs().flush();
}
stdout_waiter->Wait(for_stdout);
if (stdout_waiter->Wait(quit, for_stdout))
break;
}
})
.detach();
}).detach();
ThreadLeave();
}

void MainLoop() {
Expand Down Expand Up @@ -540,16 +566,20 @@ void MainLoop() {
Main_OnIndexed(&db, &wfiles, &*update);
}

if (did_work)
if (did_work) {
has_indexed |= indexed;
else {
if (quit)
break;
} else {
if (has_indexed) {
FreeUnusedMemory();
has_indexed = false;
}
main_waiter->Wait(on_indexed, on_request);
main_waiter->Wait(quit, on_indexed, on_request);
}
}

Quit(manager);
}

void Standalone(const std::string &root) {
Expand Down Expand Up @@ -590,6 +620,7 @@ void Standalone(const std::string &root) {
}
if (tty)
puts("");
Quit(manager);
}

void Index(const std::string &path, const std::vector<const char *> &args,
Expand Down
1 change: 1 addition & 0 deletions src/pipeline.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum class IndexMode {
};

namespace pipeline {
extern std::atomic<bool> quit;
extern std::atomic<int64_t> loaded_ts, pending_index_requests;
extern int64_t tick;
void Init();
Expand Down
4 changes: 4 additions & 0 deletions src/platform.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
#include <string_view>
#include <vector>

namespace ccls {
std::string NormalizePath(const std::string &path);

// Free any unused memory and return it to the system.
Expand All @@ -31,3 +32,6 @@ std::string GetExternalCommandOutput(const std::vector<std::string> &command,
std::string_view input);

void SpawnThread(void *(*fn)(void *), void *arg);
void ThreadEnter();
void ThreadLeave();
} // namespace ccls
23 changes: 21 additions & 2 deletions src/platform_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ limitations under the License.
#include <llvm/ADT/SmallString.h>
#include <llvm/Support/Path.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>

namespace ccls {
std::string NormalizePath(const std::string &path) {
llvm::SmallString<256> P(path);
llvm::sys::path::remove_dots(P, true);
Expand Down Expand Up @@ -108,19 +112,34 @@ std::string GetExternalCommandOutput(const std::vector<std::string> &command,
return ret;
}

std::mutex thread_mtx;
std::condition_variable no_pending_threads;
int pending_threads;

void SpawnThread(void *(*fn)(void *), void *arg) {
pthread_t thd;
pthread_attr_t attr;
struct rlimit rlim;
size_t stack_size = 4 * 1024 * 1024;
if (getrlimit(RLIMIT_STACK, &rlim) == 0 &&
rlim.rlim_cur != RLIM_INFINITY)
if (getrlimit(RLIMIT_STACK, &rlim) == 0 && rlim.rlim_cur != RLIM_INFINITY)
stack_size = rlim.rlim_cur;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setstacksize(&attr, stack_size);
ThreadEnter();
pthread_create(&thd, &attr, fn, arg);
pthread_attr_destroy(&attr);
}

void ThreadEnter() {
std::lock_guard lock(thread_mtx);
pending_threads++;
}
void ThreadLeave() {
std::lock_guard lock(thread_mtx);
if (!--pending_threads)
no_pending_threads.notify_one();
}
} // namespace ccls

#endif
5 changes: 5 additions & 0 deletions src/platform_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include <string>
#include <thread>

namespace ccls {
std::string NormalizePath(const std::string &path) {
DWORD retval = 0;
TCHAR buffer[MAX_PATH] = TEXT("");
Expand Down Expand Up @@ -65,4 +66,8 @@ void SpawnThread(void *(*fn)(void *), void *arg) {
std::thread(fn, arg).detach();
}

void ThreadEnter() {}
void ThreadLeave() {}
}

#endif
18 changes: 18 additions & 0 deletions src/sema_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
#include "clang_tu.hh"
#include "filesystem.hh"
#include "log.hh"
#include "pipeline.hh"
#include "platform.hh"

#include <clang/Lex/PreprocessorOptions.h>
Expand Down Expand Up @@ -403,6 +404,8 @@ void *PreambleMain(void *manager_) {
set_thread_name("preamble");
while (true) {
SemaManager::PreambleTask task = manager->preamble_tasks.Dequeue();
if (pipeline::quit)
break;

bool created = false;
std::shared_ptr<Session> session =
Expand All @@ -424,6 +427,7 @@ void *PreambleMain(void *manager_) {
manager->ScheduleDiag(task.path, debounce);
}
}
ThreadLeave();
return nullptr;
}

Expand All @@ -432,6 +436,8 @@ void *CompletionMain(void *manager_) {
set_thread_name("comp");
while (true) {
std::unique_ptr<SemaManager::CompTask> task = manager->comp_tasks.Dequeue();
if (pipeline::quit)
break;

// Drop older requests if we're not buffering.
while (g_config->completion.dropOldRequests &&
Expand All @@ -440,6 +446,8 @@ void *CompletionMain(void *manager_) {
task->Consumer.reset();
task->on_complete(nullptr);
task = manager->comp_tasks.Dequeue();
if (pipeline::quit)
break;
}

std::shared_ptr<Session> session = manager->EnsureSession(task->path);
Expand Down Expand Up @@ -480,6 +488,7 @@ void *CompletionMain(void *manager_) {

task->on_complete(&Clang->getCodeCompletionConsumer());
}
ThreadLeave();
return nullptr;
}

Expand Down Expand Up @@ -516,6 +525,8 @@ void *DiagnosticMain(void *manager_) {
set_thread_name("diag");
while (true) {
SemaManager::DiagTask task = manager->diag_tasks.Dequeue();
if (pipeline::quit)
break;
int64_t wait = task.wait_until -
chrono::duration_cast<chrono::milliseconds>(
chrono::high_resolution_clock::now().time_since_epoch())
Expand Down Expand Up @@ -627,6 +638,7 @@ void *DiagnosticMain(void *manager_) {
}
manager->on_diagnostic_(task.path, ls_diags);
}
ThreadLeave();
return nullptr;
}

Expand Down Expand Up @@ -704,4 +716,10 @@ void SemaManager::Clear() {
std::lock_guard lock(mutex);
sessions.Clear();
}

void SemaManager::Quit() {
comp_tasks.PushBack(nullptr);
diag_tasks.PushBack({});
preamble_tasks.PushBack({});
}
} // namespace ccls
3 changes: 2 additions & 1 deletion src/sema_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ struct SemaManager {
void OnClose(const std::string &path);
std::shared_ptr<ccls::Session> EnsureSession(const std::string &path,
bool *created = nullptr);
void Clear(void);
void Clear();
void Quit();

// Global state.
Project *project_;
Expand Down
9 changes: 7 additions & 2 deletions src/threaded_queue.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ struct MultiQueueWaiter {
return false;
}

template <typename... BaseThreadQueue> void Wait(BaseThreadQueue... queues) {
template <typename... BaseThreadQueue>
bool Wait(std::atomic<bool> &quit, BaseThreadQueue... queues) {
MultiQueueLock<BaseThreadQueue...> l(queues...);
while (!HasState({queues...}))
while (!quit) {
if (HasState({queues...}))
return false;
cv.wait(l);
}
return true;
}
};

Expand Down

0 comments on commit b95393e

Please sign in to comment.