Ingest -> Rollback - Ingest comes to different results. (kafka topic ingestion) #6733

Closed
YuriyGavrilov opened this issue Dec 12, 2022 · 21 comments
Labels
bug Bug report ingestion PR or Issue related to the ingestion of metadata stale

Comments

@YuriyGavrilov
Contributor

YuriyGavrilov commented Dec 12, 2022

Describe the bug
There is an ingestion for Kafka topics. At first everything looked normal: the ingestion ran without errors but updated zero records, because the account had no read access rights to the topics (pic 1).
image
After adding access rights, everything worked as expected.
image
20 topics were added. The problem appears after rolling back these 20 topics and running the ingestion again. The logs show 20 errors, exactly as many as there are topics, and the overall result reports zero records updated.
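
To check that the number of errors really matches the number of topics, the ERROR entries in a saved run log can be counted. This is a minimal sketch, assuming the log has been copied into a string; the function name `count_failed_records` and the abbreviated sample are illustrative and not part of DataHub.

```python
import re

# Message text copied from the ERROR entries in this issue's log.
ERROR_MARKER = "Failed to process some records. Continuing."

# Matches the prefix of an ERROR entry, e.g. "[2022-12-12 11:09:21,222] ERROR".
ENTRY_RE = re.compile(r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\] ERROR")


def count_failed_records(log_text: str) -> int:
    """Count log lines that are ERROR entries reporting a failed record."""
    count = 0
    for line in log_text.splitlines():
        if ENTRY_RE.search(line) and ERROR_MARKER in line:
            count += 1
    return count


if __name__ == "__main__":
    # Abbreviated sample: two ERROR entries and one INFO entry.
    sample = "\n".join(
        [
            "[2022-12-12 11:09:21,186] ERROR    {datahub.ingestion.run.pipeline:367} - Failed to process some records. Continuing.",
            "Traceback (most recent call last):",
            "AssertionError",
            "[2022-12-12 11:09:21,222] ERROR    {datahub.ingestion.run.pipeline:367} - Failed to process some records. Continuing.",
            "[2022-12-12 11:09:22,833] INFO     {datahub.cli.ingest_cli:135} - Finished metadata ingestion",
        ]
    )
    print(count_failed_records(sample))
```

Run against the full log from this report, the count should come out equal to the number of rolled-back topics if every topic fails once.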

To Reproduce
Steps to reproduce the behavior:

  1. Go to the ingestion run and run the ingestion (for example, there will be 20 new records).
  2. Click ROLLBACK.
  3. Go to the ingestion run and run the ingestion again. At this stage it shows zero records updated.
  4. Scroll to the results and see zero records updated again, but with 20 error records.
  5. See the error in the logs.

Expected behavior
Ingest -> Rollback -> Ingest should produce the same results.
There should be no errors in the log like the one below for each updated entity.

   '[2022-12-12 11:09:21,222] ERROR    {datahub.ingestion.run.pipeline:367} - Failed to process some records. Continuing.\n'
           'Traceback (most recent call last):\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 358, in run\n'
           '    for record_envelope in self.transform(record_envelopes):\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/ingestion/transformer/base_transformer.py", line 195, '
           'in transform\n'
           '    envelope = self._transform_or_record_mce(envelope)\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/ingestion/transformer/base_transformer.py", line 130, '
           'in _transform_or_record_mce\n'
           '    old_aspect = datahub.emitter.mce_builder.get_aspect_if_available(\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/emitter/mce_builder.py", line 359, in '
           'get_aspect_if_available\n'
           '    assert can_add_aspect(mce, AspectType)\n'
           'AssertionError\n'
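
The traceback ends in a bare `assert`, which is why the log shows `AssertionError` with no message. Below is a minimal sketch of that pattern. It reuses the function names from the traceback, but the classes and function bodies are hypothetical stand-ins, not DataHub's actual implementation, and the choice of which aspect type is rejected is purely illustrative.

```python
from typing import List, Optional, Tuple, Type


class Aspect:
    """Hypothetical base class for metadata aspects."""


class TagsAspect(Aspect):
    pass


class DomainsAspect(Aspect):
    pass


class Snapshot:
    """Hypothetical container that accepts only some aspect types."""

    supported_aspects: Tuple[Type[Aspect], ...] = (TagsAspect,)

    def __init__(self, aspects: List[Aspect]) -> None:
        self.aspects = aspects


def can_add_aspect(snapshot: Snapshot, aspect_type: Type[Aspect]) -> bool:
    # True only when the snapshot is allowed to carry this aspect type.
    return issubclass(aspect_type, snapshot.supported_aspects)


def get_aspect_if_available(
    snapshot: Snapshot, aspect_type: Type[Aspect]
) -> Optional[Aspect]:
    # A bare assert raises AssertionError with an empty message,
    # matching the last line of the traceback in this report.
    assert can_add_aspect(snapshot, aspect_type)
    for aspect in snapshot.aspects:
        if isinstance(aspect, aspect_type):
            return aspect
    return None


if __name__ == "__main__":
    snapshot = Snapshot([TagsAspect()])
    print(get_aspect_if_available(snapshot, TagsAspect))
    try:
        get_aspect_if_available(snapshot, DomainsAspect)
    except AssertionError as exc:
        print(f"AssertionError with message: {str(exc)!r}")
```

In this sketch the guard fails before any lookup happens, so a record that trips it is skipped entirely, which would be consistent with the "Failed to process some records. Continuing." entries.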

Additional context

Logs: 
           '[2022-12-12 11:09:21,186] ERROR    {datahub.ingestion.run.pipeline:367} - Failed to process some records. Continuing.\n'
           'Traceback (most recent call last):\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 358, in run\n'
           '    for record_envelope in self.transform(record_envelopes):\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/ingestion/transformer/base_transformer.py", line 195, '
           'in transform\n'
           '    envelope = self._transform_or_record_mce(envelope)\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/ingestion/transformer/base_transformer.py", line 130, '
           'in _transform_or_record_mce\n'
           '    old_aspect = datahub.emitter.mce_builder.get_aspect_if_available(\n'
           '  File "/tmp/datahub/ingest/venv-kafka-0.9.2/lib/python3.9/site-packages/datahub/emitter/mce_builder.py", line 359, in '
           'get_aspect_if_available\n'
           '    assert can_add_aspect(mce, AspectType)\n'
           'AssertionError\n'
           [The same ERROR entry and AssertionError traceback repeat 19 more times, at 11:09:21,222, 11:09:21,224, 11:09:21,225, 11:09:21,231, 11:09:21,247, 11:09:21,252, 11:09:21,254, 11:09:21,265, 11:09:21,267, 11:09:21,272, 11:09:21,298, 11:09:21,300, 11:09:21,303, 11:09:21,306, 11:09:21,308, 11:09:21,309, 11:09:21,311, 11:09:21,313 and 11:09:21,314.]
           '[2022-12-12 11:09:22,832] INFO     {datahub.ingestion.reporting.file_reporter:54} - Wrote SUCCESS report successfully to '
           "<_io.TextIOWrapper name='/tmp/datahub/ingest/f833aca4-b460-491a-8f7f-d3b06c0ec92e/ingestion_report.json' mode='w' encoding='UTF-8'>\n"
           '[2022-12-12 11:09:22,833] INFO     {datahub.cli.ingest_cli:135} - Finished metadata ingestion\n'
           '\n'
           'Cli report:\n'
           "{'cli_version': '0.9.2',\n"
@YuriyGavrilov YuriyGavrilov added the bug Bug report label Dec 12, 2022
@YuriyGavrilov
Contributor Author

YuriyGavrilov commented Dec 12, 2022

There is a crude workaround for this bug: change platform_instance: ********
Simply changing uppercase characters to lowercase makes the ingestion work perfectly until a new ROLLBACK is applied.

@treff7es
Contributor

@YuriyGavrilov Did you have any transformer in your recipe?

@laulpogan laulpogan added the ingestion PR or Issue related to the ingestion of metadata label Dec 12, 2022
@YuriyGavrilov
Contributor Author

YuriyGavrilov commented Dec 14, 2022

@treff7es Sure


sink:
    type: datahub-rest
    config:
        server: 'http://datahub-gms-datahub-gms:8080'
        token: '${ingestions_by_yua}'
source:
    type: kafka
    config:
        connection:
            consumer_config:
                security.protocol: SSL
                ssl.keystore.password: '${datacatalog_cert_key}'
                ssl.keystore.location: /opt/certs/ns-cert/pkcs12.crt
            bootstrap: 'bootstrap.kaas1pss25.epaas.s7.aero:9093'
            schema_registry_url: 'https://ksr-eip-m1.s7.aero'
        env: PROD
        platform_instance: Kafka.Pss
pipeline_name: 'urn:li:dataHubIngestionSource:719290ee-ebe2-47fd-9686-de37a0f1d814'
transformers:
    -
        type: simple_add_dataset_tags
        config:
            tag_urns:
                - 'urn:li:tag:product.PSS'
    -
        type: simple_add_dataset_ownership
        config:
            owner_urns:
                - 'urn:li:corpuser:m.filippov'
            ownership_type: PRODUCER
    -
        type: simple_add_dataset_domain
        config:
            domains:
                - 'urn:li:domain:1c86cb03-d74b-4cbb-b502-f0550f54b15d'

@hsheth2
Collaborator

hsheth2 commented Dec 14, 2022

We recently made some fixes to the transformers setup in acryl-datahub v0.9.3.2. Could you try bumping your cli version? https://datahubproject.io/docs/ui-ingestion/#advanced-running-with-a-specific-cli-version

@YuriyGavrilov
Contributor Author

OK, but it will take some time. We decided to wait for 0.9.4. I will report the results here.

@YuriyGavrilov
Contributor Author

@hsheth2 Rollback works perfectly for the new ingestion. One thing that still disappoints me is that data from the previous ingestion runs is still present. I rolled back everything but still see the data, and I don't know how to remove it properly. If I change "platform_instance: Kafka.Pss" back to uppercase, as in "platform_instance: Kafka.PSS", I get duplicate data. So currently there is no proper way to roll back "everything" without duplicating data.

@YuriyGavrilov
Contributor Author

Maybe I need to change the running environment back to the old version and do the rollback there. I will try it.

@YuriyGavrilov
Contributor Author

No, it doesn't help. After all the rollbacks there should be no data left, but I still see it.

@hsheth2
Collaborator

hsheth2 commented Jan 3, 2023

@YuriyGavrilov what do you mean by this, if there's still data present from previous runs?

Rollback working perfect for the new ingestion

If you'd like to delete everything in kafka and start over, you can use datahub delete --platform kafka to do so.

@jjoyce0510
Collaborator

Yes - changing platform instance will cause all new entities to be minted. That being said, previous rollback should still end up clearing all data. However, it will take some time for all deletes to be applied (can take up to 30 minutes).
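
To illustrate why a change in letter case is enough to mint new entities, here is a minimal sketch. It assumes dataset URNs have the shape `urn:li:dataset:(urn:li:dataPlatform:<platform>,<platform_instance>.<name>,<env>)`; the helper `make_dataset_urn` and the topic name `example-topic` are hypothetical and only mimic that shape, they are not the DataHub library's own function.

```python
def make_dataset_urn(platform: str, name: str, platform_instance: str, env: str) -> str:
    """Build a dataset URN string in the assumed DataHub shape."""
    return (
        f"urn:li:dataset:(urn:li:dataPlatform:{platform},"
        f"{platform_instance}.{name},{env})"
    )


if __name__ == "__main__":
    # "example-topic" is a made-up topic name used only for illustration.
    old_urn = make_dataset_urn("kafka", "example-topic", "Kafka.PSS", "PROD")
    new_urn = make_dataset_urn("kafka", "example-topic", "Kafka.Pss", "PROD")
    print(old_urn)
    print(new_urn)
    # The comparison is case-sensitive, so these are two distinct entities.
    print(old_urn == new_urn)
```

Under that assumption, entities ingested as Kafka.PSS and as Kafka.Pss live side by side, and rolling back a run that used one spelling leaves the entities created under the other untouched.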

@chriscollins3456
Collaborator

@YuriyGavrilov are you still experiencing issues with rollback? If not, I might close this issue as it's been without activity for some time now

@YuriyGavrilov
Contributor Author

@hsheth2 Yes, data is still present after I roll back everything. Thanks for the "datahub delete --platform kafka" suggestion.

@YuriyGavrilov
Contributor Author

@chriscollins3456 I think this case could be closed, since there is an existing way to delete everything for the platform (datahub delete --platform kafka).

@YuriyGavrilov
Contributor Author

This is what I receive after running datahub delete --platform kafka:

datahub delete --platform kafka
[2023-01-10 17:55:58,050] INFO {datahub.cli.delete_cli:286} - datahub configured with https://datahub.uat.edp.s7.aero/
[2023-01-10 17:55:58,132] ERROR {datahub.cli.cli_utils:418} - Failed to execute search query with b'\n\n \n <title>Not Found</title>\n \n<style type="text/css">\n html, body, pre {\n margin: 0;\n padding: 0;\n font-family: Monaco, 'Lucida Console', monospace;\n background: #ECECEC;\n }\n h1 {\n margin: 0;\n background: #AD632A;\n padding: 20px 45px;\n color: #fff;\n text-shadow: 1px 1px 1px rgba(0,0,0,.3);\n border-bottom: 1px solid #9F5805;\n font-size: 28px;\n }\n p#detail {\n margin: 0;\n padding: 15px 45px;\n background: #F6A960;\n border-top: 4px solid #D29052;\n color: #733512;\n text-shadow: 1px 1px 1px rgba(255,255,255,.3);\n font-size: 14px;\n border-bottom: 1px solid #BA7F5B;\n }\n </style>\n \n \n Not Found \n\n \n For request 'POST /entities?action=search'\n \n\n \n\n'
[2023-01-10 17:55:58,133] ERROR {datahub.entrypoints:213} - Command failed: 404 Client Error: Not Found for url: https://datahub.uat.edp.s7.aero/entities?action=search
Traceback (most recent call last):
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/entrypoints.py", line 171, in main
    sys.exit(datahub(standalone_mode=False, **kwargs))
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/click/core.py", line 1130, in __call__
    return self.main(*args, **kwargs)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/click/core.py", line 1055, in main
    rv = self.invoke(ctx)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/click/core.py", line 1657, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/click/core.py", line 1404, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/click/core.py", line 760, in invoke
    return __callback(*args, **kwargs)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/upgrade/upgrade.py", line 385, in async_wrapper
    loop.run_until_complete(run_func_check_upgrade())
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/upgrade/upgrade.py", line 372, in run_func_check_upgrade
    ret = await the_one_future
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/upgrade/upgrade.py", line 365, in run_inner_func
    return await loop.run_in_executor(
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/telemetry/telemetry.py", line 344, in wrapper
    raise e
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/telemetry/telemetry.py", line 296, in wrapper
    res = func(*args, **kwargs)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/cli/delete_cli.py", line 235, in delete
    deletion_result = delete_with_filters(
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/telemetry/telemetry.py", line 344, in wrapper
    raise e
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/telemetry/telemetry.py", line 296, in wrapper
    res = func(*args, **kwargs)
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/cli/delete_cli.py", line 292, in delete_with_filters
    urns = list(
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/datahub/cli/cli_utils.py", line 419, in get_urns_by_filter
    response.raise_for_status()
  File "/home/yuandronnikov/venv/datahub-env/lib/python3.8/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: https://datahub.uat.edp.s7.aero/entities?action=search

@hsheth2
Collaborator

hsheth2 commented Jan 11, 2023

@YuriyGavrilov your CLI should be configured with the address of datahub-gms, not the address of the datahub-frontend.

That's likely why it's giving a 404 not found error
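One way to check which service a URL points at is sketched below. It rests on an assumption not confirmed in this thread: that datahub-gms answers GET /config with JSON containing "noCode": "true", while datahub-frontend does not. The helper names (config_url, is_gms_config, probe_gms) are hypothetical and are not part of the DataHub CLI.

```python
import json
import urllib.request


def config_url(server: str) -> str:
    """Build the URL of the (assumed) GMS /config endpoint for a server address."""
    return server.rstrip("/") + "/config"


def is_gms_config(payload) -> bool:
    """Return True if a decoded /config payload looks like it came from GMS.

    Assumption: GMS reports "noCode": "true" in this payload.
    """
    return isinstance(payload, dict) and payload.get("noCode") == "true"


def probe_gms(server: str, timeout: float = 5.0) -> bool:
    """Fetch <server>/config and report whether the answer looks like GMS."""
    try:
        with urllib.request.urlopen(config_url(server), timeout=timeout) as resp:
            return is_gms_config(json.load(resp))
    except (OSError, ValueError):
        # OSError covers connection failures, timeouts and HTTP errors such as
        # 404; ValueError covers a body that is not JSON (e.g. an HTML page).
        return False
```

For example, probe_gms("http://<gms-host>:8080") should return True for a reachable GMS under the assumption above, whereas the frontend address from the log would be expected to return False. The host and port here are placeholders, not values taken from this thread.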

@YuriyGavrilov
Contributor Author

YuriyGavrilov commented Jan 11, 2023

Thanks @hsheth2, actually I don't know yet how to do that.

@YuriyGavrilov
Contributor Author

There is also a linked bug for the CLI delete option: #5992

@hsheth2
Collaborator

hsheth2 commented Jan 13, 2023

@YuriyGavrilov that bug is with the delete command, but it seems this issue is with rollback, so my understanding is that that issue is different?

@YuriyGavrilov
Contributor Author

@hsheth2 Yes, it is different.

@github-actions

This issue is stale because it has been open for 30 days with no activity. If you believe this is still an issue on the latest DataHub release please leave a comment with the version that you tested it with. If this is a question/discussion please head to https://slack.datahubproject.io. For feature requests please use https://feature-requests.datahubproject.io

@github-actions github-actions bot added the stale label Feb 16, 2023
@YuriyGavrilov
Contributor Author

@YuriyGavrilov that bug is with the delete command, but it seems this issue is with rollback, so my understanding is that that issue is different?

OK, let's close it.
There is another issue with delete: #7212
