-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mutation example #2
Comments
I'll try to add an example in a day or two, or tomorrow if I have time. In the mean time, take a look at ProjectChesire's code that I based this on. The |
Thanks, especially to see how to indicate the mutation's resolvers, and encapsulate input parameters in objects. |
I have subscriptions working on mine, but I'm using aiohttp and rabbitmq as my pubsub so I'm not sure how reusable it would be for people... from graphene.types.objecttype import ObjectType, ObjectTypeOptions
from collections import OrderedDict
from graphene.types import Field as SubField
from graphene.types.utils import yank_fields_from_attrs
from graphene.utils.props import props
from six import get_unbound_function
class SubscriptionOptions(ObjectTypeOptions):
arguments = None
output = None
resolver = None
class SubscriptionType(ObjectType):
@classmethod
def __init_subclass_with_meta__(
cls, resolver=None, output=None, arguments=None, _meta=None, **options
):
if not _meta:
_meta = SubscriptionOptions(cls)
output = output or getattr(cls, "Output", None)
fields = {}
if not output:
# If output is defined, we don't need to get the fields
fields = OrderedDict()
for base in reversed(cls.__mro__):
fields.update(
yank_fields_from_attrs(base.__dict__, _as=SubField)
)
output = cls
if not arguments:
input_class = getattr(cls, "Arguments", None)
if input_class:
arguments = props(input_class)
else:
arguments = {}
if not resolver:
assert hasattr(
cls, "next"
), "All subscriptions must define a next method"
next = getattr(cls, "next", None)
resolver = get_unbound_function(next)
if _meta.fields:
_meta.fields.update(fields)
else:
_meta.fields = fields
_meta.output = output
_meta.resolver = resolver
_meta.arguments = arguments
super(SubscriptionType, cls).__init_subclass_with_meta__(
_meta=_meta, **options
)
# noinspection PyUnusedLocal
@classmethod
def Field(
cls,
name=None,
description=None,
deprecation_reason=None,
required=False,
):
return SubField(
cls._meta.output,
args=cls._meta.arguments,
resolver=cls._meta.resolver,
name=name,
description=description,
deprecation_reason=deprecation_reason,
required=required,
) |
Then from aio_pika.exchange import ExchangeType
import pickle
from graphene import Field
from logzero import logger
from api.definitions.types_subscription import SubscriptionType as SxType
from api.definitions.types_base import BaseSubscription
from api.definitions.enums import PayloadOp
from api.definitions.inputs import ProjectContextInputType
from .type import StaffType
class StaffSubscription(SxType):
class Arguments:
input = ProjectContextInputType()
status = PayloadOp()
staff = Field(StaffType)
@staticmethod
async def next(root, info, **args):
assert "input" in args.keys()
try:
p = info.context["project"]
connection = await info.context["connection"](
info.context["conf"]["AMQP_URI"]
)
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
type=ExchangeType.FANOUT, name=p.channel_name + ":staff"
)
q = await channel.declare_queue(exclusive=True)
await q.bind(exchange)
async for message in q:
with message.process():
yield StaffSubscription(
staff=pickle.loads(message.body),
status=message.headers["event"],
)
except Exception as e:
logger.debug(e)
class MetaSubscription(BaseSubscription):
staff_settings_subscription = StaffSubscription.Field() |
Note that to run that, you'd need to set up the subscription server, I use a patchwork of graphql ws and aiohttpgraphql |
V similar format for mutationd, as I was trying to keep my patterns the same class UpdateStaffSettings(Mute):
class Arguments:
input = UpdatePersonnelSettingsInputType()
status = Field(Boolean)
staff = Field(StaffType)
@staticmethod
def mutate(root, info, **args):
status = False
s = resolve.enable_positions(root, info, **args)
if s.id:
status = True
return UpdateStaffSettings(staff=s, status=status) Then do a meta mutation that inherits the base mutation, and as subscriptions, enumerate your mutation classes |
Excellent! @izquierdogalan, I'll try to add mutations to the example soon. I can't promise anything, but maybe some time next week. I haven't looked at subscriptions in GraphQL at all yet, so that will have to wait. |
Very thanks @cmmartti and @ProjectCheshire, your work is being of great help! |
--I'm doing everything via docker compose. Files incoming.-- (Start services first, you may need to docker compose build)
|
(well, nvm. apparently github doesnt support dockerfile/or docker.yaml attachments) |
I'm wondering if there is hope to see an example of a nice mutation here :-P I will make a pr if I can figure out how myself promise was asking it here graphql-python/graphene#545 but I guess this is the best place |
please, could you give an example of mutation?
The text was updated successfully, but these errors were encountered: