Skip to content

Commit

Permalink
DownloaderCURL: fix too many open files and other fixes (axmolengine#…
Browse files Browse the repository at this point in the history
…2182)

* DownloaderCURL: fix too many open file handles and other fixes

* Downloader: remove unused transferDataToBuffer

* DownloaderCURL: remove unused variables

* DownloadTaskCURL: add explicit for constructor
  • Loading branch information
smilediver authored Sep 24, 2024
1 parent 2bf320e commit 5eab46f
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 101 deletions.
172 changes: 84 additions & 88 deletions core/network/Downloader-curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,26 @@ class DownloadTaskCURL : public IDownloadTask

// if more than one task write to one file, cause file broken
// so use a set to check this situation
static std::mutex _sStoragePathSetMutex;
static std::set<std::string> _sStoragePathSet;

public:
int serialId;
DownloaderCURL& owner;

DownloadTaskCURL(DownloaderCURL& o) : serialId(_sSerialId++), owner(o), _requestHeaders(nullptr)

explicit DownloadTaskCURL(DownloaderCURL& o) : serialId(_sSerialId++), owner(o)
{
_initInternal();
AXLOGD("Construct DownloadTaskCURL {}", fmt::ptr(this));
}

virtual ~DownloadTaskCURL()
{
AXLOGD("Destruct DownloadTaskCURL {}", fmt::ptr(this));

if (_errCode != DownloadTask::ERROR_TASK_DUPLICATED)
{
auto lock = std::lock_guard<std::mutex>(_sStoragePathSetMutex);

// if task destroyed unnormally, we should release WritenFileName stored in set.
// Normally, this action should done when task finished.
if (_tempFileName.length() && _sStoragePathSet.end() != _sStoragePathSet.find(_tempFileName))
Expand All @@ -102,37 +105,42 @@ class DownloadTaskCURL : public IDownloadTask

_fs.reset();
_fsMd5.reset();

if (_requestHeaders)
curl_slist_free_all(_requestHeaders);

AXLOGD("Destruct DownloadTaskCURL {}", fmt::ptr(this));
}

bool init(std::string_view filename, std::string_view tempSuffix)
{
if (0 == filename.length())
{
// data task
_buf.reserve(CURL_MAX_WRITE_SIZE);
// data task
if (filename.empty())
return true;
}

// file task
_fileName = filename;
_tempFileName = filename;
_tempFileName.append(tempSuffix);

if (_sStoragePathSet.end() != _sStoragePathSet.find(_tempFileName))
{
// there is another task uses this storage path
_errCode = DownloadTask::ERROR_TASK_DUPLICATED;
_errCodeInternal = 0;
_errDescription = "More than one download file task write to same file:";
_errDescription.append(_tempFileName);
return false;
auto lock = std::lock_guard<std::mutex>(_sStoragePathSetMutex);

if (_sStoragePathSet.end() != _sStoragePathSet.find(_tempFileName))
{
// there is another task uses this storage path
_errCode = DownloadTask::ERROR_TASK_DUPLICATED;
_errCodeInternal = 0;
_errDescription = "More than one download file task write to same file:";
_errDescription.append(_tempFileName);
return false;
}
_sStoragePathSet.insert(_tempFileName);
}
_sStoragePathSet.insert(_tempFileName);

return true;
}

bool onStart()
{
// data task
if (_fileName.empty())
return true;

// open temp file handle for write
bool ret = false;
Expand Down Expand Up @@ -161,6 +169,7 @@ class DownloadTaskCURL : public IDownloadTask
break;
}
}

// open file
_fs = FileUtils::getInstance()->openFileStream(_tempFileName, IFileStream::Mode::APPEND);
if (!_fs)
Expand Down Expand Up @@ -306,24 +315,22 @@ class DownloadTaskCURL : public IDownloadTask
std::recursive_mutex _mutex;

// header info
bool _acceptRanges;
int64_t _totalBytesExpected;
int64_t _totalBytesExpected = -1; // some server may not send data size, so set it to -1

curl_off_t _speed;
CURL* _curl;
curl_off_t _speed = 0;
CURL* _curl = nullptr;
curl_socket_t _sockfd = -1; // store the sockfd to support cancel download manually
bool _cancelled = false;

curl_slist* _requestHeaders;

// progress
int64_t _transferOffset;
int64_t _bytesReceived;
int64_t _totalBytesReceived;
bool _alreadyDownloaded = false;
int64_t _transferOffset = 0;
int64_t _bytesReceived = 0;
int64_t _totalBytesReceived = 0;

// error
int _errCode;
int _errCodeInternal;
int _errCode = DownloadTask::ERROR_NO_ERROR;
int _errCodeInternal = CURLE_OK;
std::string _errDescription;

