From 7b2863db0af52a37715089e29fbda7ddf6267213 Mon Sep 17 00:00:00 2001 From: zmsdev Date: Wed, 19 Jun 2024 08:54:15 +0200 Subject: [PATCH] async reindex_node with daemon (interval: 5sec.) --- Products/zms/ZMSZCatalogAdapter.py | 10 +++++- Products/zms/__init__.py | 5 +++ Products/zms/_daemon.py | 52 ++++++++++++++++++++++++++++++ Products/zms/_versionmanager.py | 8 ++--- Products/zms/standard.py | 44 +++++++++++++++++-------- 5 files changed, 99 insertions(+), 20 deletions(-) create mode 100644 Products/zms/_daemon.py diff --git a/Products/zms/ZMSZCatalogAdapter.py b/Products/zms/ZMSZCatalogAdapter.py index 921eb5660..46486566c 100644 --- a/Products/zms/ZMSZCatalogAdapter.py +++ b/Products/zms/ZMSZCatalogAdapter.py @@ -29,6 +29,7 @@ from Products.zms import standard from Products.zms import content_extraction from Products.zms import _confmanager +from Products.zms import _daemon from Products.zms import IZMSCatalogAdapter, IZMSConfigurationProvider from Products.zms import ZMSItem @@ -168,7 +169,14 @@ def traverse(node, recursive): # -------------------------------------------------------------------------- # ZMSZCatalogAdapter.reindex_node # -------------------------------------------------------------------------- - def reindex_node(self, node): + def reindex_node(self, node, insync=True, REQUEST=None, RESPONSE=None): + """ ZMSZCatalogAdapter.reindex_node """ + if REQUEST is None: + path = self.getRefObjPath(node) + if not insync and _daemon.push(self,'reindex_node',node=path): + return + else: + node = self.getLinkObj(node) standard.writeBlock(node, "[reindex_node]") connectors = [] fileparsing = False diff --git a/Products/zms/__init__.py b/Products/zms/__init__.py index 94533f75e..28f514119 100644 --- a/Products/zms/__init__.py +++ b/Products/zms/__init__.py @@ -25,6 +25,7 @@ import re # Product Imports. from Products.zms import _confmanager +from Products.zms import _daemon from Products.zms import _multilangmanager from Products.zms import _mediadb from Products.zms import _zmsattributecontainer @@ -300,6 +301,10 @@ def initialize(context): fileobj.write('\'') fileobj.write('};') fileobj.close() + + # start daemon + _daemon.interval = 5 # TODO from conf + _daemon.start() except: """If you can't register the product, dump error. diff --git a/Products/zms/_daemon.py b/Products/zms/_daemon.py new file mode 100644 index 000000000..c46e95073 --- /dev/null +++ b/Products/zms/_daemon.py @@ -0,0 +1,52 @@ +################################################################################ +# _daemon.py +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +################################################################################ + +# Imports. +from threading import * +import requests +import time +import uuid +# Product Imports. +from Products.zms import standard + +stack = [] +interval = 5 + +def process(): + while True: + while stack: + item = stack.pop() + uid, url, kwargs = item[0], item[1], item[2] + standard.writeInfo(None, '[_daemon.process.%s]: %s'%(uid, url+'?'+'&'.join(['%s=%s'%(x,str(kwargs[x])) for x in kwargs]))) + response = requests.get(url, kwargs) + standard.writeInfo(None, '[_daemon.process.%s]: ==> %s'%(uid, str(response))) + time.sleep(interval) + +def push(self, name, **kwargs): + uid = None + instack = bool(self.getConfProperty('%s.%s.async'%(self.meta_type,name),1)) + if instack: + uid = str(uuid.uuid4()) + url = '%s/%s'%(self.absolute_url(),name) + standard.writeInfo(None, '[_daemon.push.%s]: %s'%(uid, url)) + stack.append((uid, url, kwargs)) + return uid + +def start(): + thread = Thread(target=process) + thread.start() diff --git a/Products/zms/_versionmanager.py b/Products/zms/_versionmanager.py index 00f55675d..c71a8ad8e 100644 --- a/Products/zms/_versionmanager.py +++ b/Products/zms/_versionmanager.py @@ -624,20 +624,16 @@ def commitObjChanges(self, parent, REQUEST, forced=False, do_history=True, do_de t0 = time.time() standard.writeLog( self, "[commitObjChanges]: forced=%s, do_history=%s, do_delete=%s"%(str(forced), str(do_history), str(do_delete))) delete = self._commitObjChanges( parent, REQUEST, forced, do_history, do_delete) - # Unset Request-Flags. - request = {'ZMS_INSERT': None, 'preview': None} - [(operator.setitem(request, x, REQUEST.get(x)), REQUEST.set(x, None)) for x in request] # Synchronize access. self.synchronizePublicAccess() # Synchronize search. - self.getCatalogAdapter().reindex_node(self) - # Reset Request-Flags. - [REQUEST.set(x, request.get(x)) for x in request] + self.getCatalogAdapter().reindex_node(self, insync=False) # Return flag for deleted objects. standard.writeLog( self, '[commitObjChanges]: done (in '+str(int((time.time()-t0)*100.0)/100.0)+' secs.)') return delete + """ ############################################################################ # diff --git a/Products/zms/standard.py b/Products/zms/standard.py index 87b0650c1..6772b455f 100644 --- a/Products/zms/standard.py +++ b/Products/zms/standard.py @@ -56,6 +56,8 @@ security = ModuleSecurityInfo('Products.zms.standard') +LOGGER = logging.getLogger("ZMS.standard") + security.declarePublic('pybool') def pybool(v): return v in [True,'true','True',1] @@ -1080,14 +1082,22 @@ def getLog(context): """ Get zms_log. """ - request = context.REQUEST - if 'ZMSLOG' in request: - zms_log = request.get('ZMSLOG') - else: - zms_log = getattr(context, 'zms_log', None) - if zms_log is None: - zms_log = getattr(context.getPortalMaster(), 'zms_log', None) - request.set('ZMSLOG', zms_log) + zms_log = None + if context is not None: + request = getattr(context, 'REQUEST', None) + if request and 'ZMSLOG' in request: + zms_log = request.get('ZMSLOG') + else: + zms_log = getattr(context, 'zms_log', None) + if zms_log is None: + zms_log = getattr(context.getPortalMaster(), 'zms_log', None) + if request: + request.set('ZMSLOG', zms_log) + if zms_log is None: + class DefaulLog(object): + def hasSeverity(self, severity): return True + def LOG(self, severity, message): LOGGER.log( severity, message) + zms_log = DefaulLog() return zms_log security.declarePublic('writeStdout') @@ -1115,14 +1125,20 @@ def writeLog(context, info): zms_log = getLog(context) severity = logging.DEBUG if zms_log.hasSeverity(severity): - info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info + if context: + info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info zms_log.LOG( severity, info) except: pass return info security.declarePublic('writeBlock') -def writeBlock(context, info): +def writeBlock(context=None, info='?'): + return writeInfo(context, info) + + +security.declarePublic('writeInfo') +def writeInfo(context=None, info='?'): """ Log information. @param info: Information @@ -1135,14 +1151,15 @@ def writeBlock(context, info): zms_log = getLog(context) severity = logging.INFO if zms_log.hasSeverity(severity): - info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info + if context: + info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info zms_log.LOG( severity, info) except: pass return info security.declarePublic('writeError') -def writeError(context, info): +def writeError(context=None, info='?'): """ Log error. @param info: Information @@ -1154,7 +1171,8 @@ def writeError(context, info): info = info.decode('utf-8') info += '\n'.join(traceback.format_tb(tb)) try: - info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info + if context: + info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info zms_log = getLog(context) severity = logging.ERROR if zms_log.hasSeverity(severity):