Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TypeError: no default __reduce__ due to non-trivial __cinit__ #29

Closed
allrobot opened this issue Jun 7, 2024 · 4 comments
Closed

TypeError: no default __reduce__ due to non-trivial __cinit__ #29

allrobot opened this issue Jun 7, 2024 · 4 comments

Comments

@allrobot
Copy link

allrobot commented Jun 7, 2024

https://aiomultiprocess.omnilib.dev/en/latest/guide.html#using-uvloop

code:

async def fetch_detail(url: str, pool: aiomultiprocess.Pool):
    async with aiohttp.ClientSession() .get(url) as response:
        html = await response.text()
        return html

async def aiomultiprocess_main():
    main_url = "http://am.adianshi.com:6805/"
    async with aiomultiprocess.Pool(loop_initializer=winloop.new_event_loop()) as pool:
        task = pool.apply(fetch_detail, args=(main_url, pool))
        html=await task
        print(html)

if __name__ == "__main__":
    asyncio.run(aiomultiprocess_main())

It report error:

C:\ProgramData\anaconda3\envs\python311\python.exe C:\Users\Administrator\Personal_scripts\pythonProject\temp.py 
Traceback (most recent call last):
  File "C:\Users\Administrator\Personal_scripts\pythonProject\temp.py", line 60, in <module>
    asyncio.run(aiomultiprocess_main())
  File "C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\Administrator\Personal_scripts\pythonProject\temp.py", line 54, in aiomultiprocess_main
    async with aiomultiprocess.Pool(loop_initializer=winloop.new_event_loop()) as pool:
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\pool.py", line 186, in __init__
    self.init()
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\pool.py", line 214, in init
    self.processes[self.create_worker(qid)] = qid
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\pool.py", line 261, in create_worker
    process.start()
  File "C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiomultiprocess\core.py", line 156, in start
    return self.aio_process.start()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\context.py", line 336, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\popen_spawn_win32.py", line 95, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\anaconda3\envs\python311\Lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "<stringsource>", line 2, in winloop.loop.Loop.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
@Vizonex
Copy link
Owner

Vizonex commented Jun 8, 2024

@allrobot try running it with winloop.run() and see if it throws the same errors at you.

@allrobot
Copy link
Author

allrobot commented Jun 9, 2024

https://github.com/Vizonex/Winloop#making-pull-requests

run code, it will report:

C:\ProgramData\anaconda3\envs\python311\python.exe "C:/Program Files/JetBrains/PyCharm Community Edition 2024.1/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --path C:\Users\Administrator\Personal_scripts\pythonProject\test.py 
Testing started at 下午3:28 ...
Launching pytest with arguments C:\Users\Administrator\Personal_scripts\pythonProject\test.py --no-header --no-summary -q in C:\Users\Administrator\Personal_scripts\pythonProject

============================= test session starts =============================
collecting ... collected 2 items

test.py::TestAioHTTP::test_aiohttp_basic_1 
test.py::TestAioHTTP::test_aiohttp_graceful_shutdown 

======================== 2 failed, 1 warning in 2.32s =========================
FAILED                        [ 50%]
test.py:22 (TestAioHTTP.test_aiohttp_basic_1)
self = <test.TestAioHTTP testMethod=test_aiohttp_basic_1>

    def test_aiohttp_basic_1(self):
        PAYLOAD = '<h1>It Works!</h1>' * 10000
    
        async def on_request(request):
            return aiohttp.web.Response(text=PAYLOAD)
    
        asyncio.set_event_loop(self.loop)
        app = aiohttp.web.Application()
        app.router.add_get('/', on_request)
    
        runner = aiohttp.web.AppRunner(app)
        self.loop.run_until_complete(runner.setup())
        site = aiohttp.web.TCPSite(runner, '0.0.0.0', '10000')
        self.loop.run_until_complete(site.start())
        port = site._server.sockets[0].getsockname()[1]
    
        async def test():
            # Make sure we're using the correct event loop.
            self.assertIs(asyncio.get_event_loop(), self.loop)
    
            for addr in (('localhost', port),
                         ('127.0.0.1', port)):
                async with aiohttp.ClientSession() as client:
                    async with client.get('http://{}:{}'.format(*addr)) as r:
                        self.assertEqual(r.status, 200)
                        result = await r.text()
                        self.assertEqual(result, PAYLOAD)
    
>       self.loop.run_until_complete(test())

test.py:51: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py:654: in run_until_complete
    return future.result()
test.py:46: in test
    async with client.get('http://{}:{}'.format(*addr)) as r:
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\client.py:1197: in __aenter__
    self._resp = await self._coro
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\client.py:608: in _request
    await resp.start(conn)
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\client_reqrep.py:976: in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiohttp.client_proto.ResponseHandler object at 0x00000240EE359B70>

    async def read(self) -> _T:
        if not self._buffer and not self._eof:
            assert not self._waiter
            self._waiter = self._loop.create_future()
            try:
>               await self._waiter
E               aiohttp.client_exceptions.ServerDisconnectedError: Server disconnected

