
ParallelIterable is deadlocking and is generally really complicated #11768

Closed
1 of 3 tasks
sopel39 opened this issue Dec 12, 2024 · 11 comments · Fixed by #11781
Labels
bug Something isn't working

Comments

sopel39 (Contributor) commented Dec 12, 2024

Apache Iceberg version

1.7.1 (latest release)

Query engine

Trino

Please describe the bug 🐞

The ParallelIterable implementation is really complicated and has subtle concurrency bugs.

Context #1

It was observed that in high-concurrency/high-workload scenarios, cluster concurrency drops to 0 or 1 due to S3 "Timeout waiting for connection from pool" errors. Once that starts happening, it continues indefinitely, effectively making the cluster unusable.

Context #2

ManifestGroup#plan creates a ManifestReader for every ParallelIterable.Task. These readers effectively hold onto an S3 connection from the pool. When the ParallelIterable queue is full, the Task is tabled for later use. The number of tasks is not bounded by the worker pool size, but rather by X = num(ParallelIterable instances) * size(ParallelIterator#taskFutures). One can see that X can be significant with a high number of concurrent queries.

Issue #1

ParallelIterable is not batch based. This means it produces read-ahead results even if the downstream consumer has no slots for them, which can lead to subtle concurrency issues. For instance, consider two parallel iterables P1 and P2, and assume a single-threaded reader consumes 500 elements from P1, then P2, then P1, and so on (these could be splits, for instance). If P1's queue becomes full, P1 will no longer fetch more elements while still holding onto its tasks (which in turn hold S3 connections). This prevents tasks from P2 from completing (because there are no "free" S3 slots).

Consider scenario:
S3 connection pool size=1
approximateMaxQueueSize=1
workerPoolSize=1

P1: starts TaskP1
P1: produces result, queue full, TaskP1 put on hold (holds S3 connection)
P2: starts TaskP2, TaskP2 is scheduled on workerPool but is blocked on S3 connection pool
P1: result consumed, TaskP1 is scheduled again
P1: TaskP1 waits for workerPool to be free, but TaskP2 is waiting for TaskP1 to release connection
DEADLOCK
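
Below is a minimal, self-contained Java sketch of this hold-and-wait cycle. It is illustrative only, not Iceberg code: a one-permit Semaphore stands in for the S3 connection pool and a single-thread executor stands in for the worker pool.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class ParallelIterableDeadlockSketch {
      public static void main(String[] args) throws Exception {
        Semaphore s3Pool = new Semaphore(1);                          // S3 connection pool size=1
        ExecutorService workerPool = Executors.newFixedThreadPool(1); // workerPoolSize=1

        // TaskP1 produced a result, the queue became full, and TaskP1 was put on hold
        // while still holding the only S3 connection.
        s3Pool.acquire();

        // TaskP2 is scheduled on the worker pool and blocks waiting for an S3 connection.
        workerPool.submit(() -> {
          try {
            s3Pool.acquire();
            s3Pool.release();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });

        // TaskP1's result is consumed and it is rescheduled, but the single worker thread
        // is occupied by TaskP2, which itself waits for TaskP1's connection: circular wait.
        Future<?> resumedTaskP1 = workerPool.submit(s3Pool::release);

        // resumedTaskP1 never runs; a timed wait makes the deadlock observable.
        System.out.println("resumed TaskP1 completed within 2s: "
            + completedWithin(resumedTaskP1, 2, TimeUnit.SECONDS));   // prints false
        workerPool.shutdownNow(); // interrupt TaskP2 so the demo process can exit
      }

      private static boolean completedWithin(Future<?> future, long timeout, TimeUnit unit) {
        try {
          future.get(timeout, unit);
          return true;
        } catch (Exception e) {
          return false;
        }
      }
    }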

Issue #2

Active waiting. This is a known one. However, if one looks at ParallelIterable.ParallelIterator#checkTasks, there is:

      if (taskFutures[i] == null || taskFutures[i].isDone()) {
        continuation.ifPresent(yieldedTasks::addLast);
        ...
        taskFutures[i] = submitNextTask();
      }

which means active waiting is actually happening through the workerPool (e.g. a task is started on the worker pool just to check that the queue is full and that it should be put on hold).
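
A generic illustration of that pattern (again, not the actual Iceberg code): while the consumer is slow, each re-check of a yielded task consumes a full execution on the worker pool whose only outcome is "queue still full, yield again".

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ActiveWaitingSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService workerPool = Executors.newFixedThreadPool(1);
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("already-produced");               // the queue is already full
        AtomicInteger wastedExecutions = new AtomicInteger();

        // Stand-in for checkTasks() repeatedly resubmitting a yielded task.
        for (int i = 0; i < 100; i++) {
          workerPool.submit(() -> {
            if (queue.remainingCapacity() == 0) {
              wastedExecutions.incrementAndGet();    // no work done, just an "is it full?" check
            }
          });
        }
        workerPool.shutdown();
        workerPool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("no-op executions on worker pool: " + wastedExecutions.get()); // 100
      }
    }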

Short term fix?

Once a ParallelIterable.Task is started, it should continue until the entire task is consumed. This will prevent putting limited resources on hold. The if (queue.size() >= approximateMaxQueueSize) check should only happen once per task, before the iterator is created.
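
A rough, self-contained sketch of that behaviour (an illustrative assumption, not the actual patch in #11781): the queue-size check happens once, before the resource-holding iterator is opened, and afterwards the task runs to completion. The class and field names below are made up for illustration, and the shared queue is assumed to be unbounded with approximateMaxQueueSize acting only as a soft limit.

    import java.util.Iterator;
    import java.util.Queue;
    import java.util.function.Supplier;

    class RunToCompletionTask<T> implements Runnable {
      private final Supplier<Iterable<T>> source;   // e.g. a manifest read (assumption)
      private final Queue<T> queue;                 // shared, assumed unbounded (e.g. ConcurrentLinkedQueue)
      private final int approximateMaxQueueSize;    // soft limit only

      RunToCompletionTask(Supplier<Iterable<T>> source, Queue<T> queue, int approximateMaxQueueSize) {
        this.source = source;
        this.queue = queue;
        this.approximateMaxQueueSize = approximateMaxQueueSize;
      }

      @Override
      public void run() {
        // The only queue-size check: done before any connection is acquired.
        if (queue.size() >= approximateMaxQueueSize) {
          return; // the caller may resubmit later; no resources are held in the meantime
        }
        Iterator<T> iterator = source.get().iterator();
        // Drain fully: the queue may overshoot the soft limit by one task's output,
        // but the underlying connection is released as soon as the task finishes.
        while (iterator.hasNext()) {
          queue.add(iterator.next());
        }
      }
    }

The trade-off is the one noted in the eventual fix: the queue can exceed the soft limit by the output of the tasks already running, so it remains bounded even though the limit is no longer strict.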

Long term fix?

Perhaps the code can be refactored to be more readable and streamlined?

cc @findepi @raunaqmorarka

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
sopel39 added the bug label Dec 12, 2024
sopel39 (Contributor, Author) commented Dec 12, 2024

@alexjo2144 had a fix that tries to work around this bug (trinodb/trino#23321), but it only mitigates the effects rather than fixing the core issue.

osscm commented Dec 12, 2024

cc @RussellSpitzer @rdblue

findepi (Member) commented Dec 13, 2024

Good writeup, thanks for filing this issue @sopel39. How hard would it be to do the long-term fix?

@rdblue you usually have opinions on ParallelIterable. Do you have one this time?

sopel39 added a commit to sopel39/iceberg that referenced this issue Dec 13, 2024
It was observed that in high-concurrency/high-workload scenarios the
cluster deadlocks due to manifest readers waiting for a connection from the S3 pool.

Specifically, ManifestGroup#plan creates a ManifestReader for every ParallelIterable.Task.
These readers effectively hold onto an S3 connection from the pool.
When the ParallelIterable queue is full, the Task is tabled for later use.

Consider scenario:
S3 connection pool size=1
approximateMaxQueueSize=1
workerPoolSize=1

ParallelIterable1: starts TaskP1
ParallelIterable1: TaskP1 produces result, queue gets full, TaskP1 is put on hold (holds S3 connection)
ParallelIterable2: starts TaskP2, TaskP2 is scheduled on workerPool but is blocked on S3 connection pool
ParallelIterable1: result gets consumed, TaskP1 is scheduled again
ParallelIterable1: TaskP1 waits for workerPool to be free, but TaskP2 is waiting for TaskP1 to release connection

The fix makes sure a Task is finished once it is started. This way, limited resources like
the connection pool are not put on hold. The queue size might exceed strict limits, but it
should still be bounded.

Fixes apache#11768
sopel39 added a commit to sopel39/iceberg that referenced this issue Dec 13, 2024
tbaeg commented Dec 18, 2024

Thanks @sopel39 for putting this all together.

We are currently seeing this exact issue. Happy to provide additional context if desired, but I think it's thoroughly covered above.

sopel39 added a commit to sopel39/iceberg that referenced this issue Dec 18, 2024
sopel39 (Contributor, Author) commented Dec 18, 2024

@tbaeg could you try this patch and see if it resolves your issue?

tbaeg commented Dec 18, 2024

@sopel39 Yup! Already forked and cherry-picked your commits off the 1.7.1 tag.

I'm trying to get it deployed to a cluster where this is reproducible, but it's a bit of a challenge.

tbaeg commented Dec 19, 2024

@sopel39 I think the issue is resolved with this patch.

Previously, we made the incorrect assumption that the read timeout was due to slowness in processing the Avro files, not a deadlock (we had introduced a separate executor for scan planning with fewer threads). We increased the read timeout on our HTTP connections in hopes of mitigating the issue, which was likely masking the deadlock.

I reverted the read timeout to the default (in our case 30 seconds) and have not seen the issue when running with the patch. I was only able to test in an isolated setting, but it looks promising.

sopel39 (Contributor, Author) commented Dec 19, 2024

@tbaeg awesome. Thanks for reporting

tbaeg commented Dec 20, 2024

@sopel39 Another data point: an Airflow DAG in production that was consistently failing with a socket timeout is no longer failing with this patch.

sopel39 (Contributor, Author) commented Dec 20, 2024

@sopel39 Another data point: an Airflow DAG in production that was consistently failing with a socket timeout is no longer failing with this patch.

That's Trino or Spark?

tbaeg commented Dec 20, 2024

@sopel39 Another data point: an Airflow DAG in production that was consistently failing with a socket timeout is no longer failing with this patch.

That's Trino or Spark?

Trino.

sopel39 added a commit to sopel39/iceberg that referenced this issue Jan 7, 2025
Fokko pushed a commit that referenced this issue Jan 13, 2025
* Fix ParallelIterable deadlock

* Do not submit a task when there is no space in queue