Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Play with how a contrib module could look like #312

Closed
wants to merge 2 commits into from
Closed

Conversation

FynnBe
Copy link
Member

@FynnBe FynnBe commented Nov 9, 2022

as discussed today with @oeway we could 'compile' operator and workflow functions to use asyncio, etc...

With ast we can also validate (with useful error messages) user code wrt to other criteria, e.g. third party imports.

This example does not substitute an op call with another function atm.. (as discussed) But this is of course also possible.

Note that compiled_ops.py and compiled_wfs.py are 100% generated.
Dependencies for the compile.py script are: Python 3.9 (for ast.unparse and black)

@FynnBe
Copy link
Member Author

FynnBe commented Nov 10, 2022

one note for @oeway :
I think it is difficult to parallelize ops in a workflow in this 'compile' step while ensuring its correctness. Mostly because of in-place manipulation.
However, we can provide example workflows using concurrent.futures.ThreadPoolExecutor and ProcessPoolExecutor and then--if needed--detect and adapt their use in the 'compile' step. This would give the devs some control over parallelization and make it more explicit.
(We should probably provide a convention for number of workers, so we can adapt it to server envs, but this is much easier than guessing were and how to parallelize)

note for @constantinpape (to bring you up to speed, because you are probably wondering what the Halloween this is about):
We are evaluating keeping ops in the Python world (bioimageio.core and bioimageio.contrib) and move the workflow steps as a Python function to contrib as well. Then generating workflow RDFs (wo 'steps') from the docstrings to make the workflow discoverable.
From the previous design including 'steps' @oeway liked the ability to detect which steps can be run in parallel and that steps can potentially be executed in different processes/machines and thsu also different environments.
To keep this ability, while being able to 'simply code a workflow function' I proposed this kind of 'compile' step where we detect use of operator functions and for example wrap them with a bioimageio.core.run_operator call.

The current state of this draft (which should maybe rather live in a new bioimageio.contrib repo...) changes ops to be awaitable and adapts the workflow accordingly. This (and smth like bioimageio.core.run_operator) should enable us to run workflows in a server/hypha/triton environment efficiently while accepting relatively simple workflow functions to contrib.

FAQ ;-):

  1. What if we want to use ops in a workflow that are not conda env compatible?
  • We can allow to wrap them manually with bioimageio.core.run_operator("op id", *args, **kwargs), but most cases could benefit from direct op calls for easier debugging and better IDE support (typing, etc)
    ...

@FynnBe
Copy link
Member Author

FynnBe commented Nov 10, 2022

Dependencies for the compile.py script are: Python 3.9 (for ast.unparse and black)

this could of course simply run in CI or we don't unparse (and black) and simple compile the updated ast. I just thought (for debugging) it's nice to see the changed Python code as code and not just ast.dump printout.

Reasons not to use unparse are noted in the docs: https://docs.python.org/3/library/ast.html#ast.unparse

  • Warning: The produced code string will not necessarily be equal to the original code that generated the ast.AST object (without any compiler optimizations, such as constant tuples/frozensets).
  • Warning: Trying to unparse a highly complex expression would result with RecursionError.

@FynnBe
Copy link
Member Author

FynnBe commented Nov 10, 2022

mixing concurrent.futures and asyncio might be unnececarily complicated..?
It is possible to run cpu bound tasks in different processes while using asyncio with
https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
But to me concurrent.futures seems much easier to work with (also because I have not worked with asyncio at all before), hence the question:

@oeway can we orchastrate the workflow execution using concurrent.futures only instead of asyncio? Or why do we need asyncio?

Considering concurrent.futures ops and workflows could accept (optional) thread/process_executors through which we could control execution!?
If we want to go across different python env processes etc I don't think asyncio nor concurrent.futures are sufficient on their own. That's why in tiktorch @m-novikov wrote an rpc module: https://github.com/ilastik/tiktorch/tree/main/tiktorch/rpc
This is obviously where hypha/imjoy-rpc comes in now. How does this look like in practice? For any non-primitive io we then create a custom codec? Could you draft an example of how we'd have to adapt an 'operator' Python function call to be executed elsewhere via hypha/imjoy-rpc? I am still not clear on the requirements we have for the worklfows and operators.

@oeway
Copy link
Contributor

oeway commented Nov 15, 2022

@FynnBe

mixing concurrent.futures and asyncio might be unnececarily complicated..?
They are two different ways to achieve concurrency.
No need to mixing the two, for our case, we can just go for asyncio.

@oeway can we orchastrate the workflow execution using concurrent.futures only instead of asyncio? Or why do we need asyncio?

Nope, concurrent.futures won't work for the browser where we don't support multi-threading yet, and asyncio is more efficient.

Considering concurrent.futures ops and workflows could accept (optional) thread/process_executors through which we could control execution!?

I think it's much harder than that, different executors may not allow direct data reference, sometimes, it requires them to be pickable for example.

If we want to go across different python env processes etc I don't think asyncio nor concurrent.futures are sufficient on their own.

That's possible if we separate the execution and the workflow orchestration. Let's say we allow distributed workers to run ops, then we should go for asyncio only, since we won't run cpu-bound tasks (so no threading needed, which is perfect for the browser too).

That's why in tiktorch @m-novikov wrote an rpc module: https://github.com/ilastik/tiktorch/tree/main/tiktorch/rpc
This is obviously where hypha/imjoy-rpc comes in now. How does this look like in practice? For any non-primitive io we then create a custom codec? Could you draft an example of how we'd have to adapt an 'operator' Python function call to be executed elsewhere via hypha/imjoy-rpc? I am still not clear on the requirements we have for the worklfows and operators.

Very good point, in fact, you reminded me that the imjoy-rpc is exactly for this purpose. In fact, we even support language-agnostic (to be exact, we have only python and javascript supported so far).

In the imjoy/hypha realm, we group the ops into service, if we think the ops are just a bunch of methods, then the service is basically an instance of a class with properties and member functions.

Let's say, if we have the bioimageio.core contrib ops, we can basically export them as a service in a worker:

import asyncio
from imjoy_rpc.hypha import connect_to_server
from bioimageio.core.contrib import all_ops

async def start_server(server_url):
    server = await connect_to_server({"server_url": server_url})

    def hello(name):
        print("Hello " + name)
        return "Hello " + name

    await server.register_service({
        "name": "BioImageIO Contrib Module",
        "id": "bioimageio-contrib",
        "config": {
            "visibility": "public",
            "run_in_executor": True # This will make sure all the sync functions run in a separate thread
        },
        "hello": hello,
        "ops": all_ops
    })

    print(f"hello world service regisered at workspace: {server.config.workspace}")
    print(f"Test it with the http proxy: {server_url}/{server.config.workspace}/services/bioimageio-contrib/hello?name=John")

if __name__ == "__main__":
    server_url = "http://localhost:9000"
    loop = asyncio.get_event_loop()
    loop.create_task(start_server(server_url))
    loop.run_forever()

Now to create a workflow with the contrib ops, we can run it like this:

import asyncio
from imjoy_rpc.hypha import connect_to_server

async def main():
    server = await connect_to_server({"server_url":  "http://localhost:9000"})
    # get an existing service
    # since bioimageio-contrib is registered as a public service, we can access it with only the name "bioimageio-contrib"
    contrib = await server.get_service("bioimageio-contrib")
    ret = await contrib.hello("John")
    print(ret)
    
    # sequential execution
    result1 = await contrib.ops.run_model("affable-shark-1", ...)
    result2 = await contrib.ops.run_model("affable-shark-2", ...)

    # parallel execution
    p1 = contrib.ops.run_model("affable-shark-1", ...)
    p2 = contrib.ops.run_model("affable-shark-2", ...)
    result1, result2 = await asyncio.gather(p1, p2)

    # Interact with the BioEngine [working code]
    triton = await server.get_service("triton-client")
    results = await triton.execute(inputs=[image, {'diameter': 30}], model_name='cellpose-python', decode_json=True)
    mask = results['mask'][0]
    ...

asyncio.run(main())

Here you can find more details for trying the hypha server: https://ha.amun.ai/ and live demos: https://slides.imjoy.io/?slides=https://raw.githubusercontent.com/oeway/slides/master/2022/i2k-2022-hypha-introduction.md#/7

For custom io, here you can find an example on how we can encode the itkImage for imjoy-rpc: https://github.com/InsightSoftwareConsortium/itkwidgets/blob/main/itkwidgets/imjoy.py#L48-L49

@oeway
Copy link
Contributor

oeway commented Nov 15, 2022

FYI: If we going for the route of hypha/imjoy-rpc, we don't need much effort to develop it, since we have it already supported in the bioimage.io website and also the BioEngine ;)

We need however, to standardize it and create documentation for it.

@FynnBe
Copy link
Member Author

FynnBe commented Nov 15, 2022

great example snippets to play with :-)

@FynnBe
Copy link
Member Author

FynnBe commented Nov 16, 2022

