
Workers cannot share data directory (second worker crashes with RocksIOError) #299

Ed-von-Schleck opened this issue Feb 26, 2019 · 3 comments
Ed-von-Schleck commented Feb 26, 2019

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

The following error can be reproduced with both the current stable version (1.4.6) and the development version (1.5.0b1). The worker code is:

import faust

app = faust.App(
    "example_app",
    broker="kafka://localhost:9092",
    store="rocksdb://",
)

example_topic = app.topic(
    "example_topic",
    partitions=2,
)

example_table = app.Table(
    "example_table",
    partitions=2,
)


@app.agent(example_topic)
async def work(items):
    async for item in items:
        pass

Starting the first worker with faust -A example worker -l info --without-web works fine. As soon as I start a second worker, I get the traceback below and the worker exits.

This is the same error as #184; however, my setup is even simpler. Also, there seem to be databases for both partitions:

$ ls example_app-data/v1/tables/
example_table-0.db  example_table-1.db

Expected behavior

The second worker assumes responsibility for one partition and the corresponding table.

Actual behavior

The second worker cannot get the lock for the table it is supposed to be responsible for and crashes.

Full traceback

$ faust -A example worker -l info --without-web
┌ƒaµS† v1.5.0b1─────────────────────────────────────────────────────────────────────────────────────────┐
│ id        │ example_app                                                                               │
│ transport │ [URL('kafka://localhost:9092')]                                                           │
│ store     │ rocksdb:                                                                                  │
│ log       │ -stderr- (info)                                                                           │
│ pid       │ 21659                                                                                     │
│ hostname  │ <snip>                                                                                    │
│ platform  │ CPython 3.7.2 (Linux x86_64)                                                              │
│ drivers   │ aiokafka=0.5.2 aiohttp=3.5.4                                                              │
│ datadir   │ <snip>/minimal_failing_faust_example/example_app-data    │
│ appdir    │ <snip>/minimal_failing_faust_example/example_app-data/v1 │
└───────────┴───────────────────────────────────────────────────────────────────────────────────────────┘
[2019-02-26 17:34:45,937: INFO]: [^Worker]: Starting... 
[2019-02-26 17:34:45,940: INFO]: [^-App]: Starting... 
[2019-02-26 17:34:45,940: INFO]: [^--Monitor]: Starting... 
[2019-02-26 17:34:45,940: INFO]: [^--Producer]: Starting... 
[2019-02-26 17:34:45,952: INFO]: [^--Consumer]: Starting... 
[2019-02-26 17:34:45,953: INFO]: [^---MethodQueue]: Starting... 
[2019-02-26 17:34:45,953: INFO]: [^---AIOKafkaConsumerThread]: Starting... 
[2019-02-26 17:34:45,969: INFO]: [^----MethodQueue]: Starting... 
[2019-02-26 17:34:46,942: INFO]: [^--LeaderAssignor]: Starting... 
[2019-02-26 17:34:46,944: INFO]: [^--Producer]: Creating topic example_app-__assignor-__leader 
[2019-02-26 17:34:46,945: INFO]: [^--ReplyConsumer]: Starting... 
[2019-02-26 17:34:46,945: INFO]: [^AgentManager]: Starting... 
[2019-02-26 17:34:46,946: INFO]: [^Agent: example.work]: Starting... 
[2019-02-26 17:34:46,951: INFO]: [^-OneForOneSupervisor]: Starting... 
[2019-02-26 17:34:46,952: INFO]: [^--Conductor]: Starting... 
[2019-02-26 17:34:46,953: INFO]: [^--TableManager]: Starting... 
[2019-02-26 17:34:47,955: INFO]: [^--Table: example_table]: Starting... 
[2019-02-26 17:34:48,001: INFO]: [^---Store: example_table]: Starting... 
[2019-02-26 17:34:48,004: INFO]: [^--Producer]: Creating topic example_app-example_table-changelog 
[2019-02-26 17:34:48,005: INFO]: [^---Recovery]: Starting... 
[2019-02-26 17:34:48,956: INFO]: [^--Producer]: Creating topic example_app-example_table-changelog 
[2019-02-26 17:34:48,958: INFO]: [^--Producer]: Creating topic example_app-__assignor-__leader 
[2019-02-26 17:34:48,990: INFO]: Updating subscribed topics to: frozenset({'example_topic', 'example_app-__assignor-__leader', 'example_app-example_table-changelog'}) 
[2019-02-26 17:34:48,992: INFO]: Subscribed to topic(s): {'example_topic', 'example_app-__assignor-__leader', 'example_app-example_table-changelog'} 
[2019-02-26 17:34:49,005: INFO]: Discovered coordinator 0 for group example_app 
[2019-02-26 17:34:49,006: INFO]: Revoking previously assigned partitions set() for group example_app 
[2019-02-26 17:34:49,990: INFO]: (Re-)joining group example_app 
[2019-02-26 17:34:52,467: INFO]: Joined group 'example_app' (generation 2) with member_id faust-1.5.0b1-3ac6df93-d864-4dc4-b0f6-d0d57a4816b9 
[2019-02-26 17:34:52,480: INFO]: Successfully synced group example_app with generation 2 
[2019-02-26 17:34:52,484: INFO]: Setting newly assigned partitions {TopicPartition(topic='example_topic', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=1)} for group example_app 
[2019-02-26 17:34:52,995: INFO]: [^---Store: example_table]: DB for partition 0 is locked! Retry in 1s... 
[2019-02-26 17:34:53,999: INFO]: [^---Store: example_table]: DB for partition 0 is locked! Retry in 1s... 
[2019-02-26 17:34:55,003: INFO]: [^---Store: example_table]: DB for partition 0 is locked! Retry in 1s... 
[2019-02-26 17:34:56,007: INFO]: [^---Store: example_table]: DB for partition 0 is locked! Retry in 1s... 
[2019-02-26 17:34:57,010: ERROR]: [^-App]: Crashed reason=RocksIOError(b'IO error: While lock file: example_app-data/v1/tables/example_table-0.db/LOCK: Resource temporarily unavailable') 
Traceback (most recent call last):
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 218, in _db_for_partition
    return self._dbs[partition]
KeyError: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/app/base.py", line 1396, in _on_partitions_assigned
    assigned, revoked, newly_assigned)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/app/base.py", line 1161, in corowrapped
    await ret
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/tables/manager.py", line 151, in on_rebalance
    await T(table.on_rebalance)(assigned, revoked, newly_assigned)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/app/base.py", line 1161, in corowrapped
    await ret
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/tables/base.py", line 471, in on_rebalance
    await self.data.on_rebalance(self, assigned, revoked, newly_assigned)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 263, in on_rebalance
    await self.assign_partitions(table, newly_assigned)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 282, in assign_partitions
    await self._try_open_db_for_partition(tp.partition)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 290, in _try_open_db_for_partition
    return self._db_for_partition(partition)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 220, in _db_for_partition
    db = self._dbs[partition] = self._open_for_partition(partition)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 224, in _open_for_partition
    return self.options.open(self.partition_path(partition))
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 111, in open
    return rocksdb.DB(str(path), self.as_options(), read_only=read_only)
  File "rocksdb/_rocksdb.pyx", line 1636, in rocksdb._rocksdb.DB.__cinit__
  File "rocksdb/_rocksdb.pyx", line 85, in rocksdb._rocksdb.check_status
rocksdb.errors.RocksIOError: b'IO error: While lock file: example_app-data/v1/tables/example_table-0.db/LOCK: Resource temporarily unavailable'
[2019-02-26 17:34:57,018: INFO]: [^Worker]: Stopping... 
[2019-02-26 17:34:57,019: INFO]: [^-App]: Stopping... 
[2019-02-26 17:34:57,021: INFO]: [^--Fetcher]: Stopping... 
[2019-02-26 17:34:57,021: INFO]: [^-App]: Wait for streams... 
[2019-02-26 17:34:57,023: INFO]: [^-App]: Flush producer buffer... 
[2019-02-26 17:34:57,024: INFO]: [^--TableManager]: Stopping... 
[2019-02-26 17:34:57,024: INFO]: [^--Fetcher]: Stopping... 
[2019-02-26 17:34:57,025: INFO]: [^---Recovery]: Stopping... 
[2019-02-26 17:34:57,027: INFO]: [^--Table: example_table]: Stopping... 
[2019-02-26 17:34:57,027: INFO]: [^---Store: example_table]: Stopping... 
[2019-02-26 17:34:57,028: INFO]: [^--Conductor]: Stopping... 
[2019-02-26 17:34:57,030: INFO]: [^AgentManager]: Stopping... 
[2019-02-26 17:34:57,030: INFO]: [^Agent: example.work]: Stopping... 
[2019-02-26 17:34:57,031: INFO]: [^-OneForOneSupervisor]: Stopping... 
[2019-02-26 17:34:57,033: INFO]: [^--ReplyConsumer]: Stopping... 
[2019-02-26 17:34:57,033: INFO]: [^--LeaderAssignor]: Stopping... 
[2019-02-26 17:34:57,034: INFO]: [^--Consumer]: Stopping... 
[2019-02-26 17:34:57,034: INFO]: [^---AIOKafkaConsumerThread]: Stopping... 
[2019-02-26 17:34:57,216: INFO]: LeaveGroup request succeeded 
[2019-02-26 17:34:58,003: INFO]: [^---MethodQueue]: Stopping... 
[2019-02-26 17:34:58,005: INFO]: [^--Producer]: Stopping... 
[2019-02-26 17:34:58,007: INFO]: [^--Monitor]: Stopping... 
[2019-02-26 17:34:58,009: INFO]: [^Worker]: Gathering service tasks... 
[2019-02-26 17:34:58,010: INFO]: [^Worker]: Gathering all futures... 
[2019-02-26 17:35:00,019: INFO]: [^Worker]: Closing event loop 
[2019-02-26 17:35:00,021: CRITICAL]: [^Worker]: We experienced a crash! Reraising original exception... 
Traceback (most recent call last):
  File "<snip>/.virtualenvs/faust/bin/faust", line 10, in <module>
    sys.exit(cli())
    │        └ <faust.cli.base._Group object at 0x7fb646e8ef98><module 'sys' (built-in)>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
           │          │       └ {}
           │          └ ()
           └ <faust.cli.base._Group object at 0x7fb646e8ef98>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
         │           └ <click.core.Context object at 0x7fb6496b6668><faust.cli.base._Group object at 0x7fb646e8ef98>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
           │               │                      └ <click.core.Context object at 0x7fb646e709e8>
           │               └ <click.core.Context object at 0x7fb646e709e8><function MultiCommand.invoke.<locals>._process_result at 0x7fb6496e3268>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           │          │                └ <click.core.Context object at 0x7fb646e709e8>
           │          └ <click.core.Command object at 0x7fb646e027b8><click.core.Context object at 0x7fb646e709e8>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
           │         │       └ {'with_web': False, 'web_port': None, 'web_transport': None, 'web_bind': None, 'web_host': '<snip>'}
           │         └ ()
           └ <function worker at 0x7fb646e73ea0>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
           │ │                       │       └ {'with_web': False, 'web_port': None, 'web_transport': None, 'web_bind': None, 'web_host': '<snip>'}
           │ │                       └ ()
           │ └ <function get_current_context at 0x7fb648d719d8><function worker at 0x7fb646e73e18>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/cli/base.py", line 463, in _inner
    return cmd()
           └ <faust.cli.worker.worker object at 0x7fb646e069e8>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/cli/base.py", line 538, in __call__
    self.run_using_worker(*args, **kwargs)
    │                      │       └ {}
    │                      └ ()
    └ <faust.cli.worker.worker object at 0x7fb646e069e8>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/cli/base.py", line 547, in run_using_worker
    return worker.execute_from_commandline()
           └ <Worker: crashed []>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/mode/worker.py", line 227, in execute_from_commandline
    self.stop_and_shutdown()
    └ <Worker: crashed []>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/mode/worker.py", line 237, in stop_and_shutdown
    self._shutdown_loop()
    └ <Worker: crashed []>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/mode/worker.py", line 265, in _shutdown_loop
    raise self._crash_reason from self._crash_reason
          │                       └ <Worker: crashed []><Worker: crashed []>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/app/base.py", line 1396, in _on_partitions_assigned
    assigned, revoked, newly_assigned)
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/app/base.py", line 1161, in corowrapped
    await ret
          └ <coroutine object TableManager.on_rebalance at 0x7fb6441417c8>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/tables/manager.py", line 151, in on_rebalance
    await T(table.on_rebalance)(assigned, revoked, newly_assigned)
          │ │                   │         │        └ {TP(topic='example_topic', partition=0), TP(topic='example_app-example_table-changelog', partition=0), TP(topic='example_app-exa...
          │ │                   │         └ set()
          │ │                   └ {TP(topic='example_topic', partition=0), TP(topic='example_app-example_table-changelog', partition=0), TP(topic='example_app-exa...
          │ └ <Table: stopping example_table><bound method App.traced of <App(example_app): [URL('kafka://localhost:9092')] crashed agents(<AgentManager: crashed >) topics(3...
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/app/base.py", line 1161, in corowrapped
    await ret
          └ <coroutine object Collection.on_rebalance at 0x7fb6441418c8>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/tables/base.py", line 471, in on_rebalance
    await self.data.on_rebalance(self, assigned, revoked, newly_assigned)
          │                      │     │         │        └ {TP(topic='example_topic', partition=0), TP(topic='example_app-example_table-changelog', partition=0), TP(topic='example_app-exa...
          │                      │     │         └ set()
          │                      │     └ {TP(topic='example_topic', partition=0), TP(topic='example_app-example_table-changelog', partition=0), TP(topic='example_app-exa...
          │                      └ <Table: stopping example_table><Table: stopping example_table>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 263, in on_rebalance
    await self.assign_partitions(table, newly_assigned)
          │                      │      └ {TP(topic='example_topic', partition=0), TP(topic='example_app-example_table-changelog', partition=0), TP(topic='example_app-exa...
          │                      └ <Table: stopping example_table><Store: stopping table_name=example_table url=rocksdb:example_table>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 282, in assign_partitions
    await self._try_open_db_for_partition(tp.partition)
          │                               └ TP(topic='example_app-example_table-changelog', partition=0)
          └ <Store: stopping table_name=example_table url=rocksdb:example_table>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 290, in _try_open_db_for_partition
    return self._db_for_partition(partition)
           │                      └ 0<Store: stopping table_name=example_table url=rocksdb:example_table>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 220, in _db_for_partition
    db = self._dbs[partition] = self._open_for_partition(partition)
         │         │            │                        └ 0
         │         │            └ <Store: stopping table_name=example_table url=rocksdb:example_table>
         │         └ 0<Store: stopping table_name=example_table url=rocksdb:example_table>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 224, in _open_for_partition
    return self.options.open(self.partition_path(partition))
           │                 │                   └ 0
           │                 └ <Store: stopping table_name=example_table url=rocksdb:example_table><Store: stopping table_name=example_table url=rocksdb:example_table>
  File "<snip>/.virtualenvs/faust/lib/python3.7/site-packages/faust/stores/rocksdb.py", line 111, in open
    return rocksdb.DB(str(path), self.as_options(), read_only=read_only)
           │              │      │                            └ False
           │              │      └ <faust.stores.rocksdb.RocksDBOptions object at 0x7fb64412eb00>
           │              └ PosixPath('example_app-data/v1/tables/example_table-0.db')
           └ <module 'rocksdb' from '<snip>/.virtualenvs/faust/lib/python3.7/site-packages/rocksdb/__init__.py'>
  File "rocksdb/_rocksdb.pyx", line 1636, in rocksdb._rocksdb.DB.__cinit__
  File "rocksdb/_rocksdb.pyx", line 85, in rocksdb._rocksdb.check_status
rocksdb.errors.RocksIOError: b'IO error: While lock file: example_app-data/v1/tables/example_table-0.db/LOCK: Resource temporarily unavailable'

Versions

  • Python version: 3.7
  • Faust version: 1.5.0b1 and 1.4.6
  • Operating system: Fedora 29
  • Kafka version: 2.11-2.1.0
  • RocksDB version: 5.7.3
Ed-von-Schleck (Author)

For reference, the first worker had this log output:

faust -A example worker -l info --without-web
┌ƒaµS† v1.5.0b1─────────────────────────────────────────────────────────────────────────────────────────┐
│ id        │ example_app                                                                               │
│ transport │ [URL('kafka://localhost:9092')]                                                           │
│ store     │ rocksdb:                                                                                  │
│ log       │ -stderr- (info)                                                                           │
│ pid       │ 24048                                                                                     │
│ hostname  │                                                                                           │
│ platform  │ CPython 3.7.2 (Linux x86_64)                                                              │
│ drivers   │ aiokafka=0.5.2 aiohttp=3.5.4                                                              │
│ datadir   │ <snip>/minimal_failing_faust_example/example_app-data                                     │
│ appdir    │ <snip>/minimal_failing_faust_example/example_app-data/v1                                  │
└───────────┴───────────────────────────────────────────────────────────────────────────────────────────┘
[2019-02-26 17:46:52,516: INFO]: [^Worker]: Starting... 
[2019-02-26 17:46:52,518: INFO]: [^-App]: Starting... 
[2019-02-26 17:46:52,519: INFO]: [^--Monitor]: Starting... 
[2019-02-26 17:46:52,519: INFO]: [^--Producer]: Starting... 
[2019-02-26 17:46:52,535: INFO]: [^--Consumer]: Starting... 
[2019-02-26 17:46:52,535: INFO]: [^---MethodQueue]: Starting... 
[2019-02-26 17:46:52,536: INFO]: [^---AIOKafkaConsumerThread]: Starting... 
[2019-02-26 17:46:52,552: INFO]: [^----MethodQueue]: Starting... 
[2019-02-26 17:46:53,521: INFO]: [^--LeaderAssignor]: Starting... 
[2019-02-26 17:46:53,523: INFO]: [^--Producer]: Creating topic example_app-__assignor-__leader 
[2019-02-26 17:46:53,524: INFO]: [^--ReplyConsumer]: Starting... 
[2019-02-26 17:46:53,524: INFO]: [^AgentManager]: Starting... 
[2019-02-26 17:46:53,525: INFO]: [^Agent: example.work]: Starting... 
[2019-02-26 17:46:53,527: INFO]: [^-OneForOneSupervisor]: Starting... 
[2019-02-26 17:46:53,528: INFO]: [^--Conductor]: Starting... 
[2019-02-26 17:46:53,528: INFO]: [^--TableManager]: Starting... 
[2019-02-26 17:46:54,530: INFO]: [^--Table: example_table]: Starting... 
[2019-02-26 17:46:54,572: INFO]: [^---Store: example_table]: Starting... 
[2019-02-26 17:46:54,574: INFO]: [^--Producer]: Creating topic example_app-example_table-changelog 
[2019-02-26 17:46:54,575: INFO]: [^---Recovery]: Starting... 
[2019-02-26 17:46:55,530: INFO]: [^--Producer]: Creating topic example_app-__assignor-__leader 
[2019-02-26 17:46:55,531: INFO]: [^--Producer]: Creating topic example_app-example_table-changelog 
[2019-02-26 17:46:55,568: INFO]: Updating subscribed topics to: frozenset({'example_topic', 'example_app-__assignor-__leader', 'example_app-example_table-changelog'}) 
[2019-02-26 17:46:55,571: INFO]: Subscribed to topic(s): {'example_topic', 'example_app-__assignor-__leader', 'example_app-example_table-changelog'} 
[2019-02-26 17:46:55,590: INFO]: Discovered coordinator 0 for group example_app 
[2019-02-26 17:46:55,592: INFO]: Revoking previously assigned partitions set() for group example_app 
[2019-02-26 17:46:56,569: INFO]: (Re-)joining group example_app 
[2019-02-26 17:46:56,578: INFO]: Joined group 'example_app' (generation 1) with member_id faust-1.5.0b1-89fea516-25a4-41ae-824e-2c1fb9792506 
[2019-02-26 17:46:56,580: INFO]: Elected group leader -- performing partition assignments using faust 
[2019-02-26 17:46:56,592: INFO]: Successfully synced group example_app with generation 1 
[2019-02-26 17:46:56,595: INFO]: Setting newly assigned partitions {TopicPartition(topic='example_topic', partition=1), TopicPartition(topic='example_topic', partition=0), TopicPartition(topic='example_app-__assignor-__leader', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=1)} for group example_app 
[2019-02-26 17:47:01,614: INFO]: [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active───────────────────┬───────────┬───────────┐
│ topic                               │ partition │ highwater │
├─────────────────────────────────────┼───────────┼───────────┤
│ example_app-example_table-changelog │ 0         │ -1        │
│ example_app-example_table-changelog │ 1         │ -1        │
└─────────────────────────────────────┴───────────┴───────────┘ 
[2019-02-26 17:47:03,533: INFO]: [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active───────────┬───────────┬────────┐
│ topic                               │ partition │ offset │
├─────────────────────────────────────┼───────────┼────────┤
│ example_app-example_table-changelog │ 0         │ -1     │
│ example_app-example_table-changelog │ 1         │ -1     │
└─────────────────────────────────────┴───────────┴────────┘ 
[2019-02-26 17:47:03,617: INFO]: [^---Recovery]: standby offsets at start of reading:
┌Reading Starts At - Standby─┐
│ topic │ partition │ offset │
└───────┴───────────┴────────┘ 
[2019-02-26 17:47:03,744: INFO]: [^---Recovery]: Resuming flow... 
[2019-02-26 17:47:03,744: INFO]: [^---Recovery]: Recovery complete 
[2019-02-26 17:47:03,847: INFO]: [^---Recovery]: Restore complete! 
[2019-02-26 17:47:03,848: INFO]: [^---Recovery]: Seek stream partitions to committed offsets. 
[2019-02-26 17:47:04,580: INFO]: [^--Fetcher]: Starting... 
[2019-02-26 17:47:04,581: INFO]: [^---Recovery]: Worker ready 
[2019-02-26 17:47:04,583: INFO]: [^Worker]: Ready 
[2019-02-26 17:47:12,619: WARNING]: Heartbeat failed for group example_app because it is rebalancing 
[2019-02-26 17:47:12,620: INFO]: Revoking previously assigned partitions frozenset({TopicPartition(topic='example_app-__assignor-__leader', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=1), TopicPartition(topic='example_topic', partition=1), TopicPartition(topic='example_topic', partition=0)}) for group example_app 
[2019-02-26 17:47:12,656: INFO]: (Re-)joining group example_app 
[2019-02-26 17:47:12,662: INFO]: Joined group 'example_app' (generation 2) with member_id faust-1.5.0b1-89fea516-25a4-41ae-824e-2c1fb9792506 
[2019-02-26 17:47:12,663: INFO]: Elected group leader -- performing partition assignments using faust 
[2019-02-26 17:47:12,672: INFO]: Successfully synced group example_app with generation 2 
[2019-02-26 17:47:12,675: INFO]: Setting newly assigned partitions {TopicPartition(topic='example_app-__assignor-__leader', partition=0), TopicPartition(topic='example_topic', partition=1), TopicPartition(topic='example_app-example_table-changelog', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=1)} for group example_app 
[2019-02-26 17:47:17,753: INFO]: [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active───────────────────┬───────────┬───────────┐
│ topic                               │ partition │ highwater │
├─────────────────────────────────────┼───────────┼───────────┤
│ example_app-example_table-changelog │ 1         │ -1        │
└─────────────────────────────────────┴───────────┴───────────┘ 
[2019-02-26 17:47:19,591: INFO]: [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active───────────┬───────────┬────────┐
│ topic                               │ partition │ offset │
├─────────────────────────────────────┼───────────┼────────┤
│ example_app-example_table-changelog │ 1         │ -1     │
└─────────────────────────────────────┴───────────┴────────┘ 
[2019-02-26 17:47:19,593: INFO]: [^---Recovery]: Still fetching. Remaining: {} 
[2019-02-26 17:47:19,596: WARNING]: Heartbeat failed for group example_app because it is rebalancing 
[2019-02-26 17:47:19,597: INFO]: Revoking previously assigned partitions frozenset({TopicPartition(topic='example_app-__assignor-__leader', partition=0), TopicPartition(topic='example_topic', partition=1), TopicPartition(topic='example_app-example_table-changelog', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=1)}) for group example_app 
[2019-02-26 17:47:19,775: INFO]: (Re-)joining group example_app 
[2019-02-26 17:47:19,776: INFO]: Joined group 'example_app' (generation 3) with member_id faust-1.5.0b1-89fea516-25a4-41ae-824e-2c1fb9792506 
[2019-02-26 17:47:19,776: INFO]: Elected group leader -- performing partition assignments using faust 
[2019-02-26 17:47:19,780: INFO]: Successfully synced group example_app with generation 3 
[2019-02-26 17:47:19,781: INFO]: Setting newly assigned partitions {TopicPartition(topic='example_topic', partition=1), TopicPartition(topic='example_topic', partition=0), TopicPartition(topic='example_app-__assignor-__leader', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=0), TopicPartition(topic='example_app-example_table-changelog', partition=1)} for group example_app 
[2019-02-26 17:47:24,596: INFO]: [^---Recovery]: Highwater for active changelog partitions:
┌Highwater - Active───────────────────┬───────────┬───────────┐
│ topic                               │ partition │ highwater │
├─────────────────────────────────────┼───────────┼───────────┤
│ example_app-example_table-changelog │ 0         │ -1        │
│ example_app-example_table-changelog │ 1         │ -1        │
└─────────────────────────────────────┴───────────┴───────────┘ 
[2019-02-26 17:47:24,598: INFO]: [^---Recovery]: Still fetching. Remaining: {} 
[2019-02-26 17:47:25,649: INFO]: [^---Recovery]: active offsets at start of reading:
┌Reading Starts At - Active───────────┬───────────┬────────┐
│ topic                               │ partition │ offset │
├─────────────────────────────────────┼───────────┼────────┤
│ example_app-example_table-changelog │ 1         │ -1     │
│ example_app-example_table-changelog │ 0         │ -1     │
└─────────────────────────────────────┴───────────┴────────┘ 
[2019-02-26 17:47:26,199: INFO]: [^---Recovery]: standby offsets at start of reading:
┌Reading Starts At - Standby──────────┬───────────┬────────┐
│ topic                               │ partition │ offset │
├─────────────────────────────────────┼───────────┼────────┤
│ example_app-example_table-changelog │ 0         │ -1     │
└─────────────────────────────────────┴───────────┴────────┘ 
[2019-02-26 17:47:26,652: INFO]: [^---Recovery]: Resuming flow... 
[2019-02-26 17:47:26,653: INFO]: [^---Recovery]: Recovery complete 
[2019-02-26 17:47:26,756: INFO]: [^---Recovery]: Restore complete! 
[2019-02-26 17:47:26,758: INFO]: [^---Recovery]: Seek stream partitions to committed offsets. 
[2019-02-26 17:47:27,596: INFO]: [^---Recovery]: Worker ready


ask commented Feb 28, 2019

Currently you need to use a separate data directory per worker:

$ faust -A example --datadir=data/worker1 worker -l info --without-web
$ faust -A example --datadir=data/worker2 worker -l info --without-web

We use one RocksDB file for every changelog partition,
and since those partitions will have standbys, multiple workers will try to access them at the same time.

RocksDB does not allow reads/writes from multiple processes, so it crashes with this error.
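RocksDB enforces this with a `LOCK` file inside each `.db` directory; a second opener fails with `EAGAIN` ("Resource temporarily unavailable"), which surfaces as the `RocksIOError` in the traceback above. The same exclusive-lock semantics can be illustrated with a plain `flock` on Linux (illustrative only; this is not RocksDB's actual locking code):

```python
import fcntl
import os
import tempfile

# Simulate RocksDB's per-database LOCK file: an exclusive,
# non-blocking lock that a second opener cannot acquire.
lock_path = os.path.join(tempfile.mkdtemp(), "LOCK")

fd_worker1 = os.open(lock_path, os.O_RDWR | os.O_CREAT)
fcntl.flock(fd_worker1, fcntl.LOCK_EX | fcntl.LOCK_NB)  # first worker: succeeds

fd_worker2 = os.open(lock_path, os.O_RDWR | os.O_CREAT)
second_open_failed = False
try:
    fcntl.flock(fd_worker2, fcntl.LOCK_EX | fcntl.LOCK_NB)  # second worker
except BlockingIOError:
    # Same errno family (EAGAIN/EWOULDBLOCK) that RocksDB reports above.
    second_open_failed = True
```

Each `os.open()` call creates an independent open file description, so the second `flock` is refused just as a second worker process would be.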

An optimization we could make is to skip starting a standby for partitions that we already have a local database file for. In practice that's not easy to implement: it would have to take into account how far behind the file is from the actual changelog, so that recovery is not slow.
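A hypothetical sketch of that optimization is below. All names here (`should_skip_standby`, `MAX_LAG`, `local_offset`) are illustrative assumptions, not Faust API: the point is that a local file alone isn't enough; its lag behind the changelog highwater would also have to be bounded.

```python
from pathlib import Path

# Assumed tolerance for how far a local DB file may lag behind the
# changelog before a full standby/recovery is cheaper than reusing it.
MAX_LAG = 10_000

def should_skip_standby(db_path: Path, local_offset: int, highwater: int) -> bool:
    """Return True if a standby for this partition could be skipped."""
    if not db_path.exists():
        return False  # no local copy: a standby is required
    return (highwater - local_offset) <= MAX_LAG  # close enough to reuse
```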

@ask ask changed the title Second worker crashes with RocksIOError Workers cannot share data directory (second worker crashes with RocksIOError) Feb 28, 2019
@ask ask added this to the 1.6 milestone Feb 28, 2019
Ed-von-Schleck (Author)

I see. Thanks for the quick response! With better documentation, or with unique data directories per worker as the default, I'd regard my issue as resolved – though I'd certainly welcome the optimization hinted at in your comment. Naively, I assumed this was already the case; a sentence or two in the documentation could clear that up.

Thanks again!
