Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logging to include taskId in handoff notifier thread #17185

Merged
merged 5 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ protected boolean waitForSegmentAvailability(

try (
SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
.createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource())
.createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource(), getId())
) {

final ExecutorService exec = Execs.directExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ public void testWaitForSegmentAvailabilityMultipleSegmentsTimeout()
EasyMock.expect(mockToolbox.getSegmentHandoffNotifierFactory()).andReturn(mockFactory).once();
EasyMock.expect(mockToolbox.getEmitter()).andReturn(new NoopServiceEmitter()).anyTimes();
EasyMock.expect(mockDataSegment1.getDataSource()).andReturn("MockDataSource").once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource")).andReturn(mockNotifier).once();
EasyMock.expect(mockFactory.createSegmentHandoffNotifier("MockDataSource", indexTask.getId())).andReturn(mockNotifier).once();
mockNotifier.start();
EasyMock.expectLastCall().once();
mockNotifier.registerSegmentHandoffCallback(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ private SegmentHandoffNotifierFactory setUpSegmentHandOffNotifierFactory()
return new SegmentHandoffNotifierFactory()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return new SegmentHandoffNotifier()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ public boolean checkPointDataSourceMetadata(
final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory(
taskActionToolbox
);
final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier()
final SegmentHandoffNotifierFactory handoffNotifierFactory = (dataSource, taskId) -> new SegmentHandoffNotifier()
{
@Override
public boolean registerSegmentHandoffCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,25 @@ public class CoordinatorBasedSegmentHandoffNotifier implements SegmentHandoffNot
private volatile ScheduledExecutorService scheduledExecutor;
private final Duration pollDuration;
private final String dataSource;
private final String taskId;

public CoordinatorBasedSegmentHandoffNotifier(
String dataSource,
CoordinatorClient coordinatorClient,
CoordinatorBasedSegmentHandoffNotifierConfig config
CoordinatorBasedSegmentHandoffNotifierConfig config,
String taskId
)
{
this.dataSource = dataSource;
this.coordinatorClient = coordinatorClient;
this.pollDuration = config.getPollDuration();
this.taskId = taskId;
}

@Override
public boolean registerSegmentHandoffCallback(SegmentDescriptor descriptor, Executor exec, Runnable handOffRunnable)
{
log.debug("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s]", dataSource, descriptor);
log.debug("Adding SegmentHandoffCallback for dataSource[%s] Segment[%s] for task[%s]", dataSource, descriptor, taskId);
Pair<Executor, Runnable> prev = handOffCallbacks.putIfAbsent(
descriptor,
new Pair<>(exec, handOffRunnable)
Expand Down Expand Up @@ -91,30 +94,32 @@ void checkForSegmentHandoffs()
Boolean handOffComplete =
FutureUtils.getUnchecked(coordinatorClient.isHandoffComplete(dataSource, descriptor), true);
if (Boolean.TRUE.equals(handOffComplete)) {
log.debug("Segment handoff complete for dataSource[%s] segment[%s]", dataSource, descriptor);
log.debug("Segment handoff complete for dataSource[%s] segment[%s] for task[%s]", dataSource, descriptor, taskId);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
}
}
catch (Exception e) {
log.error(
e,
"Exception while checking handoff for dataSource[%s] Segment[%s]; will try again after [%s]",
"Exception while checking handoff for dataSource[%s] Segment[%s], taskId[%s]; will try again after [%s]",
dataSource,
descriptor,
taskId,
pollDuration
);
}
}
if (!handOffCallbacks.isEmpty()) {
log.info("Still waiting for handoff for [%d] segments", handOffCallbacks.size());
log.info("Still waiting for handoff for [%d] segments for task[%s]", handOffCallbacks.size(), taskId);
}
}
catch (Throwable t) {
log.error(
t,
"Exception while checking handoff for dataSource[%s]; will try again after [%s]",
"Exception while checking handoff for dataSource[%s], taskId[%s]; will try again after [%s]",
dataSource,
taskId,
pollDuration
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ public CoordinatorBasedSegmentHandoffNotifierFactory(
}

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return new CoordinatorBasedSegmentHandoffNotifier(
dataSource,
client,
config
config,
taskId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void close()
};

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return NOTIFIER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@

public interface SegmentHandoffNotifierFactory
{
SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource);
SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public StreamAppenderatorDriver(
super(appenderator, segmentAllocator, segmentRetriever, dataSegmentKiller);

this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory")
.createSegmentHandoffNotifier(appenderator.getDataSource());
.createSegmentHandoffNotifier(appenderator.getDataSource(), appenderator.getId());
this.metrics = Preconditions.checkNotNull(metrics, "metrics");
this.objectMapper = Preconditions.checkNotNull(objectMapper, "objectMapper");
}
Expand Down Expand Up @@ -360,7 +360,7 @@ public ListenableFuture<SegmentsAndCommitMetadata> registerHandoff(SegmentsAndCo
),
Execs.directExecutor(),
() -> {
log.debug("Segment[%s] successfully handed off, dropping.", segmentIdentifier);
log.debug("Segment[%s] successfully handed off for task[%s], dropping.", segmentIdentifier, appenderator.getId());
metrics.incrementHandOffCount();

final ListenableFuture<?> dropFuture = appenderator.drop(segmentIdentifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public void testHandoffCallbackNotCalled()
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
"test_ds",
coordinatorClient,
notifierConfig
notifierConfig,
"test_task"
);
final AtomicBoolean callbackCalled = new AtomicBoolean(false);
notifier.registerSegmentHandoffCallback(
Expand Down Expand Up @@ -89,7 +90,8 @@ public void testHandoffCallbackCalled()
CoordinatorBasedSegmentHandoffNotifier notifier = new CoordinatorBasedSegmentHandoffNotifier(
"test_ds",
coordinatorClient,
notifierConfig
notifierConfig,
"test_task"
);

notifier.registerSegmentHandoffCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ public Set<SegmentDescriptor> getHandedOffSegmentDescriptors()
}

@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource, String taskId)
{
return new SegmentHandoffNotifier()
{
Expand Down
Loading