Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Durable user notifications and begin Cell embedding formalism #2513

Merged
merged 20 commits into from
Jan 25, 2022
Merged
77 changes: 77 additions & 0 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import synapse.lib.stormlib.project as s_stormlib_project # NOQA
import synapse.lib.stormlib.version as s_stormlib_version # NOQA
import synapse.lib.stormlib.modelext as s_stormlib_modelext # NOQA
import synapse.lib.stormlib.notifications as s_stormlib_notifications # NOQA

logger = logging.getLogger(__name__)
stormlogger = logging.getLogger('synapse.storm')
Expand Down Expand Up @@ -1142,6 +1143,7 @@ async def initServiceStorage(self):
await self._initCoreViews()
self.onfini(self._finiStor)
await self._initCoreQueues()
await self._initCoreNotifs()

self.addHealthFunc(self._cortexHealth)

Expand Down Expand Up @@ -1660,6 +1662,81 @@ async def _initCoreQueues(self):

self.multiqueue = await slab.getMultiQueue('cortex:queue', nexsroot=self.nexsroot)

async def _initCoreNotifs(self):
path = os.path.join(self.dirn, 'slabs', 'notifications.lmdb')

self.notifslab = await s_lmdbslab.Slab.anit(path)
self.onfini(self.notifslab.fini)

self.notif_abrv_user = self.notifslab.getNameAbrv('users')
self.notif_abrv_type = self.notifslab.getNameAbrv('types')

self.notifseqn = self.notifslab.getSeqn('notifs')
self.notif_indx_usertime = self.notifslab.initdb('indx:user:time', dupsort=True)
self.notif_indx_usertype = self.notifslab.initdb('indx:user:type', dupsort=True)

async def addUserNotif(self, useriden, mesgtype, mesgdata=None):

user = self.auth.user(useriden)
if user is None:
raise s_exc.NoSuchUser(mesg=f'No user found with iden: {useriden}')

mesg = (useriden, s_common.now(), mesgtype, mesgdata)

return await self._push('notif:add', mesg)

async def delUserNotif(self, indx):
return await self._push('notif:del', indx)

async def getUserNotif(self, indx):
return self.notifseqn.get(indx)

@s_nexus.Pusher.onPush('notif:add', passitem=True)
async def _addUserNotif(self, mesg, nexsitem):

indx = self.notifseqn.add(mesg, indx=nexsitem[0])
indxbyts = s_common.int64en(indx)

useriden, mesgtime, mesgtype, mesgdata = mesg

userbyts = s_common.uhex(useriden)
timebyts = s_common.int64en(mesgtime)
typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode())

self.notifslab.put(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime, dupdata=True)
self.notifslab.put(userbyts + typeabrv, indxbyts, db=self.notif_indx_usertype, dupdata=True)

return indx

@s_nexus.Pusher.onPush('notif:del')
async def _delUserNotif(self, indx):

envl = self.notifseqn.pop(indx)
if envl is None:
return

mesg = envl[1]
useriden, mesgtime, mesgtype, mesgdata = mesg

indxbyts = s_common.int64en(indx)
userbyts = s_common.uhex(useriden)
timebyts = s_common.int64en(mesgtime)
typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode())

self.notifslab.delete(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime)
self.notifslab.delete(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype)

async def iterUserNotifs(self, useriden):
invisig0th marked this conversation as resolved.
Show resolved Hide resolved
# iterate user notifications backward
userbyts = s_common.uhex(useriden)
for _, indxbyts in self.notifslab.scanByPrefBack(userbyts, db=self.notif_indx_usertype):
indx = s_common.int64un(indxbyts)
mesg = self.notifseqn.getraw(indxbyts)
yield (indx, mesg)

# async def iterUserNotifsByTime(self, useriden, ival):
# async def iterUserNotifsByType(self, useriden, mesgtype):

async def setStormCmd(self, cdef):
await self._reqStormCmd(cdef)
return await self._push('cmd:set', cdef)
Expand Down
5 changes: 5 additions & 0 deletions synapse/lib/slabseqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ def get(self, offs):
if valu is not None:
return s_msgpack.un(valu)

def getraw(self, byts):
valu = self.slab.get(byts, db=self.db)
if valu is not None:
return s_msgpack.un(valu)

def slice(self, offs, size):

imax = size - 1
Expand Down
68 changes: 68 additions & 0 deletions synapse/lib/stormlib/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import synapse.exc as s_exc
import synapse.lib.stormtypes as s_stormtypes

@s_stormtypes.registry.registerLib
class NotifyLib(s_stormtypes.Lib):
_storm_lib_path = ('notifications', )
_storm_locals = (
{
'name': 'list',
'desc': '''
Yield (<indx>, <mesg>) tuples for a user's notifications.

''',
'type': {
'type': 'function', '_funcname': 'list',
'args': (
{'name': 'size', 'type': 'int', 'desc': 'The max number of notifications to yield.', 'default': None},
),
'returns': {
'desc': 'A new ``storm:imap:server`` instance.'
},
'returns': {
'name': 'Yields', 'type': 'list',
'desc': 'Yields (useriden, time, mesgtype, msgdata) tuples.'},
},
},
{
'name': 'del',
'desc': '''
Delete a previously delivered notification.

''',
'type': {
'type': 'function', '_funcname': '_del',
'args': (
{'name': 'indx', 'type': 'int', 'desc': 'The index number of the notification to delete.'},
),
'returns': {
'name': 'retn', 'type': 'list',
'desc': 'Returns an ($ok, $valu) tuple.'},
},
},
)

def getObjLocals(self):
return {
'del': self._del,
'list': self.list,
# 'bytime':
# 'bytype':
}

async def _del(self, indx):
indx = await s_stormtypes.toint(indx)
mesg = await self.runt.snap.core.getUserNotif(indx)
if mesg[0] != self.runt.user.iden and not self.runt.isAdmin():
mesg = 'You may only delete notifications which belong to you!'
raise s_exc.AuthDeny(mesg=mesg)
await self.runt.snap.core.delUserNotif(indx)

async def list(self, size=None):
size = await s_stormtypes.toint(size, noneok=True)
count = 0
async for mesg in self.runt.snap.core.iterUserNotifs(self.runt.user.iden):
yield mesg
count += 1
if size is not None and size <= count:
break
18 changes: 18 additions & 0 deletions synapse/lib/stormtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6663,6 +6663,8 @@ def __hash__(self):
def getObjLocals(self):
return {
'get': self._methUserGet,
'tell': self._methUserTell,
invisig0th marked this conversation as resolved.
Show resolved Hide resolved
'notify': self._methUserNotify,
'roles': self._methUserRoles,
'allowed': self._methUserAllowed,
'grant': self._methUserGrant,
Expand All @@ -6677,6 +6679,22 @@ def getObjLocals(self):
'setPasswd': self._methUserSetPasswd,
}

async def _methUserTell(self, text):
mesgdata = {
'text': await tostr(text),
'from': self.runt.user.iden,
}
self.runt.confirm(('tell', self.valu), default=True)
return await self.runt.snap.core.addUserNotif(self.valu, 'tell', mesgdata)

async def _methUserNotify(self, mesgtype, mesgdata):
mesgtype = await tostr(mesgtype)
mesgdata = await toprim(mesgdata)
if not self.runt.isAdmin():
mesg = '$user.notify() method requires admin privs.'
raise s_exc.AuthDeny(mesg=mesg)
return await self.runt.snap.core.addUserNotif(self.valu, mesgtype, mesgdata)

async def _setUserName(self, name):

name = await tostr(name)
Expand Down
34 changes: 34 additions & 0 deletions synapse/tests/test_lib_stormtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,40 @@ def __repr__(self):

class StormTypesTest(s_test.SynTest):

async def test_stormtypes_notify(self):
async with self.getTestCore() as core:
visi = await core.auth.addUser('visi')

asvisi = {'user': visi.iden}
mesgindx = await core.callStorm('return($lib.auth.users.byname(root).tell(heya))', opts=asvisi)

msgs = await core.stormlist('''
for ($indx, $mesg) in $lib.notifications.list() {
($useriden, $mesgtime, $mesgtype, $mesgdata) = $mesg
if ($mesgtype = "tell") {
$lib.print("{user} says {text}", user=$mesgdata.from, text=$mesgdata.text)
}
}
''')
self.stormIsInPrint(f'{visi.iden} says heya', msgs)

with self.raises(s_exc.AuthDeny):
opts = {'user': visi.iden, 'vars': {'indx': mesgindx}}
await core.callStorm('$lib.notifications.del($indx)', opts=opts)

opts = {'vars': {'indx': mesgindx}}
await core.callStorm('$lib.notifications.del($indx)', opts=opts)

msgs = await core.stormlist('''
for ($indx, $mesg) in $lib.notifications.list() {
($useriden, $mesgtime, $mesgtype, $mesgdata) = $mesg
if ($mesgtype = "tell") {
$lib.print("{user} says {text}", user=$mesgdata.from, text=$mesgdata.text)
}
}
''')
self.stormNotInPrint(f'{visi.iden} says heya', msgs)

async def test_stormtypes_registry(self):

class NewpType(s_stormtypes.StormType):
Expand Down