Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Durable user notifications and begin Cell embedding formalism #2513

Merged
merged 20 commits into from
Jan 25, 2022
Merged
78 changes: 55 additions & 23 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
@@ -68,6 +68,7 @@
import synapse.lib.stormlib.project as s_stormlib_project # NOQA
import synapse.lib.stormlib.version as s_stormlib_version # NOQA
import synapse.lib.stormlib.modelext as s_stormlib_modelext # NOQA
import synapse.lib.stormlib.notifications as s_stormlib_notifications # NOQA

logger = logging.getLogger(__name__)
stormlogger = logging.getLogger('synapse.storm')
@@ -950,6 +951,28 @@ async def getAxonBytes(self, sha256):
async for byts in self.cell.axon.get(s_common.uhex(sha256)):
yield byts

@s_cell.adminapi()
async def getUserNotif(self, indx):
return await self.cell.getUserNotif(indx)

@s_cell.adminapi()
async def delUserNotif(self, indx):
return await self.cell.delUserNotif(indx)

@s_cell.adminapi()
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)

@s_cell.adminapi()
async def iterUserNotifs(self, useriden, size=None):
async for item in self.cell.iterUserNotifs(useriden, size=size):
yield item

@s_cell.adminapi()
async def watchAllUserNotifs(self, offs=None):
async for item in self.cell.watchAllUserNotifs(offs=offs):
yield item

class Cortex(s_cell.Cell): # type: ignore
'''
A Cortex implements the synapse hypergraph.
@@ -1151,7 +1174,6 @@ async def initServiceStorage(self):

# Initialize our storage and views
await self._initCoreAxon()
await self._initCoreJsonStor()

await self._initCoreLayers()
await self._initCoreViews()
@@ -1292,9 +1314,12 @@ async def _addAllLayrRead(self):
async def initServiceRuntime(self):

# do any post-nexus initialization here...
await self._initJsonStor()

if self.isactive:
await self._checkNexsIndx()
await self._checkLayerModels()

await self._initCoreMods()
await self._initStormSvcs()

@@ -3009,14 +3034,14 @@ async def _initCoreHive(self):
await self.stormvars.set(s_stormlib_cell.runtime_fixes_key, s_stormlib_cell.getMaxHotFixes())
self.onfini(self.stormvars)

async def _initCoreJsonStor(self):
async def _initJsonStor(self):

self.jsonurl = self.conf.get('jsonstor')
if self.jsonurl is not None:
self.jsonstor = await s_telepath.Client.anit(self.jsonurl)
else:
path = os.path.join(self.dirn, 'jsonstor')
self.jsonstor = await s_jsonstor.JsonStorCell.anit(path)
self.jsonstor = await s_jsonstor.JsonStorCell.anit(path, parent=self)

self.onfini(self.jsonstor)

@@ -3044,42 +3069,49 @@ async def getJsonObjProp(self, path, prop):
async def delJsonObj(self, path):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.delPathObj(path)
return await self._delJsonObj(path)
return await self.jsonstor.delPathObj(path)

async def delJsonObjProp(self, path, prop):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.delPathObjProp(path, prop)
return await self._delJsonObjProp(path, prop)
return await self.jsonstor.delPathObjProp(path, prop)

async def setJsonObj(self, path, item):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.setPathObj(path, item)
return await self._setJsonObj(path, item)
return await self.jsonstor.setPathObj(path, item)

async def setJsonObjProp(self, path, prop, item):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.setPathObjProp(path, prop, item)
return await self._setJsonObjProp(path, prop, item)
return await self.jsonstor.setPathObjProp(path, prop, item)

@s_nexus.Pusher.onPushAuto('json:del')
async def _delJsonObj(self, path):
return await self.jsonstor.delPathObj(path)
async def getUserNotif(self, indx):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.getUserNotif(indx)

@s_nexus.Pusher.onPushAuto('json:set')
async def _setJsonObj(self, path, item):
return await self.jsonstor.setPathObj(path, item)
async def delUserNotif(self, indx):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.delUserNotif(indx)

@s_nexus.Pusher.onPushAuto('json:del:prop')
async def _delJsonObjProp(self, path, prop):
return await self.jsonstor.delPathObjProp(path, prop)
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)

@s_nexus.Pusher.onPushAuto('json:set:prop')
async def _setJsonObjProp(self, path, prop, item):
return await self.jsonstor.setPathObjProp(path, prop, item)
async def iterUserNotifs(self, useriden, size=None):
if self.jsonurl is not None:
await self.jsonstor.waitready()
async for item in self.jsonstor.iterUserNotifs(useriden, size=size):
yield item

async def watchAllUserNotifs(self, offs=None):
if self.jsonurl is not None:
await self.jsonstor.waitready()
async for item in self.jsonstor.watchAllUserNotifs(offs=offs):
yield item

async def _initCoreAxon(self):
turl = self.conf.get('axon')
23 changes: 17 additions & 6 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
@@ -824,7 +824,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):
VERSION = s_version.version
VERSTRING = s_version.verstring

async def __anit__(self, dirn, conf=None, readonly=False):
async def __anit__(self, dirn, conf=None, readonly=False, parent=None):

# phase 1
if conf is None:
@@ -836,6 +836,7 @@ async def __anit__(self, dirn, conf=None, readonly=False):
self.dirn = s_common.gendir(dirn)

self.auth = None
self.cellparent = parent
self.sessions = {}
self.isactive = False
self.inaugural = False
@@ -1027,9 +1028,10 @@ async def initServiceStorage(self):
pass

async def initNexusSubsystem(self):
mirror = self.conf.get('mirror')
await self.nexsroot.startup(mirror, celliden=self.iden)
await self.setCellActive(mirror is None)
if self.cellparent is None:
mirror = self.conf.get('mirror')
await self.nexsroot.startup(mirror, celliden=self.iden)
await self.setCellActive(mirror is None)

async def initServiceNetwork(self):

@@ -1123,6 +1125,9 @@ async def _ctorNexsRoot(self):
'''
Initialize a NexsRoot to use for the cell.
'''
if self.cellparent:
return self.cellparent.nexsroot

map_async = self.conf.get('nexslog:async')
return await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog, map_async=map_async)

@@ -2003,7 +2008,7 @@ async def _initCellHive(self):
isnew = not self.slab.dbexists('hive')

db = self.slab.initdb('hive')
hive = await s_hive.SlabHive.anit(self.slab, db=db, nexsroot=self.nexsroot)
hive = await s_hive.SlabHive.anit(self.slab, db=db, nexsroot=self.getCellNexsRoot())
self.onfini(hive)

if isnew:
@@ -2042,10 +2047,16 @@ async def _initCellAuth(self):

return await self._initCellHiveAuth()

def getCellNexsRoot(self):
# the "cell scope" nexusroot only exists if we are *not* embedded
# (aka we dont have a self.cellparent)
if self.cellparent is None:
return self.nexsroot

async def _initCellHiveAuth(self):

node = await self.hive.open(('auth',))
auth = await s_hiveauth.Auth.anit(node, nexsroot=self.nexsroot)
auth = await s_hiveauth.Auth.anit(node, nexsroot=self.getCellNexsRoot())

self.onfini(auth.fini)
return auth
94 changes: 94 additions & 0 deletions synapse/lib/jsonstor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import asyncio
import logging

import synapse.exc as s_exc
import synapse.common as s_common
@@ -10,6 +11,7 @@
import synapse.lib.msgpack as s_msgpack
import synapse.lib.lmdbslab as s_lmdbslab

logger = logging.getLogger(__name__)

class JsonStor(s_base.Base):
'''
@@ -391,6 +393,28 @@ async def getsQueue(self, name, offs, size=None, cull=True, wait=True):
async for item in self.cell.getsQueue(name, offs, size=size, cull=cull, wait=wait):
yield item

@s_cell.adminapi()
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)

@s_cell.adminapi()
async def getUserNotif(self, indx):
return await self.cell.getUserNotif(indx)

@s_cell.adminapi()
async def delUserNotif(self, indx):
return await self.cell.delUserNotif(indx)

@s_cell.adminapi()
async def iterUserNotifs(self, useriden, size=None):
async for item in self.cell.iterUserNotifs(useriden, size=size):
yield item

@s_cell.adminapi()
async def watchAllUserNotifs(self, offs=None):
async for item in self.cell.watchAllUserNotifs(offs=offs):
yield item

class JsonStorCell(s_cell.Cell):

cellapi = JsonStorApi
@@ -402,6 +426,13 @@ async def initServiceStorage(self):
self.onfini(self.jsonstor.fini)
self.onfini(self.multique.fini)

self.notif_abrv_user = self.slab.getNameAbrv('users')
self.notif_abrv_type = self.slab.getNameAbrv('types')

self.notifseqn = self.slab.getSeqn('notifs')
self.notif_indx_usertime = self.slab.initdb('indx:user:time', dupsort=True)
self.notif_indx_usertype = self.slab.initdb('indx:user:type', dupsort=True)

async def getPathList(self, path):
async for item in self.jsonstor.getPathList(path):
yield item
@@ -483,3 +514,66 @@ async def getsQueue(self, name, offs, size=None, cull=True, wait=True):
await self.cullQueue(name, offs - 1)
async for item in self.multique.gets(name, offs, size=size, wait=wait):
yield item

async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
mesg = (useriden, s_common.now(), mesgtype, mesgdata)
return await self._push('notif:add', mesg)

async def getUserNotif(self, indx):
return self.notifseqn.get(indx)

@s_nexus.Pusher.onPush('notif:add', passitem=True)
async def _addUserNotif(self, mesg, nexsitem):

indx = self.notifseqn.add(mesg, indx=nexsitem[0])
indxbyts = s_common.int64en(indx)

useriden, mesgtime, mesgtype, mesgdata = mesg

userbyts = s_common.uhex(useriden)
timebyts = s_common.int64en(mesgtime)
typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode())

self.slab.put(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime, dupdata=True)
self.slab.put(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype, dupdata=True)

return indx

@s_nexus.Pusher.onPushAuto('notif:del')
async def delUserNotif(self, indx):
envl = self.notifseqn.pop(indx)
if envl is None:
return

mesg = envl[1]
useriden, mesgtime, mesgtype, mesgdata = mesg

indxbyts = s_common.int64en(indx)
userbyts = s_common.uhex(useriden)
timebyts = s_common.int64en(mesgtime)
typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode())

self.slab.delete(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime)
self.slab.delete(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype)

async def iterUserNotifs(self, useriden, size=None):
# iterate user notifications backward
userbyts = s_common.uhex(useriden)
count = 0
for _, indxbyts in self.slab.scanByPrefBack(userbyts, db=self.notif_indx_usertype):
indx = s_common.int64un(indxbyts)
mesg = self.notifseqn.getraw(indxbyts)
yield (indx, mesg)
count += 1
if size is not None and count >= size:
break

async def watchAllUserNotifs(self, offs=None):
# yield only new notifications as they arrive
if offs is None:
offs = self.notifseqn.index()
async for item in self.notifseqn.aiter(offs=offs, wait=True, timeout=120):
yield item

# async def iterUserNotifsByTime(self, useriden, ival):
# async def iterUserNotifsByType(self, useriden, mesgtype, ival=None):
5 changes: 5 additions & 0 deletions synapse/lib/slabseqn.py
Original file line number Diff line number Diff line change
@@ -295,6 +295,11 @@ def get(self, offs):
if valu is not None:
return s_msgpack.un(valu)

def getraw(self, byts):
valu = self.slab.get(byts, db=self.db)
if valu is not None:
return s_msgpack.un(valu)

def slice(self, offs, size):

imax = size - 1
Loading