// for saving data
Expand All @@ -336,19 +343,9 @@ class DownloadTaskCURL : public IDownloadTask
// calculate md5 in downloading time support
std::unique_ptr<IFileStream> _fsMd5{}; // store md5 state realtime
MD5state_st _md5State;

void _initInternal()
{
_bytesReceived = (0);
_totalBytesReceived = (0);
_totalBytesExpected = (-1); // some server may not replay data size, so set it to -1
_speed = 0;
_curl = nullptr;
_errCode = (DownloadTask::ERROR_NO_ERROR);
_errCodeInternal = (CURLE_OK);
}
};
int DownloadTaskCURL::_sSerialId;
std::mutex DownloadTaskCURL::_sStoragePathSetMutex;
std::set<std::string> DownloadTaskCURL::_sStoragePathSet;

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -369,17 +366,8 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:

void addTask(std::shared_ptr<DownloadTask> task, DownloadTaskCURL* coTask)
{
int status = coTask->verifyFileIntegrity(task->checksum);

if (status & kCheckSumStateSucceed || DownloadTask::ERROR_NO_ERROR != coTask->_errCode)
{
_owner->_onDownloadFinished(*task, status);
}
else
{
std::lock_guard<std::mutex> lock(_requestMutex);
_requestQueue.emplace_back(task);
}
std::lock_guard<std::mutex> lock(_requestMutex);
_requestQueue.emplace_back(task);
}

void run()
Expand Down Expand Up @@ -470,7 +458,7 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:
{
auto& downloaderImpl = coTask->owner;
downloaderImpl._updateTaskProgressInfo(*task);
downloaderImpl.onTaskProgress(*task, downloaderImpl._transferDataToBuffer);
downloaderImpl.onTaskProgress(*task);
}
return 0;
}
Expand All @@ -489,7 +477,6 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:

/* Resolve host domain to ip */
std::string internalURL = task->requestURL;
// Curl_custom_setup(handle, internalURL, (void**)& coTask->_requestHeaders);

// set url
curl_easy_setopt(handle, CURLOPT_URL, internalURL.c_str());
Expand Down Expand Up @@ -657,13 +644,7 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:
}
}

if (task->background)
_owner->_onDownloadFinished(*task);
else
{
std::lock_guard<std::mutex> lock(_finishedMutex);
_finishedQueue.emplace_back(task);
}
finishTask(task);
}
} while (m);
}
Expand Down Expand Up @@ -693,13 +674,30 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:
}

auto coTask = static_cast<DownloadTaskCURL*>(task->_coTask.get());

// Init task, open file handles, etc
if (!coTask->onStart())
{
finishTask(task);
continue;
}

// Check if the file has already been downloaded
int status = coTask->verifyFileIntegrity(task->checksum);
if (status & kCheckSumStateSucceed || DownloadTask::ERROR_NO_ERROR != coTask->_errCode)
{
coTask->_alreadyDownloaded = status & kCheckSumStateSucceed;
finishTask(task);
continue;
}

// create curl handle from task and add into curl multi handle
CURL* curlHandle = curl_easy_init();

if (nullptr == curlHandle)
{
coTask->setErrorDesc(DownloadTask::ERROR_IMPL_INTERNAL, 0, "Alloc curl handle failed.");
_owner->_onDownloadFinished(*task);
finishTask(task);
continue;
}

Expand All @@ -711,7 +709,7 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:
if (CURLM_OK != mcode)
{
coTask->setErrorDesc(DownloadTask::ERROR_IMPL_INTERNAL, mcode, curl_multi_strerror(mcode));
_owner->_onDownloadFinished(*task);
finishTask(task);
continue;
}

Expand All @@ -728,6 +726,19 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:
AXLOGD("----DownloaderCURL::Impl::_threadProc end");
}

void finishTask(std::shared_ptr<DownloadTask>& task)
{
if (task->background)
{
_owner->_onDownloadFinished(*task);
}
else
{
std::lock_guard<std::mutex> lock(_finishedMutex);
_finishedQueue.emplace_back(task);
}
}

std::thread _thread;
std::atomic_bool _tasksFinished{};
std::deque<std::shared_ptr<DownloadTask>> _requestQueue;
Expand All @@ -745,24 +756,11 @@ class DownloaderCURL::Impl : public std::enable_shared_from_this<DownloaderCURL:

////////////////////////////////////////////////////////////////////////////////
// Implementation DownloaderCURL
DownloaderCURL::DownloaderCURL(const DownloaderHints& hints) : _impl(std::make_shared<Impl>()), _currTask(nullptr)
DownloaderCURL::DownloaderCURL(const DownloaderHints& hints) : _impl(std::make_shared<Impl>())
{
AXLOGD("Construct DownloaderCURL {}", fmt::ptr(this));
_impl->hints = hints;
_impl->_owner = this;

_transferDataToBuffer = [this](void* buf, int64_t len) -> int64_t {
DownloadTaskCURL& coTask = *_currTask;
int64_t dataLen = coTask._buf.size();
if (len < dataLen)
{
return 0;
}

memcpy(buf, coTask._buf.data(), dataLen);
coTask._buf.resize(0);
return dataLen;
};
}