C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\streams.py:640: ServerDisconnectedError
FAILED              [100%]
test.py:53 (TestAioHTTP.test_aiohttp_graceful_shutdown)
self = <test.TestAioHTTP testMethod=test_aiohttp_graceful_shutdown>

    def test_aiohttp_graceful_shutdown(self):
        async def websocket_handler(request):
            ws = aiohttp.web.WebSocketResponse()
            await ws.prepare(request)
            request.app['websockets'].add(ws)
            try:
                async for msg in ws:
                    await ws.send_str(msg.data)
            finally:
                request.app['websockets'].discard(ws)
            return ws
    
        async def on_shutdown(app):
            for ws in set(app['websockets']):
                await ws.close(
                    code=aiohttp.WSCloseCode.GOING_AWAY,
                    message='Server shutdown')
    
        asyncio.set_event_loop(self.loop)
        app = aiohttp.web.Application()
        app.router.add_get('/', websocket_handler)
        app.on_shutdown.append(on_shutdown)
        app['websockets'] = weakref.WeakSet()
    
        runner = aiohttp.web.AppRunner(app)
        self.loop.run_until_complete(runner.setup())
        site = aiohttp.web.TCPSite(runner, '0.0.0.0', '10000')
>       self.loop.run_until_complete(site.start())

test.py:81: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py:654: in run_until_complete
    return future.result()
C:\ProgramData\anaconda3\envs\python311\Lib\site-packages\aiohttp\web_runner.py:119: in start
    self._server = await loop.create_server(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ProactorEventLoop running=False closed=False debug=False>
protocol_factory = <aiohttp.web_server.Server object at 0x00000240EE86EFD0>
host = '0.0.0.0', port = '10000'

    async def create_server(
            self, protocol_factory, host=None, port=None,
            *,
            family=socket.AF_UNSPEC,
            flags=socket.AI_PASSIVE,
            sock=None,
            backlog=100,
            ssl=None,
            reuse_address=None,
            reuse_port=None,
            ssl_handshake_timeout=None,
            ssl_shutdown_timeout=None,
            start_serving=True):
        """Create a TCP server.
    
        The host parameter can be a string, in that case the TCP server is
        bound to host and port.
    
        The host parameter can also be a sequence of strings and in that case
        the TCP server is bound to all hosts of the sequence. If a host
        appears multiple times (possibly indirectly e.g. when hostnames
        resolve to the same IP address), the server is only bound once to that
        host.
    
        Return a Server object which can be used to stop the service.
    
        This method is a coroutine.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
    
        if ssl_handshake_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')
    
        if ssl_shutdown_timeout is not None and ssl is None:
            raise ValueError(
                'ssl_shutdown_timeout is only meaningful with ssl')
    
        if sock is not None:
            _check_ssl_socket(sock)
    
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')
    
            if reuse_address is None:
                reuse_address = os.name == "posix" and sys.platform != "cygwin"
            sockets = []
            if host == '':
                hosts = [None]
            elif (isinstance(host, str) or
                  not isinstance(host, collections.abc.Iterable)):
                hosts = [host]
            else:
                hosts = host
    
            fs = [self._create_server_getaddrinfo(host, port, family=family,
                                                  flags=flags)
                  for host in hosts]
            infos = await tasks.gather(*fs)
            infos = set(itertools.chain.from_iterable(infos))
    
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        if self._debug:
                            logger.warning('create_server() failed to create '
                                           'socket.socket(%r, %r, %r)',
                                           af, socktype, proto, exc_info=True)
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
                    if reuse_port:
                        _set_reuseport(sock)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if (_HAS_IPv6 and
                            af == socket.AF_INET6 and
                            hasattr(socket, 'IPPROTO_IPV6')):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        msg = ('error while attempting '
                               'to bind on address %r: %s'
                               % (sa, err.strerror.lower()))
                        if err.errno == errno.EADDRNOTAVAIL:
                            # Assume the family is not enabled (bpo-30945)
                            sockets.pop()
                            sock.close()
                            if self._debug:
                                logger.warning(msg)
                            continue
>                       raise OSError(err.errno, msg) from None
E                       OSError: [Errno 10048] error while attempting to bind on address ('0.0.0.0', 10000): Typically each socket address (protocol/network address/port) is only allowed to be used once.

C:\ProgramData\anaconda3\envs\python311\Lib\asyncio\base_events.py:1536: OSError

https://github.com/Vizonex/Winloop#how-to-use-winloop-with-fastapi

it report

C:\ProgramData\anaconda3\envs\python311\python.exe "C:/Program Files/JetBrains/PyCharm Community Edition 2024.1/plugins/python-ce/helpers/pycharm/_jb_pytest_runner.py" --path C:\Users\Administrator\Personal_scripts\pythonProject\test.py 
Testing started at 下午3:26 ...
Launching pytest with arguments C:\Users\Administrator\Personal_scripts\pythonProject\test.py --no-header --no-summary -q in C:\Users\Administrator\Personal_scripts\pythonProject

============================= test session starts =============================
collecting ... collected 2 items

test.py::test_get_request SKIPPED (async def function and no async
plugin installed (see warnings))                                         [ 50%]
Skipped: async def function and no async plugin installed (see warnings)

test.py::test_dynamic_response PASSED                                    [100%]

================== 1 passed, 1 skipped, 4 warnings in 0.39s ===================

@Vizonex
Copy link
Owner

Vizonex commented Jul 6, 2024

@allrobot I added winloop to aiomultiprocess's testsuite as well so that hopefully any future issues can be resolved.

@Vizonex
Copy link
Owner

Vizonex commented Aug 20, 2024

I tested it and it works fine now so I am closing this issue up

@Vizonex Vizonex closed this as completed Aug 20, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants