Skip to content

Commit

Permalink
Merge branch 'master' into feat_dep_updates
Browse files Browse the repository at this point in the history
  • Loading branch information
invisig0th committed Jan 25, 2022
2 parents 4c2b94e + 1b2d327 commit 8e077cb
Show file tree
Hide file tree
Showing 10 changed files with 465 additions and 41 deletions.
78 changes: 55 additions & 23 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import synapse.lib.stormlib.project as s_stormlib_project # NOQA
import synapse.lib.stormlib.version as s_stormlib_version # NOQA
import synapse.lib.stormlib.modelext as s_stormlib_modelext # NOQA
import synapse.lib.stormlib.notifications as s_stormlib_notifications # NOQA

logger = logging.getLogger(__name__)
stormlogger = logging.getLogger('synapse.storm')
Expand Down Expand Up @@ -950,6 +951,28 @@ async def getAxonBytes(self, sha256):
async for byts in self.cell.axon.get(s_common.uhex(sha256)):
yield byts

@s_cell.adminapi()
async def getUserNotif(self, indx):
return await self.cell.getUserNotif(indx)

@s_cell.adminapi()
async def delUserNotif(self, indx):
return await self.cell.delUserNotif(indx)

@s_cell.adminapi()
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)

@s_cell.adminapi()
async def iterUserNotifs(self, useriden, size=None):
async for item in self.cell.iterUserNotifs(useriden, size=size):
yield item

@s_cell.adminapi()
async def watchAllUserNotifs(self, offs=None):
async for item in self.cell.watchAllUserNotifs(offs=offs):
yield item