when trying to start a serve I get this error:
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-1' coro=<start_server() done, defined at C:\repos\bioimage-io\python-bioimage-io\bioimageio\core\contrib\utils\_server.py:8> exception=Exception("Failed to connect to ws://localhost:9000/ws: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 9000, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 9000)")>
Traceback (most recent call last):
  File "C:\conda\envs\selfseg\lib\site-packages\imjoy_rpc\hypha\websocket_client.py", line 72, in open
    self._websocket = await asyncio.wait_for(
  File "C:\conda\envs\selfseg\lib\asyncio\tasks.py", line 494, in wait_for
    return fut.result()
  File "C:\conda\envs\selfseg\lib\asyncio\tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "C:\conda\envs\selfseg\lib\site-packages\websockets\legacy\client.py", line 659, in __await_impl_timeout__
    return await asyncio.wait_for(self.__await_impl__(), self.open_timeout)
  File "C:\conda\envs\selfseg\lib\asyncio\tasks.py", line 494, in wait_for
    return fut.result()
  File "C:\conda\envs\selfseg\lib\site-packages\websockets\legacy\client.py", line 663, in __await_impl__
    _transport, _protocol = await self._create_connection()
  File "C:\conda\envs\selfseg\lib\asyncio\base_events.py", line 1033, in create_connection
    raise OSError('Multiple exceptions: {}'.format(
OSError: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 9000, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 9000)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\repos\bioimage-io\python-bioimage-io\bioimageio\core\contrib\utils\_server.py", line 9, in start_server
    server = await connect_to_server({"server_url": server_url})
  File "C:\conda\envs\selfseg\lib\site-packages\imjoy_rpc\hypha\websocket_client.py", line 149, in connect_to_server
    await connection.open()
  File "C:\conda\envs\selfseg\lib\site-packages\imjoy_rpc\hypha\websocket_client.py", line 82, in open
    raise Exception(
Exception: Failed to connect to ws://localhost:9000/ws: Multiple exceptions: [Errno 10061] Connect call failed ('::1', 9000, 0, 0), [Errno 10061] Connect call failed ('127.0.0.1', 9000)
Any idea what's wrong?

@FynnBe
Copy link
Member Author

FynnBe commented Nov 16, 2022

async def start_server(server_url):
    server = await connect_to_server({"server_url": server_url})

is start_server not starting a server, but only starting the service? how do I start the server?

@FynnBe
Copy link
Member Author

FynnBe commented Nov 16, 2022

async def start_server(server_url):
    server = await connect_to_server({"server_url": server_url})

is start_server not starting a server, but only starting the service? how do I start the server?

I guess I found how: https://github.com/imjoy-team/hypha#usage

@FynnBe
Copy link
Member Author

FynnBe commented Nov 16, 2022

I guess I found how: https://github.com/imjoy-team/hypha#usage

with the hypha server running I now get this error when trying to connect
python -m hypha.server --host=0.0.0.0 --port=9000 --enable-server-apps
WARNING:auth:JWT_SECRET is not defined
INFO:     Started server process [260]
INFO:     Waiting for application startup.
INFO:workspace:New client registered: public/workspace-manager
INFO:workspace:Creating RPC for client public/workspace-manager
INFO:workspace:Client workspace-manager updated (services: ['workspace-manager:built-in', 'workspace-manager:default'])
INFO:workspace:Creating RPC for client public/workspace-manager-52PmPdHokskpL9LqaTbdiv
INFO:workspace:Reusing existing workspace-manager client: workspace-manager
INFO:workspace:Registering service server-apps to public
INFO:workspace:Client workspace-manager updated (services: ['workspace-manager:built-in', 'workspace-manager:default', 'workspace-manager:server-apps'])
INFO:workspace:Registering service browser-runner-5uqS2hNoNwDxjoVPyLECND to public
INFO:workspace:Client workspace-manager updated (services: ['workspace-manager:built-in', 'workspace-manager:default', 'workspace-manager:server-apps', 'workspace-manager:browser-runner-5uqS2hNoNwDxjoVPyLECND'])
INFO:workspace:Registering service browser-runner-cgZvwfqz7z2jBuxWTGsvHU to public
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9000 (Press CTRL+C to quit)
INFO:workspace:Client workspace-manager updated (services: ['workspace-manager:built-in', 'workspace-manager:default', 'workspace-manager:server-apps', 'workspace-manager:browser-runner-5uqS2hNoNwDxjoVPyLECND', 'workspace-manager:browser-runner-cgZvwfqz7z2jBuxWTGsvHU'])
INFO:     127.0.0.1:54423 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:54423 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO:websocket-server:Anonymized User connected: 5g4R6QkbPDvZAiv2qUZTHZ
INFO:workspace:New client registered: 5g4R6QkbPDvZAiv2qUZTHZ/workspace-manager
INFO:workspace:Creating RPC for client 5g4R6QkbPDvZAiv2qUZTHZ/workspace-manager
INFO:workspace:Client workspace-manager updated (services: ['workspace-manager:built-in', 'workspace-manager:default'])
INFO:workspace:Creating RPC for client 5g4R6QkbPDvZAiv2qUZTHZ/workspace-manager-HQkEiS7ivbP7ry3hnUyk5j
INFO:workspace:Reusing existing workspace-manager client: workspace-manager
INFO:     ('127.0.0.1', 54468) - "WebSocket /ws?client_id=L5BQXkcpv2QeeXk7AXvcYo" [accepted]
INFO:     connection closed
INFO:workspace:New client registered: 5g4R6QkbPDvZAiv2qUZTHZ/L5BQXkcpv2QeeXk7AXvcYo
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 230, in run_asgi
    result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/middleware/errors.py", line 149, in __call__
    await self.app(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/middleware/cors.py", line 76, in __call__
    await self.app(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/hypha/utils.py", line 341, in __call__
    await self.app(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/routing.py", line 706, in __call__
    await route.handle(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/routing.py", line 341, in handle
    await self.app(scope, receive, send)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/routing.py", line 82, in app
    await func(session)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/fastapi/routing.py", line 287, in app
    await dependant.call(**values)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/hypha/websocket.py", line 182, in websocket_endpoint
    data = await websocket.receive_bytes()
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/websockets.py", line 121, in receive_bytes
    message = await self.receive()
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/starlette/websockets.py", line 45, in receive
    message = await self._receive()
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/uvicorn/protocols/websockets/websockets_impl.py", line 341, in asgi_receive
    data = await self.recv()
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/websockets/legacy/protocol.py", line 551, in recv
    await asyncio.wait(
  File "/home/fynn/conda/envs/hypha/lib/python3.8/asyncio/tasks.py", line 424, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/home/fynn/conda/envs/hypha/lib/python3.8/asyncio/tasks.py", line 424, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/home/fynn/conda/envs/hypha/lib/python3.8/asyncio/tasks.py", line 684, in ensure_future
    raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
TypeError: An asyncio.Future, a coroutine or an awaitable is required

nvm: you just forgot an async in your example -> async def hello(...

@oeway
Copy link
Contributor

oeway commented Nov 16, 2022

@FynnBe I just tried it with a new conda env and I cannot reproduce the issue, it all works for me. Could you please try it again by following the example here: https://ha.amun.ai/ (Note: You don't need the server-apps for now.)

conda create -n hypha python=3.8
conda activate hypha
pip install hypha

# Start the server
python3 -m hypha.server --host=0.0.0.0 --port=9000

Alternatively, you can use our public server by replacing "http://localhost:9000" into "https://ai.imjoy.io" (note: it's https), then you don't need to start your own server, but only the worker and the client.

Once you have the server running or changed the url to the public server, in another terminal, start the worker:

python3 start-worker.py

Now the workflow client:

python3 run-workflow.py

nvm: you just forgot an async in your example -> async def hello(...

Nope, you don't have to define function as async, it will convert it automatically. But when you call the function, you need to treat it as async function since it's a remote function, always.

@FynnBe
Copy link
Member Author

FynnBe commented Nov 16, 2022

Thanks @oeway !

Nope, you don't have to define function as async, it will convert it automatically. But when you call the function, you need to treat it as async function since it's a remote function, always.

Interesting, I did remove the async now and it did indeed work. Not sure what happened earlier. I'll report if it happens again!

@FynnBe
Copy link
Member Author

FynnBe commented Nov 16, 2022

I've been trying to transfer dask arrays/zarr arrays, so I wanted to add register_default_codecs() to your above example, but I get:

  File "bioimageio/core/contrib/client.py", line 50, in <module>
    register_default_codecs()
  File "C:\conda\envs\bio-core-rpc\lib\site-packages\imjoy_rpc\utils.py", line 347, in register_default_codecs
    api.registerCodec(
  File "C:\conda\envs\bio-core-rpc\lib\site-packages\imjoy_rpc\werkzeug\local.py", line 349, in __getattr__
    return getattr(self._get_current_object(), name)
  File "C:\conda\envs\bio-core-rpc\lib\site-packages\imjoy_rpc\__init__.py", line 48, in __getattr__
    return _rpc_context.api[attr]
KeyError: 'registerCodec'

Any idea what's going wrong?

@oeway
Copy link
Contributor

oeway commented Nov 16, 2022

I guess that's a bug, because we changed the function name to register_codec() and forgot to update the utils function for hypha. I think you can just do something like:

'''
api.register_codec(
{"name": "zarr-array", "type": zarr.Array, "encoder": encode_zarr_store}
)

'''

And you can find the encode_zarr_store function here: https://github.com/imjoy-team/imjoy-rpc/blob/645a7f1450fe3af65cd9746c6a2d08ed05d2bf83/python/imjoy_rpc/hypha/utils.py#L238

@FynnBe
Copy link
Member Author

FynnBe commented Nov 17, 2022

still doesn't work for me though...

api.register_codec(

>>> from imjoy_rpc import api
>>> api.register_codec()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/imjoy_rpc/werkzeug/local.py", line 349, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/home/fynn/conda/envs/hypha/lib/python3.8/site-packages/imjoy_rpc/__init__.py", line 48, in __getattr__
    return _rpc_context.api[attr]
KeyError: 'register_codec'
conda list
# packages in environment at /home/fynn/conda/envs/hypha:
#
# Name                    Version                   Build  Channel
_libgcc_mutex             0.1                 conda_forge    conda-forge
_openmp_mutex             4.5                       2_gnu    conda-forge
aiobotocore               2.4.0                    pypi_0    pypi
aiofiles                  22.1.0                   pypi_0    pypi
aiohttp                   3.8.3                    pypi_0    pypi
aioitertools              0.11.0                   pypi_0    pypi
aioredis                  2.0.1                    pypi_0    pypi
aiosignal                 1.3.1                    pypi_0    pypi
anyio                     3.6.2                    pypi_0    pypi
async-timeout             4.0.2                    pypi_0    pypi
attrs                     22.1.0                   pypi_0    pypi
base58                    2.1.1                    pypi_0    pypi
botocore                  1.27.59                  pypi_0    pypi
bzip2                     1.0.8                h7f98852_4    conda-forge
ca-certificates           2022.9.24            ha878542_0    conda-forge
certifi                   2022.9.24                pypi_0    pypi
charset-normalizer        2.1.1                    pypi_0    pypi
click                     8.1.3                    pypi_0    pypi
deprecated                1.2.13                   pypi_0    pypi
dnspython                 2.2.1                    pypi_0    pypi
ecdsa                     0.18.0                   pypi_0    pypi
email-validator           1.3.0                    pypi_0    pypi
fastapi                   0.87.0                   pypi_0    pypi
frozenlist                1.3.3                    pypi_0    pypi
greenlet                  1.1.3                    pypi_0    pypi
h11                       0.12.0                   pypi_0    pypi
httpcore                  0.15.0                   pypi_0    pypi
httpx                     0.23.0                   pypi_0    pypi
hypha                     0.15.8                   pypi_0    pypi
idna                      3.4                      pypi_0    pypi
imjoy-rpc                 0.5.15                   pypi_0    pypi
jinja2                    3.1.2                    pypi_0    pypi
jmespath                  1.0.1                    pypi_0    pypi
ld_impl_linux-64          2.39                 hc81fddc_0    conda-forge
libffi                    3.4.2                h7f98852_5    conda-forge
libgcc-ng                 12.2.0              h65d4601_19    conda-forge
libgomp                   12.2.0              h65d4601_19    conda-forge
libnsl                    2.0.0                h7f98852_0    conda-forge
libsqlite                 3.39.4               h753d276_0    conda-forge
libuuid                   2.32.1            h7f98852_1000    conda-forge
libzlib                   1.2.13               h166bdaf_4    conda-forge
lxml                      4.9.1                    pypi_0    pypi
markupsafe                2.1.1                    pypi_0    pypi
msgpack                   1.0.4                    pypi_0    pypi
multidict                 6.0.2                    pypi_0    pypi
ncurses                   6.3                  h27087fc_1    conda-forge
numpy                     1.23.4                   pypi_0    pypi
openssl                   3.0.7                h166bdaf_0    conda-forge
packaging                 21.3                     pypi_0    pypi
pip                       22.3.1             pyhd8ed1ab_0    conda-forge
playwright                1.27.1                   pypi_0    pypi
psutil                    5.9.4                    pypi_0    pypi
pyasn1                    0.4.8                    pypi_0    pypi
pydantic                  1.10.2                   pypi_0    pypi
pyee                      8.1.0                    pypi_0    pypi
pymultihash               0.8.2                    pypi_0    pypi
pyotritonclient           0.2.4                    pypi_0    pypi
pyparsing                 3.0.9                    pypi_0    pypi
python                    3.8.13          ha86cf86_0_cpython    conda-forge
python-dateutil           2.8.2                    pypi_0    pypi
python-dotenv             0.21.0                   pypi_0    pypi
python-engineio           4.1.0                    pypi_0    pypi
python-jose               3.3.0                    pypi_0    pypi
python-rapidjson          1.9                      pypi_0    pypi
pyyaml                    6.0                      pypi_0    pypi
readline                  8.1.2                h0f457ee_0    conda-forge
redis                     4.3.4                    pypi_0    pypi
redislite                 6.2.805324               pypi_0    pypi
requests                  2.28.1                   pypi_0    pypi
rfc3986                   1.5.0                    pypi_0    pypi
rsa                       4.9                      pypi_0    pypi
setuptools                65.5.1             pyhd8ed1ab_0    conda-forge
shortuuid                 1.0.11                   pypi_0    pypi
six                       1.16.0                   pypi_0    pypi
sniffio                   1.3.0                    pypi_0    pypi
sqlite                    3.39.4               h4ff8645_0    conda-forge
starlette                 0.21.0                   pypi_0    pypi
tk                        8.6.12               h27826a3_0    conda-forge
typing-extensions         4.4.0                    pypi_0    pypi
urllib3                   1.26.12                  pypi_0    pypi
uvicorn                   0.19.0                   pypi_0    pypi
websockets                10.4                     pypi_0    pypi
wheel                     0.38.4             pyhd8ed1ab_0    conda-forge
wrapt                     1.14.1                   pypi_0    pypi
xz                        5.2.6                h166bdaf_0    conda-forge
yarl                      1.8.1                    pypi_0    pypi
(another) conda list
# packages in environment at C:\conda\envs\bio-core-rpc:
#
# Name                    Version                   Build  Channel
asciitree                 0.3.3                      py_2    conda-forge
attrs                     22.1.0             pyh71513ae_1    conda-forge
beautifulsoup4            4.11.1             pyha770c72_0    conda-forge
bioimageio-core           0.5.7                    pypi_0    pypi
bioimageio-spec           0.4.8.post1              pypi_0    pypi
black                     22.10.0          py37h03978a9_1    conda-forge
brotlipy                  0.7.0           py37hcc03f2d_1004    conda-forge
bzip2                     1.0.8                h8ffe710_4    conda-forge
ca-certificates           2022.9.24            h5b45459_0    conda-forge
certifi                   2022.9.24          pyhd8ed1ab_0    conda-forge
cffi                      1.15.1           py37ha95fbe2_1    conda-forge
chardet                   5.0.0            py37h03978a9_0    conda-forge
charset-normalizer        2.1.1              pyhd8ed1ab_0    conda-forge
click                     8.1.3            py37h03978a9_0    conda-forge
colorama                  0.4.6              pyhd8ed1ab_0    conda-forge
commonmark                0.9.1                      py_0    conda-forge
conda                     22.9.0           py37h03978a9_1    conda-forge
conda-build               3.22.0           py37h03978a9_2    conda-forge
conda-package-handling    1.9.0            py37hf957a29_0    conda-forge
cryptography              38.0.2           py37h953a470_1    conda-forge
dataclasses               0.8                pyhc8e2a94_3    conda-forge
entrypoints               0.4                pyhd8ed1ab_0    conda-forge
exceptiongroup            1.0.4              pyhd8ed1ab_0    conda-forge
fasteners                 0.17.3             pyhd8ed1ab_0    conda-forge
filelock                  3.8.0              pyhd8ed1ab_0    conda-forge
freetype                  2.12.1               h546665d_0    conda-forge
future                    0.18.2           py37h03978a9_5    conda-forge
glob2                     0.7                        py_0    conda-forge
h5py                      2.10.0          nompi_py37he280515_106    conda-forge
hdf5                      1.10.6          nompi_he0bbb20_101    conda-forge
idna                      3.4                pyhd8ed1ab_0    conda-forge
imageio                   2.22.0             pyhfa7a67d_0    conda-forge
imjoy-rpc                 0.5.16                   pypi_0    pypi
importlib-metadata        4.11.4           py37h03978a9_0    conda-forge
importlib_metadata        4.11.4               hd8ed1ab_0    conda-forge
iniconfig                 1.1.1              pyh9f0ad1d_0    conda-forge
intel-openmp              2022.1.0          h57928b3_3787    conda-forge
jinja2                    3.1.2              pyhd8ed1ab_1    conda-forge
jpeg                      9e                   h8ffe710_2    conda-forge
lcms2                     2.14                 h90d422f_0    conda-forge
lerc                      4.0.0                h63175ca_0    conda-forge
libarchive                3.6.1                h27c7867_0    conda-forge
libblas                   3.9.0              16_win64_mkl    conda-forge
libcblas                  3.9.0              16_win64_mkl    conda-forge
libdeflate                1.14                 hcfcfb64_0    conda-forge
libiconv                  1.17                 h8ffe710_0    conda-forge
liblapack                 3.9.0              16_win64_mkl    conda-forge
liblief                   0.12.2               h63175ca_0    conda-forge
libpng                    1.6.38               h19919ed_0    conda-forge
libsqlite                 3.39.4               hcfcfb64_0    conda-forge
libtiff                   4.4.0                h8e97e67_4    conda-forge
libwebp-base              1.2.4                h8ffe710_0    conda-forge
libxcb                    1.13              hcd874cb_1004    conda-forge
libxml2                   2.10.3               hc3477c8_0    conda-forge
libzlib                   1.2.13               hcfcfb64_4    conda-forge
lz4-c                     1.9.3                h8ffe710_1    conda-forge
lzo                       2.10              he774522_1000    conda-forge
m2-msys2-runtime          2.5.0.17080.65c939c               3    conda-forge
m2-patch                  2.7.5                         2    conda-forge
m2w64-gcc-libgfortran     5.3.0                         6    conda-forge
m2w64-gcc-libs            5.3.0                         7    conda-forge
m2w64-gcc-libs-core       5.3.0                         7    conda-forge
m2w64-gmp                 6.1.0                         2    conda-forge
m2w64-libwinpthread-git   5.0.0.4634.697f757               2    conda-forge
markupsafe                2.1.1            py37hcc03f2d_1    conda-forge
marshmallow               3.19.0             pyhd8ed1ab_0    conda-forge
marshmallow-jsonschema    0.13.0             pyhd8ed1ab_0    conda-forge
marshmallow-union         0.1.15.post1       pyhd8ed1ab_0    conda-forge
menuinst                  1.4.19           py37h03978a9_0    conda-forge
mkl                       2022.1.0           h6a75c08_874    conda-forge
msgpack-python            1.0.4            py37h8c56517_0    conda-forge
msys2-conda-epoch         20160418                      1    conda-forge
mypy                      0.982            py37h51bd9d9_0    conda-forge
mypy_extensions           0.4.3            py37h03978a9_5    conda-forge
numcodecs                 0.10.2           py37hf2a7229_0    conda-forge
numpy                     1.21.6           py37h2830a78_0    conda-forge
openjpeg                  2.5.0                hc9384bd_1    conda-forge
openssl                   3.0.7                hcfcfb64_0    conda-forge
packaging                 21.3               pyhd8ed1ab_0    conda-forge
pandas                    1.3.5            py37h9386db6_0    conda-forge
pathspec                  0.10.2             pyhd8ed1ab_0    conda-forge
pillow                    9.2.0            py37h42a8222_2    conda-forge
pip                       22.3.1             pyhd8ed1ab_0    conda-forge
pkginfo                   1.8.3              pyhd8ed1ab_0    conda-forge
platformdirs              2.5.2              pyhd8ed1ab_1    conda-forge
pluggy                    1.0.0            py37h03978a9_3    conda-forge
psutil                    5.9.3            py37h51bd9d9_0    conda-forge
pthread-stubs             0.4               hcd874cb_1001    conda-forge
py-lief                   0.12.2           py37h7f67f24_0    conda-forge
pycosat                   0.6.4            py37h51bd9d9_0    conda-forge
pycparser                 2.21               pyhd8ed1ab_0    conda-forge
pygments                  2.13.0             pyhd8ed1ab_0    conda-forge
pyopenssl                 22.1.0             pyhd8ed1ab_0    conda-forge
pyparsing                 3.0.9              pyhd8ed1ab_0    conda-forge
pyreadline                2.1             py37h03978a9_1006    conda-forge
pysocks                   1.7.1            py37h03978a9_5    conda-forge
pytest                    7.2.0            py37h03978a9_0    conda-forge
python                    3.7.12          h900ac77_100_cpython    conda-forge
python-dateutil           2.8.2              pyhd8ed1ab_0    conda-forge
python-libarchive-c       4.0              py37h03978a9_1    conda-forge
python-stdnum             1.17               pyhd8ed1ab_0    conda-forge
python_abi                3.7                     2_cp37m    conda-forge
pytz                      2022.6             pyhd8ed1ab_0    conda-forge
pyyaml                    6.0              py37hcc03f2d_4    conda-forge
requests                  2.28.1             pyhd8ed1ab_1    conda-forge
rich                      12.6.0             pyhd8ed1ab_0    conda-forge
ripgrep                   13.0.0               h7f3b576_2    conda-forge
ruamel.yaml               0.17.21          py37hcc03f2d_1    conda-forge
ruamel.yaml.clib          0.2.6            py37hcc03f2d_1    conda-forge
ruamel_yaml               0.15.80         py37hcc03f2d_1007    conda-forge
setuptools                59.8.0           py37h03978a9_1    conda-forge
shellingham               1.5.0              pyhd8ed1ab_0    conda-forge
shortuuid                 1.0.11                   pypi_0    pypi
six                       1.16.0             pyh6c4a22f_0    conda-forge
soupsieve                 2.3.2.post1        pyhd8ed1ab_0    conda-forge
sqlite                    3.39.4               hcfcfb64_0    conda-forge
tbb                       2021.6.0             h91493d7_1    conda-forge
tifffile                  2021.11.2                pypi_0    pypi
tk                        8.6.12               h8ffe710_0    conda-forge
toml                      0.10.2             pyhd8ed1ab_0    conda-forge
tomli                     2.0.1              pyhd8ed1ab_0    conda-forge
toolz                     0.12.0             pyhd8ed1ab_0    conda-forge
tqdm                      4.64.1             pyhd8ed1ab_0    conda-forge
typed-ast                 1.5.4            py37hcc03f2d_0    conda-forge
typer                     0.7.0              pyhd8ed1ab_0    conda-forge
typing-extensions         4.4.0                hd8ed1ab_0    conda-forge
typing_extensions         4.4.0              pyha770c72_0    conda-forge
ucrt                      10.0.22621.0         h57928b3_0    conda-forge
urllib3                   1.26.11            pyhd8ed1ab_0    conda-forge
vc                        14.3                 h3d8a991_9    conda-forge
vs2015_runtime            14.32.31332          h1d6e394_9    conda-forge
websockets                10.4                     pypi_0    pypi
wheel                     0.38.4             pyhd8ed1ab_0    conda-forge
win_inet_pton             1.1.0            py37h03978a9_4    conda-forge
xarray                    0.20.2             pyhd8ed1ab_0    conda-forge
xorg-libxau               1.0.9                hcd874cb_0    conda-forge
xorg-libxdmcp             1.1.3                hcd874cb_0    conda-forge
xz                        5.2.6                h8d14728_0    conda-forge
yaml                      0.2.5                h8ffe710_2    conda-forge
zarr                      2.13.3             pyhd8ed1ab_0    conda-forge
zipp                      3.10.0             pyhd8ed1ab_0    conda-forge
zlib                      1.2.13               hcfcfb64_4    conda-forge
zstd                      1.5.2                h7755175_4    conda-forge

@FynnBe FynnBe mentioned this pull request Nov 17, 2022
@oeway
Copy link
Contributor

oeway commented Nov 17, 2022

Hi, sorry for the confusion, we are in the process of migrating from imjoy-rpc to hypha, so you should import stuff from imjoy_rpc.hypha. And here the usage of the register_codecs function has changed too, you can only call than when you connected to the server.

    server = await connect_to_server({"server_url": server_url})

    # Register the codecs
    server.register_codec(
        {"name": "zarr-group", "type": zarr.Group, "encoder": encode_zarr_store}
    )

    z = zarr.array(np.arange(100))
  
    # Use the echo function to do a round-trip with the zarr object
    # Note: Since we didn't create a decoder, so we won't get the zarr object, but a zarr store interface
    z2 = await server.echo(z)
    print(z2)

For more detailed usage for the register_codecs, see here: https://github.com/imjoy-team/imjoy-rpc#encoding-and-decoding-custom-objects

Here is a complete implementation:

Worker with zarr codecs
import asyncio
from imjoy_rpc.hypha import connect_to_server
# from bioimageio.core.contrib import all_ops
import zarr
import numpy as np

def encode_zarr_store(zobj):
    """Encode the zarr store."""
    import zarr

    path_prefix = f"{zobj.path}/" if zobj.path else ""

    def getItem(key, options=None):
        return zobj.store[path_prefix + key]

    def setItem(key, value):
        zobj.store[path_prefix + key] = value

    def containsItem(key, options=None):
        if path_prefix + key in zobj.store:
            return True

    return {
        "_rintf": True,
        "_rtype": "zarr-array" if isinstance(zobj, zarr.Array) else "zarr-group",
        "getItem": getItem,
        "setItem": setItem,
        "containsItem": containsItem,
    }


def register_codecs(server):
    """Register default codecs."""
    server.register_codec(
        {"name": "zarr-array", "type": zarr.Array, "encoder": encode_zarr_store}
    )

    server.register_codec(
        {"name": "zarr-group", "type": zarr.Group, "encoder": encode_zarr_store}
    )

async def start_server(server_url):
    server = await connect_to_server({"server_url": server_url})

    # Register the codecs
    register_codecs(server)
    z = zarr.array(np.arange(100))
    
    # Use the echo function to do a round-trip with the zarr object
    # Note: Since we didn't create a decoder, so we won't get the zarr object, but a zarr store interface
    z2 = await server.echo(z)
    print(z2)
    
    def hello(name):
        print("Hello " + name)
        return "Hello " + name

    await server.register_service({
        "name": "BioImageIO Contrib Module",
        "id": "bioimageio-contrib",
        "config": {
            "visibility": "public",
            "run_in_executor": True # This will make sure all the sync functions run in a separate thread
        },
        "hello": hello,
        # "ops": all_ops
    })

    print(f"hello world service regisered at workspace: {server.config.workspace}")
    print(f"Test it with the http proxy: {server_url}/{server.config.workspace}/services/bioimageio-contrib/hello?name=John")

if __name__ == "__main__":
    server_url = "https://ai.imjoy.io"
    loop = asyncio.get_event_loop()
    loop.create_task(start_server(server_url))
    loop.run_forever()

@oeway
Copy link
Contributor

oeway commented Nov 17, 2022

FYI: I updated the imjoy-rpc docs for the new version (now called v2) here: https://github.com/imjoy-team/imjoy-rpc/blob/master/imjoy-rpc-v2.md

@FynnBe
Copy link
Member Author

FynnBe commented Nov 18, 2022

@oeway How about decoding?
In particular for dask arrays this would be interesting. Although solving it for zarr would probably suffice, because of
https://docs.dask.org/en/stable/generated/dask.array.to_zarr.html
and https://docs.dask.org/en/stable/generated/dask.array.from_zarr.html

Ultimately in this context I'm interested in xarray.DataArray (has either numpy.ndarray or dask.array.DataArray as data storage).
So we could also use a DuckArray: https://docs.xarray.dev/en/stable/internals/duck-arrays-integration.html
The most promising approach I currently see is with https://docs.dask.org/en/stable/custom-collections.html

The most elegant would probably to go through fsspec, which both dask and zarr are compatible with. They also support an https interface. https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
I just haven't dug deep enough in that direction...

What would your approach to encoding dask/zarr be (and why is it not (yet?) included in the examples 😇 )?

@FynnBe
Copy link
Member Author

FynnBe commented Nov 18, 2022

This dummy encoding/decoding for zarr works, but is not asynchronous...
import asyncio
from collections.abc import MutableMapping
from typing import Literal, TypedDict

import numpy as np
import zarr


class IZarr(TypedDict):
    _rintf: bool
    _rtype: Literal["zarr-array", "zarr_group"]
    getItem: callable
    setItem: callable
    containsItem: callable


def encode_zarr_store(zobj) -> IZarr:
    """Encode the zarr store."""

    path_prefix = f"{zobj.path}/" if zobj.path else ""

    def getItem(key, options=None):
        return zobj.store[path_prefix + key]

    def setItem(key, value):
        zobj.store[path_prefix + key] = value

    def containsItem(key, options=None):
        return path_prefix + key in zobj.store

    return dict(
        _rintf=True,
        _rtype="zarr-array" if isinstance(zobj, zarr.Array) else "zarr-group",
        getItem=getItem,
        setItem=setItem,
        containsItem=containsItem,
    )


class RPC_Store(MutableMapping):
    def __init__(self, get_item, set_item, contains_item):  # , fill_value, nchunks: int):
        super().__init__()
        self.get_item = get_item
        self.set_item = set_item
        self.contains = contains_item

    def __getitem__(self, key):
        return self.get_item(key)

    def __setitem__(self, key, value):
        self.set_item(key, value)

    def __contains__(self, key):
        return self.contains(key)

    def __delitem__(self, key):
        raise NotImplementedError
        self[key] = self.fill_value

    def __iter__(self):
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError
        return self.nchunks


def decode_zarr_store(iz: IZarr):
    return zarr.convenience.open(
        RPC_Store(
            get_item=iz["getItem"],
            set_item=iz["setItem"],
            contains_item=iz["containsItem"],
            # fill_value=iz["fill_value"],
            # nchunks=iz["nchunks"],
        )
    )


if __name__ == "__main__":
    z = zarr.array(np.arange(100))
    e = encode_zarr_store(z)
    z2 = decode_zarr_store(e)
    print(z2.shape)
    print(z2[:10])
    print(z2[10])

with async def getItem, etc. it does not work as zarr.Array is not asynchronous...

for now I'll hack it to run synchronously, blocking the event loop. But this could be optimized.
These are interesting:
zarr-developers/zarr-python#536
https://github.com/martindurant/async-zarr

@FynnBe
Copy link
Member Author

FynnBe commented Nov 18, 2022

for now I'll hack it to run synchronously, blocking the event loop

While I found working options for calling async functions synchronously (e.g. unsync), these don't work nested, i.e. within another async function, which we would need here...
This is getting too hacky; there gotta be a cleaner way!

@oeway
Copy link
Contributor

oeway commented Nov 20, 2022

For decoding, we need to convert the async functions into sync so Zarr can work with it. This can be achieved by using threading(something similar has been implemented in fsspec too). Or, we can potentially use azarr, see martindurant/async-zarr#1

@oeway
Copy link
Contributor

oeway commented Nov 23, 2022

Hi @FynnBe If you want to just encode/decode xarray, you can already do it without any async conversion. async conversion is only needed when we want to do lazy encoding (meaning the encoded object contains functions).

Here you can find a working example for encoding/decoding xarray:

import asyncio
from imjoy_rpc.hypha import connect_to_server
import xarray as xr
import numpy as np

def encode_xarray(obj):
    """Encode the zarr store."""
    assert isinstance(obj, xr.DataArray)
    return {
        "_rintf": True,
        "_rtype": "xarray",
        "data": obj.data,
        "dims": obj.dims,
        "attrs": obj.attrs,
        "name": obj.name,
    }

def decode_xarray(obj):
    assert obj["_rtype"] == "xarray"
    return xr.DataArray(
                data=obj["data"],
                dims=obj["dims"],
                attrs=obj.get("attrs", {}),
                name=obj.get("name", None),
        )


async def start_server(server_url):
    server = await connect_to_server({"server_url": server_url})

    # Register the codecs
    server.register_codec(
        {"name": "xarray", "type": xr.DataArray, "encoder": encode_xarray, "decoder": decode_xarray}
    )
    
    z = xr.DataArray(data=np.arange(100), dims=["x"], attrs={"test": "test"}, name="mydata")

    # Use the echo function to do a round-trip with the xarray object
    z2 = await server.echo(z)

    assert isinstance(z2, xr.DataArray)
    assert z2.attrs["test"] == "test"
    assert z2.dims == ("x",)
    assert z2.data[0] == 0
    assert z2.data[99] == 99
    assert z2.name == "mydata"
    print("Success!")

if __name__ == "__main__":
    server_url = "https://ai.imjoy.io"
    loop = asyncio.get_event_loop()
    loop.create_task(start_server(server_url))
    loop.run_forever()

Also see here: https://github.com/imjoy-team/imjoy-rpc/blob/master/imjoy-rpc-v2.md#example-1-encode-and-decode-xarray

@FynnBe
Copy link
Member Author

FynnBe commented Nov 24, 2022

Hi @FynnBe If you want to just encode/decode xarray, you can already do it without any async conversion. async conversion is only needed when we want to do lazy encoding (meaning the encoded object contains functions).

I saw that, but I would like to decode lazy xarrays with a dask array as their data property...
see superseding #316

@FynnBe
Copy link
Member Author

FynnBe commented Feb 13, 2023

closed in favor of new repo: https://github.com/bioimage-io/workflows-bioimage-io-python

@FynnBe FynnBe closed this Feb 13, 2023
@FynnBe FynnBe mentioned this pull request Feb 13, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants