[Bug] [sink elasticsearch] the savemode of sink-es conflicts with es automatically creating indexes based on templates #7430

Closed
3 tasks done
jw-itq opened this issue Aug 19, 2024 · 2 comments · Fixed by #7443
jw-itq commented Aug 19, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

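Elasticsearch can automatically create indexes from index templates, for example one index per year derived from a timestamp field, so the sink does not need savemode to create them. The savemode handling of sink-es should therefore support this case instead of failing when the index name contains a placeholder such as ${index_key}.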

SeaTunnel Version

dev

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  checkpoint.timeout = 600000
  read_limit.rows_per_second=2000
}

source {
  SqlServer-CDC {
    result_table_name = "**"
    username = "sa"
    password = "****"
    database-names = ["**"]
    table-names = ["**.dbo.**"]
    base-url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=**"
  }
}

transform {
  Sql {
    query = """
      SELECT
        'test_1' AS source,
        FORMATDATETIME ( createTime, 'yyyy' ) AS index_key,
        CONCAT ( 's_', CAST ( id AS string ) ) AS id
      FROM
        test_content
    """
  }
}

sink {
  Elasticsearch {
    hosts = ["127.0.0.1:9200"]
    index = "test_${index_key}"
    username = "elastic"
    password = "****"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    primary_keys = ["id"]
  }
}

Running Command

seatunnel.sh -c config/test_es.conf

Error Exception

2024-08-19 13:34:32,351 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:211)
        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
Caused by: java.util.concurrent.CompletionException: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:383)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalArgumentException: Illegal character in path at index 17: /test_${index_key}
        at org.elasticsearch.client.RestClient.buildUri(RestClient.java:569)
        at org.elasticsearch.client.RestClient$InternalRequest.<init>(RestClient.java:699)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:234)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.createIndex(EsRestClient.java:438)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.createIndex(EsRestClient.java:428)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog.createTable(ElasticSearchCatalog.java:185)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createTable(DefaultSaveModeHandler.java:186)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.createSchemaWhenNotExist(DefaultSaveModeHandler.java:112)
        at org.apache.seatunnel.api.sink.DefaultSaveModeHandler.handleSchemaSaveMode(DefaultSaveModeHandler.java:69)
        at org.apache.seatunnel.api.sink.SaveModeHandler.handleSaveMode(SaveModeHandler.java:38)
        at org.apache.seatunnel.api.sink.SaveModeExecuteWrapper.execute(SaveModeExecuteWrapper.java:36)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:375)
        ... 20 more
Caused by: java.net.URISyntaxException: Illegal character in path at index 17: /test_${index_key}
        at java.net.URI$Parser.fail(URI.java:2845)
        at java.net.URI$Parser.checkChars(URI.java:3018)
        at java.net.URI$Parser.parseHierarchical(URI.java:3102)
        at java.net.URI$Parser.parse(URI.java:3060)
        at java.net.URI.<init>(URI.java:588)
        at org.apache.http.client.utils.URIBuilder.<init>(URIBuilder.java:82)
        at org.elasticsearch.client.RestClient.buildUri(RestClient.java:563)
        ... 31 more

        at com.hazelcast.spi.impl.AbstractInvocationFuture.wrapInCompletionException(AbstractInvocationFuture.java:1347)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.cascadeException(AbstractInvocationFuture.java:1340)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.access$200(AbstractInvocationFuture.java:65)
        at com.hazelcast.spi.impl.AbstractInvocationFuture$ApplyNode.execute(AbstractInvocationFuture.java:1478)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockOtherNode(AbstractInvocationFuture.java:797)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:759)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.complete0(AbstractInvocationFuture.java:1235)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionallyInternal(AbstractInvocationFuture.java:1223)
        at com.hazelcast.spi.impl.AbstractInvocationFuture.completeExceptionally(AbstractInvocationFuture.java:709)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.completeExceptionally(ClientInvocation.java:294)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyExceptionWithOwnedPermission(ClientInvocation.java:321)
        at com.hazelcast.client.impl.spi.impl.ClientInvocation.notifyException(ClientInvocation.java:304)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.handleResponse(ClientResponseHandlerSupplier.java:164)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.process(ClientResponseHandlerSupplier.java:141)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier.access$300(ClientResponseHandlerSupplier.java:60)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:251)
        at com.hazelcast.client.impl.spi.impl.ClientResponseHandlerSupplier$DynamicResponseHandler.accept(ClientResponseHandlerSupplier.java:243)
        at com.hazelcast.client.impl.connection.tcp.TcpClientConnection.handleClientMessage(TcpClientConnection.java:245)
        at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.handleMessage(ClientMessageDecoder.java:135)
        at com.hazelcast.client.impl.protocol.util.ClientMessageDecoder.onRead(ClientMessageDecoder.java:89)
        at com.hazelcast.internal.networking.nio.NioInboundPipeline.process(NioInboundPipeline.java:136)
        at com.hazelcast.internal.networking.nio.NioThread.processSelectionKey(NioThread.java:383)
        at com.hazelcast.internal.networking.nio.NioThread.processSelectionKeys(NioThread.java:368)
        at com.hazelcast.internal.networking.nio.NioThread.selectLoop(NioThread.java:294)
        at com.hazelcast.internal.networking.nio.NioThread.executeRun(NioThread.java:249)
        at com.hazelcast.internal.util.executor.HazelcastManagedThread.run(HazelcastManagedThread.java:102)
Caused by: org.apache.seatunnel.engine.common.exception.JobException: org.apache.seatunnel.common.exception.SeaTunnelRuntimeException: ErrorCode:[API-09], ErrorDescription:[Handle save mode failed]
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:377)
        at org.apache.seatunnel.engine.server.master.JobMaster.handleSaveMode(JobMaster.java:383)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
        at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
        at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
        at org.apache.seatunnel.engine.server.master.JobMaster.init(JobMaster.java:242)
        at org.apache.seatunnel.engine.server.CoordinatorService.lambda$submitJob$4(CoordinatorService.java:499)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
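
The innermost cause in the trace is a java.net.URISyntaxException: the literal, unresolved index name is used as a request path. A minimal sketch, using only the JDK, of why such a name is rejected and how it could be detected before any index is created. The class and method names here are hypothetical and are not part of SeaTunnel or the Elasticsearch client.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, for illustration only.
final class PlaceholderPathCheck {

    /** Returns true if java.net.URI accepts the string, false if it throws. */
    static boolean isValidPath(String path) {
        try {
            new URI(path);
            return true;
        } catch (URISyntaxException e) {
            // Characters such as '{' and '}' are not legal in a URI path.
            return false;
        }
    }

    /** Returns true if the index name still contains a ${...} placeholder. */
    static boolean hasPlaceholder(String indexName) {
        return indexName.matches(".*\\$\\{[^}]+\\}.*");
    }
}
```

With these helpers, "/test_${index_key}" fails the path check while "/test_2024" passes, so a caller could check hasPlaceholder on the configured index and skip index creation at job start when it returns true.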

Zeta or Flink or Spark Version

No response

Java or Scala Version

jdk1.8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

jw-itq added the bug label Aug 19, 2024
@liunaijie
Member

Hi @jw-itq, the error message is "Illegal character in path at index 17: /test_${index_key}". It means the placeholder in the index name was not replaced.

The savemode function is executed before the job runs, but in your case it needs to execute at run time.
You need to update the ES writer: replace the index name when a record is received, and create the index if it does not exist.
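
A minimal sketch of that suggestion, under stated assumptions: it is not the actual connector code. The class RuntimeIndexResolver, the representation of a record as a Map, and the createIfAbsent callback are all hypothetical; a real writer would use SeaTunnel's row type and call the Elasticsearch client in the callback.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: resolve ${field} placeholders per record at run time.
final class RuntimeIndexResolver {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    private final String indexTemplate;
    // Assumed hook that creates the index if it does not exist yet.
    private final Consumer<String> createIfAbsent;
    // Index names already handled, so the hook runs once per name.
    private final Set<String> known = ConcurrentHashMap.newKeySet();

    RuntimeIndexResolver(String indexTemplate, Consumer<String> createIfAbsent) {
        this.indexTemplate = indexTemplate;
        this.createIfAbsent = createIfAbsent;
    }

    /** Replaces each ${field} with the record's value and returns the index name. */
    String resolve(Map<String, Object> record) {
        Matcher m = PLACEHOLDER.matcher(indexTemplate);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            Object value = record.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("No value for field: " + m.group(1));
            }
            m.appendReplacement(sb, Matcher.quoteReplacement(value.toString()));
        }
        m.appendTail(sb);
        String index = sb.toString();
        if (known.add(index)) {
            createIfAbsent.accept(index); // first time this name is seen
        }
        return index;
    }
}
```

For a record whose index_key field is "2024", the template "test_${index_key}" resolves to "test_2024", and the creation hook is invoked only the first time that name appears. Caching the seen names avoids an existence check on every record.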

jw-itq commented Aug 19, 2024

Thanks. I'll try it