DownloaderCURL::~DownloaderCURL()
Expand Down Expand Up @@ -828,10 +826,8 @@ void DownloaderCURL::_onUpdate(float)
std::lock_guard<std::recursive_mutex> lock(coTask._mutex);
if (coTask._bytesReceived)
{
_currTask = &coTask;
_updateTaskProgressInfo(*task);
onTaskProgress(*task, _transferDataToBuffer);
_currTask = nullptr;
onTaskProgress(*task);
coTask._bytesReceived = 0;
}
}
Expand Down Expand Up @@ -860,18 +856,16 @@ void DownloaderCURL::_updateTaskProgressInfo(DownloadTask& task, int64_t totalEx
task.progressInfo.speedInBytes = coTask._speed;
}

void DownloaderCURL::_onDownloadFinished(DownloadTask& task, int checkState)
void DownloaderCURL::_onDownloadFinished(DownloadTask& task)
{
auto& coTask = static_cast<DownloadTaskCURL&>(*task._coTask);

// if there is bytesReceived, call progress update first
if (coTask._bytesReceived)
{
_currTask = &coTask;
_updateTaskProgressInfo(task);
onTaskProgress(task, _transferDataToBuffer);
onTaskProgress(task);
coTask._bytesReceived = 0;
_currTask = nullptr;
}

// if file task, close file handle and rename file if needed
Expand All @@ -883,7 +877,7 @@ void DownloaderCURL::_onDownloadFinished(DownloadTask& task, int checkState)
coTask._fs.reset();
coTask._fsMd5.reset();

if (checkState & kCheckSumStateSucceed) // No need download
if (coTask._alreadyDownloaded) // No need to download
{
auto fsOrigin = pFileUtils->openFileStream(coTask._fileName, IFileStream::Mode::READ);
if (fsOrigin)
Expand All @@ -896,7 +890,7 @@ void DownloaderCURL::_onDownloadFinished(DownloadTask& task, int checkState)

pFileUtils->removeFile(coTask._tempFileName);

onTaskProgress(task, _transferDataToBuffer);
onTaskProgress(task);

fsOrigin = nullptr;
}
Expand Down Expand Up @@ -954,6 +948,8 @@ void DownloaderCURL::_onDownloadFinished(DownloadTask& task, int checkState)
// Rename file work fine.
if (pFileUtils->renameFile(coTask._tempFileName, coTask._fileName))
{
auto lock = std::lock_guard<std::mutex>(DownloadTaskCURL::_sStoragePathSetMutex);

// success, remove storage from set
DownloadTaskCURL::_sStoragePathSet.erase(coTask._tempFileName);
break;
Expand Down
6 changes: 1 addition & 5 deletions core/network/Downloader-curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,12 @@ class DownloaderCURL : public IDownloaderImpl
class Impl;
std::shared_ptr<Impl> _impl;

// for transfer data on schedule
DownloadTaskCURL* _currTask; // temp ref
std::function<int64_t(void*, int64_t)> _transferDataToBuffer;

void _lazyScheduleUpdate();

static void _updateTaskProgressInfo(DownloadTask& task, int64_t totalExpected = -1);

// scheduler for update processing and finished task in main schedule
void _onDownloadFinished(DownloadTask& task, int checkState = 0);
void _onDownloadFinished(DownloadTask& task);

// scheduler for update processing and finished task in main schedule
void _onUpdate(float);
Expand Down
4 changes: 1 addition & 3 deletions core/network/Downloader-wasm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,8 @@ namespace ax { namespace network {
}

DownloadTaskEmscripten *coTask = iter->second;
function<int64_t(void*, int64_t)> transferDataToBuffer; // just a placeholder
// int dl = dlNow - coTask->bytesReceived;
coTask->bytesReceived = dlNow;
downloader->onTaskProgress(*coTask->task, transferDataToBuffer);
downloader->onTaskProgress(*coTask->task);
}

void DownloaderEmscripten::onError(emscripten_fetch_t *fetch)
Expand Down
3 changes: 1 addition & 2 deletions core/network/Downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ Downloader::Downloader(const DownloaderHints& hints)
{
AXLOGD("Construct Downloader {}", fmt::ptr(this));
_impl.reset(new DownloaderImpl(hints));
_impl->onTaskProgress = [this](const DownloadTask& task,
std::function<int64_t(void* buffer, int64_t len)>& /*transferDataToBuffer*/) {
_impl->onTaskProgress = [this](const DownloadTask& task) {
if (onTaskProgress)
{
onTaskProgress(task);
Expand Down
Loading

0 comments on commit 5eab46f

Please sign in to comment.