class Cortex(s_cell.Cell): # type: ignore
'''
A Cortex implements the synapse hypergraph.
Expand Down Expand Up @@ -1151,7 +1174,6 @@ async def initServiceStorage(self):

# Initialize our storage and views
await self._initCoreAxon()
await self._initCoreJsonStor()

await self._initCoreLayers()
await self._initCoreViews()
Expand Down Expand Up @@ -1292,9 +1314,12 @@ async def _addAllLayrRead(self):
async def initServiceRuntime(self):

# do any post-nexus initialization here...
await self._initJsonStor()

if self.isactive:
await self._checkNexsIndx()
await self._checkLayerModels()

await self._initCoreMods()
await self._initStormSvcs()

Expand Down Expand Up @@ -3009,14 +3034,14 @@ async def _initCoreHive(self):
await self.stormvars.set(s_stormlib_cell.runtime_fixes_key, s_stormlib_cell.getMaxHotFixes())
self.onfini(self.stormvars)

async def _initCoreJsonStor(self):
async def _initJsonStor(self):

self.jsonurl = self.conf.get('jsonstor')
if self.jsonurl is not None:
self.jsonstor = await s_telepath.Client.anit(self.jsonurl)
else:
path = os.path.join(self.dirn, 'jsonstor')
self.jsonstor = await s_jsonstor.JsonStorCell.anit(path)
self.jsonstor = await s_jsonstor.JsonStorCell.anit(path, parent=self)

self.onfini(self.jsonstor)

Expand Down Expand Up @@ -3044,42 +3069,49 @@ async def getJsonObjProp(self, path, prop):
async def delJsonObj(self, path):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.delPathObj(path)
return await self._delJsonObj(path)
return await self.jsonstor.delPathObj(path)

async def delJsonObjProp(self, path, prop):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.delPathObjProp(path, prop)
return await self._delJsonObjProp(path, prop)
return await self.jsonstor.delPathObjProp(path, prop)

async def setJsonObj(self, path, item):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.setPathObj(path, item)
return await self._setJsonObj(path, item)
return await self.jsonstor.setPathObj(path, item)

async def setJsonObjProp(self, path, prop, item):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.setPathObjProp(path, prop, item)
return await self._setJsonObjProp(path, prop, item)
return await self.jsonstor.setPathObjProp(path, prop, item)

@s_nexus.Pusher.onPushAuto('json:del')
async def _delJsonObj(self, path):
return await self.jsonstor.delPathObj(path)
async def getUserNotif(self, indx):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.getUserNotif(indx)

@s_nexus.Pusher.onPushAuto('json:set')
async def _setJsonObj(self, path, item):
return await self.jsonstor.setPathObj(path, item)
async def delUserNotif(self, indx):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.delUserNotif(indx)

@s_nexus.Pusher.onPushAuto('json:del:prop')
async def _delJsonObjProp(self, path, prop):
return await self.jsonstor.delPathObjProp(path, prop)
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
if self.jsonurl is not None:
await self.jsonstor.waitready()
return await self.jsonstor.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)

@s_nexus.Pusher.onPushAuto('json:set:prop')
async def _setJsonObjProp(self, path, prop, item):
return await self.jsonstor.setPathObjProp(path, prop, item)
async def iterUserNotifs(self, useriden, size=None):
if self.jsonurl is not None:
await self.jsonstor.waitready()
async for item in self.jsonstor.iterUserNotifs(useriden, size=size):
yield item

async def watchAllUserNotifs(self, offs=None):
if self.jsonurl is not None:
await self.jsonstor.waitready()
async for item in self.jsonstor.watchAllUserNotifs(offs=offs):
yield item

async def _initCoreAxon(self):
turl = self.conf.get('axon')
Expand Down
23 changes: 17 additions & 6 deletions synapse/lib/cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ class Cell(s_nexus.Pusher, s_telepath.Aware):
VERSION = s_version.version
VERSTRING = s_version.verstring

async def __anit__(self, dirn, conf=None, readonly=False):
async def __anit__(self, dirn, conf=None, readonly=False, parent=None):

# phase 1
if conf is None:
Expand All @@ -836,6 +836,7 @@ async def __anit__(self, dirn, conf=None, readonly=False):
self.dirn = s_common.gendir(dirn)

self.auth = None
self.cellparent = parent
self.sessions = {}
self.isactive = False
self.inaugural = False
Expand Down Expand Up @@ -1027,9 +1028,10 @@ async def initServiceStorage(self):
pass

async def initNexusSubsystem(self):
mirror = self.conf.get('mirror')
await self.nexsroot.startup(mirror, celliden=self.iden)
await self.setCellActive(mirror is None)
if self.cellparent is None:
mirror = self.conf.get('mirror')
await self.nexsroot.startup(mirror, celliden=self.iden)
await self.setCellActive(mirror is None)

async def initServiceNetwork(self):

Expand Down Expand Up @@ -1123,6 +1125,9 @@ async def _ctorNexsRoot(self):
'''
Initialize a NexsRoot to use for the cell.
'''
if self.cellparent:
return self.cellparent.nexsroot

map_async = self.conf.get('nexslog:async')
return await s_nexus.NexsRoot.anit(self.dirn, donexslog=self.donexslog, map_async=map_async)

Expand Down Expand Up @@ -2003,7 +2008,7 @@ async def _initCellHive(self):
isnew = not self.slab.dbexists('hive')

db = self.slab.initdb('hive')
hive = await s_hive.SlabHive.anit(self.slab, db=db, nexsroot=self.nexsroot)
hive = await s_hive.SlabHive.anit(self.slab, db=db, nexsroot=self.getCellNexsRoot())
self.onfini(hive)

if isnew:
Expand Down Expand Up @@ -2042,10 +2047,16 @@ async def _initCellAuth(self):

return await self._initCellHiveAuth()

def getCellNexsRoot(self):
# the "cell scope" nexusroot only exists if we are *not* embedded
# (aka we dont have a self.cellparent)
if self.cellparent is None:
return self.nexsroot

async def _initCellHiveAuth(self):

node = await self.hive.open(('auth',))
auth = await s_hiveauth.Auth.anit(node, nexsroot=self.nexsroot)
auth = await s_hiveauth.Auth.anit(node, nexsroot=self.getCellNexsRoot())

self.onfini(auth.fini)
return auth
Expand Down
94 changes: 94 additions & 0 deletions synapse/lib/jsonstor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import asyncio
import logging

import synapse.exc as s_exc
import synapse.common as s_common
Expand All @@ -10,6 +11,7 @@
import synapse.lib.msgpack as s_msgpack
import synapse.lib.lmdbslab as s_lmdbslab

logger = logging.getLogger(__name__)

class JsonStor(s_base.Base):
'''
Expand Down Expand Up @@ -391,6 +393,28 @@ async def getsQueue(self, name, offs, size=None, cull=True, wait=True):
async for item in self.cell.getsQueue(name, offs, size=size, cull=cull, wait=wait):
yield item

