Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] [V2 Connector RabbitMQ] job failed after submit to cluster #4820

Closed
2 of 3 tasks
jobmission opened this issue May 24, 2023 · 1 comment · Fixed by #4842
Closed
2 of 3 tasks

[Bug] [V2 Connector RabbitMQ] job failed after submit to cluster #4820

jobmission opened this issue May 24, 2023 · 1 comment · Fixed by #4842
Labels

Comments

@jobmission
Copy link

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

job failed after submit to cluster

SeaTunnel Version

2.3.1

SeaTunnel Config

seatunnel:
  engine:
    backup-count: 1
    queue-type: blockingqueue
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 60
    slot-service:
      dynamic-slot: true
    checkpoint:
      interval: 10000
      timeout: 60000
      max-concurrent: 5
      tolerable-failure: 2
      storage:
        type: hdfs
        max-retained: 3
        plugin-config:
          namespace: /tmp/seatunnel/checkpoint_snapshot
          storage.type: hdfs
          fs.defaultFS: file:///tmp/

Running Command

env {
    job.mode = "STREAMING"
}

source {
    RabbitMQ {
        host = "rabbitmq.local"
            port = 5672
            virtual_host = "/v2"
            username = "guest"
            password = "guest"
            automaticRecovery = "true"
            queue_name = "jian2"
            schema = {
            fields {
                commandType = string
                requestId = string
            }
        }
    }
}

transform {}



sink {
    Redis {
      host = "redis.local"
      port = 6379
      key = commandType
      data_type = key
      auth="xxxxxxx"
    }
}

Error Exception

java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: CheckpointCoordinator inside have error.
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:215)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.handleCoordinatorError(CheckpointCoordinator.java:211)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:390)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.internalComplete(PassiveCompletableFuture.java:70)
	at org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.lambda$new$0(PassiveCompletableFuture.java:33)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint.acknowledgeTask(PendingCheckpoint.java:147)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:581)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:261)
	at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMasterNode(NodeEngineUtil.java:41)
	at org.apache.seatunnel.engine.server.execution.TaskExecutionContext.sendToMaster(TaskExecutionContext.java:43)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:335)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:196)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:161)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:526)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException: ErrorCode:[RABBITMQ-05], ErrorDescription:[messages could not be acknowledged during checkpoint creation] - java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
	at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1540)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:663)
	at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.acknowledgeDeliveryTags(RabbitmqSourceReader.java:196)
	at org.apache.seatunnel.connectors.seatunnel.rabbitmq.source.RabbitmqSourceReader.notifyCheckpointComplete(RabbitmqSourceReader.java:187)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.notifyCheckpointComplete(SourceFlowLifeCycle.java:230)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyCheckpointComplete$7(SeaTunnelTask.java:352)
	at org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky(ExceptionUtil.java:130)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$notifyAllAction$11(SeaTunnelTask.java:366)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(Unknown Source)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(Unknown Source)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyAllAction(SeaTunnelTask.java:366)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.notifyCheckpointComplete(SeaTunnelTask.java:352)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91)
	at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48)
	at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.run(CheckpointFinishedOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMemberNode(NodeEngineUtil.java:51)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.sendOperationToMemberNode(CheckpointManager.java:272)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.toArray(Unknown Source)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.notifyCheckpointCompleted(CheckpointCoordinator.java:662)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:645)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.lambda$null$7(CheckpointCoordinator.java:388)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.internalComplete(PassiveCompletableFuture.java:70)
	at org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture.lambda$new$0(PassiveCompletableFuture.java:33)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
	at org.apache.seatunnel.engine.server.checkpoint.PendingCheckpoint.acknowledgeTask(PendingCheckpoint.java:147)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator.acknowledgeTask(CheckpointCoordinator.java:581)
	at org.apache.seatunnel.engine.server.checkpoint.CheckpointManager.acknowledgeTask(CheckpointManager.java:261)
	at org.apache.seatunnel.engine.server.checkpoint.operation.TaskAcknowledgeOperation.run(TaskAcknowledgeOperation.java:81)
	at com.hazelcast.spi.impl.operationservice.Operation.call(Operation.java:189)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call(OperationRunnerImpl.java:273)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:248)
	at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:213)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.run(OperationExecutorImpl.java:411)
	at com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl.runOrExecute(OperationExecutorImpl.java:438)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvokeLocal(Invocation.java:601)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.doInvoke(Invocation.java:580)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke0(Invocation.java:541)
	at com.hazelcast.spi.impl.operationservice.impl.Invocation.invoke(Invocation.java:241)
	at com.hazelcast.spi.impl.operationservice.impl.InvocationBuilderImpl.invoke(InvocationBuilderImpl.java:61)
	at org.apache.seatunnel.engine.server.utils.NodeEngineUtil.sendOperationToMasterNode(NodeEngineUtil.java:41)
	at org.apache.seatunnel.engine.server.execution.TaskExecutionContext.sendToMaster(TaskExecutionContext.java:43)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.ack(SeaTunnelTask.java:335)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:196)
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71)
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51)
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:161)
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:526)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
	... 91 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
	... 1 more

Flink or Spark Version

zera

Java or Scala Version

jre:11

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@jobmission
Copy link
Author

Have tested in the PR, already fixed, thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
1 participant