Skip to content

Commit

Permalink
async reindex_node with daemon (interval: 5sec.)
Browse files Browse the repository at this point in the history
  • Loading branch information
zmsdev committed Jun 20, 2024
1 parent 3e06d55 commit 7b2863d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 20 deletions.
10 changes: 9 additions & 1 deletion Products/zms/ZMSZCatalogAdapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from Products.zms import standard
from Products.zms import content_extraction
from Products.zms import _confmanager
from Products.zms import _daemon
from Products.zms import IZMSCatalogAdapter, IZMSConfigurationProvider
from Products.zms import ZMSItem

Expand Down Expand Up @@ -168,7 +169,14 @@ def traverse(node, recursive):
# --------------------------------------------------------------------------
# ZMSZCatalogAdapter.reindex_node
# --------------------------------------------------------------------------
def reindex_node(self, node):
def reindex_node(self, node, insync=True, REQUEST=None, RESPONSE=None):
""" ZMSZCatalogAdapter.reindex_node """
if REQUEST is None:
path = self.getRefObjPath(node)
if not insync and _daemon.push(self,'reindex_node',node=path):
return
else:
node = self.getLinkObj(node)
standard.writeBlock(node, "[reindex_node]")
connectors = []
fileparsing = False
Expand Down
5 changes: 5 additions & 0 deletions Products/zms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import re
# Product Imports.
from Products.zms import _confmanager
from Products.zms import _daemon
from Products.zms import _multilangmanager
from Products.zms import _mediadb
from Products.zms import _zmsattributecontainer
Expand Down Expand Up @@ -300,6 +301,10 @@ def initialize(context):
fileobj.write('\'')
fileobj.write('};')
fileobj.close()

# start daemon
_daemon.interval = 5 # TODO from conf
_daemon.start()

except:
"""If you can't register the product, dump error.
Expand Down
52 changes: 52 additions & 0 deletions Products/zms/_daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
################################################################################
# _daemon.py
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
################################################################################

# Imports.
from threading import *
import requests
import time
import uuid
# Product Imports.
from Products.zms import standard

stack = []
interval = 5

def process():
while True:
while stack:
item = stack.pop()
uid, url, kwargs = item[0], item[1], item[2]
standard.writeInfo(None, '[_daemon.process.%s]: %s'%(uid, url+'?'+'&'.join(['%s=%s'%(x,str(kwargs[x])) for x in kwargs])))
response = requests.get(url, kwargs)
standard.writeInfo(None, '[_daemon.process.%s]: ==> %s'%(uid, str(response)))
time.sleep(interval)

def push(self, name, **kwargs):
uid = None
instack = bool(self.getConfProperty('%s.%s.async'%(self.meta_type,name),1))
if instack:
uid = str(uuid.uuid4())
url = '%s/%s'%(self.absolute_url(),name)
standard.writeInfo(None, '[_daemon.push.%s]: %s'%(uid, url))
stack.append((uid, url, kwargs))
return uid

def start():
thread = Thread(target=process)
thread.start()
8 changes: 2 additions & 6 deletions Products/zms/_versionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,20 +624,16 @@ def commitObjChanges(self, parent, REQUEST, forced=False, do_history=True, do_de
t0 = time.time()
standard.writeLog( self, "[commitObjChanges]: forced=%s, do_history=%s, do_delete=%s"%(str(forced), str(do_history), str(do_delete)))
delete = self._commitObjChanges( parent, REQUEST, forced, do_history, do_delete)
# Unset Request-Flags.
request = {'ZMS_INSERT': None, 'preview': None}
[(operator.setitem(request, x, REQUEST.get(x)), REQUEST.set(x, None)) for x in request]
# Synchronize access.
self.synchronizePublicAccess()
# Synchronize search.
self.getCatalogAdapter().reindex_node(self)
# Reset Request-Flags.
[REQUEST.set(x, request.get(x)) for x in request]
self.getCatalogAdapter().reindex_node(self, insync=False)
# Return flag for deleted objects.
standard.writeLog( self, '[commitObjChanges]: done (in '+str(int((time.time()-t0)*100.0)/100.0)+' secs.)')
return delete



"""
############################################################################
#
Expand Down
44 changes: 31 additions & 13 deletions Products/zms/standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@

security = ModuleSecurityInfo('Products.zms.standard')

LOGGER = logging.getLogger("ZMS.standard")

security.declarePublic('pybool')
def pybool(v):
return v in [True,'true','True',1]
Expand Down Expand Up @@ -1080,14 +1082,22 @@ def getLog(context):
"""
Get zms_log.
"""
request = context.REQUEST
if 'ZMSLOG' in request:
zms_log = request.get('ZMSLOG')
else:
zms_log = getattr(context, 'zms_log', None)
if zms_log is None:
zms_log = getattr(context.getPortalMaster(), 'zms_log', None)
request.set('ZMSLOG', zms_log)
zms_log = None
if context is not None:
request = getattr(context, 'REQUEST', None)
if request and 'ZMSLOG' in request:
zms_log = request.get('ZMSLOG')
else:
zms_log = getattr(context, 'zms_log', None)
if zms_log is None:
zms_log = getattr(context.getPortalMaster(), 'zms_log', None)
if request:
request.set('ZMSLOG', zms_log)
if zms_log is None:
class DefaulLog(object):
def hasSeverity(self, severity): return True
def LOG(self, severity, message): LOGGER.log( severity, message)
zms_log = DefaulLog()
return zms_log

security.declarePublic('writeStdout')
Expand Down Expand Up @@ -1115,14 +1125,20 @@ def writeLog(context, info):
zms_log = getLog(context)
severity = logging.DEBUG
if zms_log.hasSeverity(severity):
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
if context:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
zms_log.LOG( severity, info)
except:
pass
return info

security.declarePublic('writeBlock')
def writeBlock(context, info):
def writeBlock(context=None, info='?'):
return writeInfo(context, info)


security.declarePublic('writeInfo')
def writeInfo(context=None, info='?'):
"""
Log information.
@param info: Information
Expand All @@ -1135,14 +1151,15 @@ def writeBlock(context, info):
zms_log = getLog(context)
severity = logging.INFO
if zms_log.hasSeverity(severity):
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
if context:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
zms_log.LOG( severity, info)
except:
pass
return info

security.declarePublic('writeError')
def writeError(context, info):
def writeError(context=None, info='?'):
"""
Log error.
@param info: Information
Expand All @@ -1154,7 +1171,8 @@ def writeError(context, info):
info = info.decode('utf-8')
info += '\n'.join(traceback.format_tb(tb))
try:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
if context:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
zms_log = getLog(context)
severity = logging.ERROR
if zms_log.hasSeverity(severity):
Expand Down

0 comments on commit 7b2863d

Please sign in to comment.