@s_cell.adminapi()
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)

@s_cell.adminapi()
async def getUserNotif(self, indx):
return await self.cell.getUserNotif(indx)

@s_cell.adminapi()
async def delUserNotif(self, indx):
return await self.cell.delUserNotif(indx)

@s_cell.adminapi()
async def iterUserNotifs(self, useriden, size=None):
async for item in self.cell.iterUserNotifs(useriden, size=size):
yield item

@s_cell.adminapi()
async def watchAllUserNotifs(self, offs=None):
async for item in self.cell.watchAllUserNotifs(offs=offs):
yield item

class JsonStorCell(s_cell.Cell):

cellapi = JsonStorApi
Expand All @@ -402,6 +426,13 @@ async def initServiceStorage(self):
self.onfini(self.jsonstor.fini)
self.onfini(self.multique.fini)

self.notif_abrv_user = self.slab.getNameAbrv('users')
self.notif_abrv_type = self.slab.getNameAbrv('types')

self.notifseqn = self.slab.getSeqn('notifs')
self.notif_indx_usertime = self.slab.initdb('indx:user:time', dupsort=True)
self.notif_indx_usertype = self.slab.initdb('indx:user:type', dupsort=True)

async def getPathList(self, path):
async for item in self.jsonstor.getPathList(path):
yield item
Expand Down Expand Up @@ -483,3 +514,66 @@ async def getsQueue(self, name, offs, size=None, cull=True, wait=True):
await self.cullQueue(name, offs - 1)
async for item in self.multique.gets(name, offs, size=size, wait=wait):
yield item

async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
mesg = (useriden, s_common.now(), mesgtype, mesgdata)
return await self._push('notif:add', mesg)

async def getUserNotif(self, indx):
return self.notifseqn.get(indx)

@s_nexus.Pusher.onPush('notif:add', passitem=True)
async def _addUserNotif(self, mesg, nexsitem):

indx = self.notifseqn.add(mesg, indx=nexsitem[0])
indxbyts = s_common.int64en(indx)

useriden, mesgtime, mesgtype, mesgdata = mesg

userbyts = s_common.uhex(useriden)
timebyts = s_common.int64en(mesgtime)
typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode())

self.slab.put(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime, dupdata=True)
self.slab.put(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype, dupdata=True)

return indx

@s_nexus.Pusher.onPushAuto('notif:del')
async def delUserNotif(self, indx):
envl = self.notifseqn.pop(indx)
if envl is None:
return

mesg = envl[1]
useriden, mesgtime, mesgtype, mesgdata = mesg

indxbyts = s_common.int64en(indx)
userbyts = s_common.uhex(useriden)
timebyts = s_common.int64en(mesgtime)
typeabrv = self.notif_abrv_type.setBytsToAbrv(mesgtype.encode())

self.slab.delete(userbyts + timebyts, indxbyts, db=self.notif_indx_usertime)
self.slab.delete(userbyts + typeabrv + timebyts, indxbyts, db=self.notif_indx_usertype)

async def iterUserNotifs(self, useriden, size=None):
# iterate user notifications backward
userbyts = s_common.uhex(useriden)
count = 0
for _, indxbyts in self.slab.scanByPrefBack(userbyts, db=self.notif_indx_usertype):
indx = s_common.int64un(indxbyts)
mesg = self.notifseqn.getraw(indxbyts)
yield (indx, mesg)
count += 1
if size is not None and count >= size:
break

async def watchAllUserNotifs(self, offs=None):
# yield only new notifications as they arrive
if offs is None:
offs = self.notifseqn.index()
async for item in self.notifseqn.aiter(offs=offs, wait=True, timeout=120):
yield item

# async def iterUserNotifsByTime(self, useriden, ival):
# async def iterUserNotifsByType(self, useriden, mesgtype, ival=None):
5 changes: 5 additions & 0 deletions synapse/lib/slabseqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,11 @@ def get(self, offs):
if valu is not None:
return s_msgpack.un(valu)

def getraw(self, byts):
valu = self.slab.get(byts, db=self.db)
if valu is not None:
return s_msgpack.un(valu)

def slice(self, offs, size):

imax = size - 1
Expand Down

0 comments on commit 8e077cb

Please sign in to comment.