
Concurrent inserts fail with a 400 ConcurrentModificationException from AWS Glue; unclear whether the cause is Glue or Trino. Same problem with Iceberg and Hive. #16762

Open
bbarclay opened this issue Mar 28, 2023 · 2 comments

bbarclay commented Mar 28, 2023

I'm running Trino in Kubernetes and inserting into AWS Glue tables, both as Iceberg and as Hive tables. I'm unsure whether this is an AWS Glue problem or a Trino issue. It happens whenever I run an insert script concurrently:

  1. Start two copies of the same insert script simultaneously and they collide, causing this error.
  2. Run a single script with more than one concurrent worker and the same error occurs.
  3. The error differs slightly for Iceberg tables, but it is essentially the same failure: AWS blocks the update, but the error codes are generic and hard to find a resolution for.
  • Question: Is this a limitation of AWS Glue, or a problem with how Trino handles multiple connections to Glue?

MY CODE:

import concurrent.futures
from multiprocessing import freeze_support

from trino.dbapi import connect


def do_work(i):
    # Each worker process opens its own connection; a cursor cannot be
    # safely shared across processes.
    conn = connect(
        host="localhost",
        port=8080,
        user="trino",
        catalog="iceberg",  # table created with the Iceberg connector
        schema="order_data",
    )
    cur = conn.cursor()
    try:
        cur.execute(
            "INSERT INTO order_test3 (order_id, order_date, order_customer_id, order_status) "
            "VALUES (?, ?, ?, ?)",
            (str(i), "2021-01-01", str(i), "COMPLETE"),
        )
        print(i, cur.fetchall())  # Trino reports the inserted row count
    except Exception as e:
        print(i, "Error:", e)


def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
        for i in range(1500, 1510):
            executor.submit(do_work, i)


if __name__ == "__main__":
    freeze_support()  # multiprocessing support on platforms that spawn
    main()

THE ERROR:

TrinoQueryError(type=INTERNAL_ERROR, name=GENERIC_INTERNAL_ERROR, message="Failed to commit to Glue table: order_data.order_test3", query_id=20230328_033606_00016_fe4cq)

{
  "message": "Failed to commit to Glue table: order_data.order_test3",
  "errorCode": 65536,
  "errorName": "GENERIC_INTERNAL_ERROR",
  "errorType": "INTERNAL_ERROR",
  "failureInfo": {
    "type": "org.apache.iceberg.exceptions.CommitFailedException",
    "message": "Failed to commit to Glue table: order_data.order_test3",
    "stack": [
      "io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperations.commitToExistingTable(GlueIcebergTableOperations.java:147)",
      "io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.commit(AbstractIcebergTableOperations.java:151)",
      "org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:403)",
      "org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)",
      "org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)",
      "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)",
      "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)",
      "org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:394)",
      "org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:277)",
      "io.trino.plugin.iceberg.IcebergMetadata.finishInsert(IcebergMetadata.java:880)",
      "io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:527)",
      "io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:932)",
      "io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4128)",
      "io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)",
      "io.trino.operator.Driver.processInternal(Driver.java:396)",
      "io.trino.operator.Driver.lambda$process$8(Driver.java:299)",
      "io.trino.operator.Driver.tryWithLock(Driver.java:691)",
      "io.trino.operator.Driver.process(Driver.java:291)",
      "io.trino.operator.Driver.processForDuration(Driver.java:262)",
      "io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:773)",
      "io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)",
      "io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)",
      "io.trino.$gen.Trino_410____20230328_032911_2.run(Unknown Source)",
      "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)",
      "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)",
      "java.base/java.lang.Thread.run(Thread.java:833)"
    ],
    "cause": {
      "type": "com.amazonaws.services.glue.model.ConcurrentModificationException",
      "message": "Update table failed due to concurrent modifications. (Service: AWSGlue; Status Code: 400; Error Code: ConcurrentModificationException; Request ID: 00ed5238-e716-41da-91dd-89bdf628d75c; Proxy: null)",
      "stack": [
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)",
        "com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)",
        "com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)",
        "com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)",
        "com.amazonaws.services.glue.AWSGlueClient.doInvoke(AWSGlueClient.java:12473)",
        "com.amazonaws.services.glue.AWSGlueClient.invoke(AWSGlueClient.java:12440)",
        "com.amazonaws.services.glue.AWSGlueClient.invoke(AWSGlueClient.java:12429)",
        "com.amazonaws.services.glue.AWSGlueClient.executeUpdateTable(AWSGlueClient.java:12198)",
        "com.amazonaws.services.glue.AWSGlueClient.updateTable(AWSGlueClient.java:12167)",
        "io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperations.lambda$commitToExistingTable$1(GlueIcebergTableOperations.java:143)",
        "io.trino.plugin.hive.aws.AwsApiCallStats.call(AwsApiCallStats.java:37)",
        "io.trino.plugin.iceberg.catalog.glue.GlueIcebergTableOperations.commitToExistingTable(GlueIcebergTableOperations.java:143)",
        "io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.commit(AbstractIcebergTableOperations.java:151)",
        "org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$5(BaseTransaction.java:403)",
        "org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:402)",
        "org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:212)",
        "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)",
        "org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)",
        "org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:394)",
        "org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:277)",
        "io.trino.plugin.iceberg.IcebergMetadata.finishInsert(IcebergMetadata.java:880)",
        "io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.finishInsert(ClassLoaderSafeConnectorMetadata.java:527)",
        "io.trino.metadata.MetadataManager.finishInsert(MetadataManager.java:932)",
        "io.trino.sql.planner.LocalExecutionPlanner.lambda$createTableFinisher$4(LocalExecutionPlanner.java:4128)",
        "io.trino.operator.TableFinishOperator.getOutput(TableFinishOperator.java:319)",
        "io.trino.operator.Driver.processInternal(Driver.java:396)",
        "io.trino.operator.Driver.lambda$process$8(Driver.java:299)",
        "io.trino.operator.Driver.tryWithLock(Driver.java:691)",
        "io.trino.operator.Driver.process(Driver.java:291)",
        "io.trino.operator.Driver.processForDuration(Driver.java:262)",
        "io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:773)",
        "io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:165)",
        "io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:523)",
        "io.trino.$gen.Trino_410____20230328_032911_2.run(Unknown Source)",
        "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)",
        "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)",
        "java.base/java.lang.Thread.run(Thread.java:833)"
      ],
      "suppressed": []
    },
    "suppressed": []
  }
}
electrum (Member) commented

This error should be classified better: it should be translated to a TrinoException with error code TRANSACTION_CONFLICT. However, the root cause is the concurrency model used by Iceberg.

The table can only be updated by one writer at a time. When there are multiple writers, Glue rejects the update for all but the first because the table version is outdated; that is what saves you from losing updates. Iceberg will retry up to 4 times, but with a concurrency of 10, it's likely that one of your workers is always "unlucky" and fails every retry attempt.
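
As a client-side illustration of that retry behavior, here is a minimal sketch using the same trino Python client as the script above (the helper name and backoff numbers are illustrative; this only de-synchronizes the writers, it does not remove the single-writer constraint):

import random
import time

from trino.exceptions import TrinoQueryError


def insert_with_retry(cur, sql, params, max_attempts=8):
    # Retry an INSERT whose Glue commit was rejected by a concurrent writer.
    for attempt in range(max_attempts):
        try:
            cur.execute(sql, params)
            return cur.fetchall()
        except TrinoQueryError as e:
            # The conflict currently surfaces as GENERIC_INTERNAL_ERROR with this
            # message, so match on the text until the error is classified properly.
            if "Failed to commit to Glue table" not in str(e) or attempt == max_attempts - 1:
                raise
            # Exponential backoff with jitter so the writers stop colliding.
            time.sleep(min(2 ** attempt, 30) * random.random())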

Iceberg is not designed for doing many small inserts from independent writers. Some options:

  • Coordinate among the workers so that only one is inserting at a time.
  • Stage all the data somewhere, then insert it in one batch (see the sketch below).
  • Write the data to a queue such as Kafka or Kinesis, then load it from there.
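
For the second option, a minimal sketch assuming the same table and connection settings as the script above: build all the rows in the client and commit them with one multi-row INSERT, so only a single Glue commit takes place.

from trino.dbapi import connect

conn = connect(host="localhost", port=8080, user="trino",
               catalog="iceberg", schema="order_data")
cur = conn.cursor()

# One statement means one Iceberg transaction and one Glue table update,
# so there is nothing for Glue to reject as a concurrent modification.
rows = [(str(i), "2021-01-01", str(i), "COMPLETE") for i in range(1500, 1510)]
placeholders = ", ".join(["(?, ?, ?, ?)"] * len(rows))
cur.execute(
    "INSERT INTO order_test3 (order_id, order_date, order_customer_id, order_status) "
    "VALUES " + placeholders,
    [value for row in rows for value in row],
)
print(cur.fetchall())  # row count reported by Trino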

bbarclay (Author) commented

The same thing happens with Athena.
