Skip to content

Commit

Permalink
Optimize file downloading for requests with strong worker filters (#9452
Browse files Browse the repository at this point in the history
)
  • Loading branch information
resetius authored Sep 18, 2024
1 parent d8ee31d commit 0d768e9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,29 @@ TWorkerFilter::TWorkerFilter(const Yql::DqsProto::TWorkerFilter& filter)
}
}

TWorkerFilter::EMatchStatus TWorkerFilter::Match(const TWorkerInfo::TPtr& workerInfo, int taskId, TStats* stats) const {
bool allExists = true;
bool partial = false;
bool TWorkerFilter::MatchHost(const NDqs::TWorkerInfo::TPtr& workerInfo) const {
if (FullMatch) {
if (Filter.GetClusterName() && workerInfo->ClusterName != Filter.GetClusterName()) {
return EFAIL;
return false;
}
if (!Addresses.empty() && Addresses.find(workerInfo->Address) == Addresses.end()) {
return EFAIL;
return false;
}
if (!NodeIds.empty() && NodeIds.find(workerInfo->NodeId) == NodeIds.end()) {
return EFAIL;
return false;
}
}

return true;
}

TWorkerFilter::EMatchStatus TWorkerFilter::Match(const TWorkerInfo::TPtr& workerInfo, int taskId, TStats* stats) const {
bool allExists = true;
bool partial = false;

if (!MatchHost(workerInfo)) {
return EFAIL;
}
if (Filter.GetClusterNameHint() && workerInfo->ClusterName != Filter.GetClusterNameHint()) {
partial = true;
}
Expand All @@ -52,7 +61,10 @@ TWorkerFilter::EMatchStatus TWorkerFilter::Match(const TWorkerInfo::TPtr& worker
(*stats->WaitingResources)[id].insert(taskId);
} else {
(*stats->WaitingResources)[id].erase(taskId);
stats->Uploaded->find(id)->second.TryCount ++;
auto maybeUploadedStats = stats->Uploaded->find(id);
if (maybeUploadedStats != stats->Uploaded->end()) {
maybeUploadedStats->second.TryCount ++;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class TWorkerFilter {
TWorkerFilter(const Yql::DqsProto::TWorkerFilter& filter);

EMatchStatus Match(const NDqs::TWorkerInfo::TPtr& workerInfo, int taskId, TStats* stats) const;
// match mandatory host-specific fields like Address, NodeId, ClusterName
bool MatchHost(const NDqs::TWorkerInfo::TPtr& workerInfo) const;

void Visit(const std::function<void(const Yql::DqsProto::TFile&)>& visitor) const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ TVector<TWorkerInfo::TPtr> TWorkersStorage::TryAllocate(const NDq::IScheduler::T
if (workerInfo->Stopping) {
continue;
}
if (!filter.MatchHost(workerInfo)) {
continue;
}
filter.Visit([&](const auto& file) {
if (workerInfo->AddToDownloadList(file.GetObjectId(), file)) {
YQL_CLOG(TRACE, ProviderDq) << "Added " << file.GetName() << "|" << file.GetObjectId() << " to worker's " << GetGuidAsString(workerInfo->WorkerId) << " download list" ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,64 @@ Y_UNIT_TEST_SUITE(WorkersBenchmark) {
UNIT_ASSERT_VALUES_EQUAL(all.size(), 100);
UNIT_ASSERT_VALUES_EQUAL(0, storage.FreeSlots());
}

Y_UNIT_TEST(ScheduleDownload) {
int workers = 10;
TWorkersStorage storage(1, new TSensorsGroup, new TSensorsGroup);
storage.Clear();
for (int i = 0; i < workers; i++) {
TGUID guid;
Yql::DqsProto::RegisterNodeRequest request;
request.SetCapacity(100);
request.AddKnownNodes(1);
CreateGuid(&guid);
storage.CreateOrUpdate(100+i, guid, request);
}

{
auto request = NDqProto::TAllocateWorkersRequest();
request.SetCount(10);

auto waitInfo1 = IScheduler::TWaitInfo(request, NActors::TActorId());
auto result = storage.TryAllocate(waitInfo1);

UNIT_ASSERT_VALUES_EQUAL(result.size(), 10);
}

{
auto request = NDqProto::TAllocateWorkersRequest();
auto workerFilter = Yql::DqsProto::TWorkerFilter();
workerFilter.AddNodeId(102);

request.SetCount(10);
for (ui32 i = 0; i < request.GetCount(); i++) {
*request.AddWorkerFilterPerTask() = workerFilter;
}
auto waitInfo2 = IScheduler::TWaitInfo(request, NActors::TActorId());
auto result = storage.TryAllocate(waitInfo2);
UNIT_ASSERT_VALUES_EQUAL(result.size(), 10);
}

{
auto request = NDqProto::TAllocateWorkersRequest();
auto workerFilter = Yql::DqsProto::TWorkerFilter();
workerFilter.AddNodeId(102);
Yql::DqsProto::TFile file;
file.SetObjectId("fileId");
file.SetLocalPath("/tmp/test");
*workerFilter.AddFile() = file;
request.SetCount(10);
for (ui32 i = 0; i < request.GetCount(); i++) {
*request.AddWorkerFilterPerTask() = workerFilter;
}

auto waitInfo3 = IScheduler::TWaitInfo(request, NActors::TActorId());
auto result = storage.TryAllocate(waitInfo3);
UNIT_ASSERT_VALUES_EQUAL(result.size(), 0);

storage.Visit([](const NDqs::TWorkerInfo::TPtr& workerInfo) {
UNIT_ASSERT(workerInfo->GetDownloadList().size() == 0 || workerInfo->NodeId == 102);
});
}
}
}

0 comments on commit 0d768e9

Please sign in to comment.