Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous Reindexing on Object-Commit #289

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Products/zms/ZMSZCatalogAdapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from Products.zms import standard
from Products.zms import content_extraction
from Products.zms import _confmanager
from Products.zms import _daemon
from Products.zms import IZMSCatalogAdapter, IZMSConfigurationProvider
from Products.zms import ZMSItem

Expand Down Expand Up @@ -168,7 +169,14 @@ def traverse(node, recursive):
# --------------------------------------------------------------------------
# ZMSZCatalogAdapter.reindex_node
# --------------------------------------------------------------------------
def reindex_node(self, node):
def reindex_node(self, node, insync=True, REQUEST=None, RESPONSE=None):
""" ZMSZCatalogAdapter.reindex_node """
if REQUEST is None:
path = self.getRefObjPath(node)
if not insync and _daemon.push(self,'reindex_node',node=path):
return
else:
node = self.getLinkObj(node)
standard.writeBlock(node, "[reindex_node]")
connectors = []
fileparsing = False
Expand Down
5 changes: 5 additions & 0 deletions Products/zms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import re
# Product Imports.
from Products.zms import _confmanager
from Products.zms import _daemon
from Products.zms import _multilangmanager
from Products.zms import _mediadb
from Products.zms import _zmsattributecontainer
Expand Down Expand Up @@ -300,6 +301,10 @@ def initialize(context):
fileobj.write('\'')
fileobj.write('};')
fileobj.close()

# start daemon
_daemon.interval = 5 # TODO from conf
_daemon.start()

except:
"""If you can't register the product, dump error.
Expand Down
52 changes: 52 additions & 0 deletions Products/zms/_daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
################################################################################
# _daemon.py
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
################################################################################

# Imports.
from threading import *
import requests
import time
import uuid
# Product Imports.
from Products.zms import standard

stack = []
interval = 5

def process():
while True:
while stack:
item = stack.pop()
uid, url, kwargs = item[0], item[1], item[2]
standard.writeInfo(None, '[_daemon.process.%s]: %s'%(uid, url+'?'+'&'.join(['%s=%s'%(x,str(kwargs[x])) for x in kwargs])))
response = requests.get(url, kwargs)
standard.writeInfo(None, '[_daemon.process.%s]: ==> %s'%(uid, str(response)))
time.sleep(interval)

def push(self, name, **kwargs):
uid = None
instack = bool(self.getConfProperty('%s.%s.async'%(self.meta_type,name),1))
if instack:
uid = str(uuid.uuid4())
url = '%s/%s'%(self.absolute_url(),name)
standard.writeInfo(None, '[_daemon.push.%s]: %s'%(uid, url))
stack.append((uid, url, kwargs))
return uid

def start():
thread = Thread(target=process)
thread.start()
8 changes: 2 additions & 6 deletions Products/zms/_versionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,20 +624,16 @@ def commitObjChanges(self, parent, REQUEST, forced=False, do_history=True, do_de
t0 = time.time()
standard.writeLog( self, "[commitObjChanges]: forced=%s, do_history=%s, do_delete=%s"%(str(forced), str(do_history), str(do_delete)))
delete = self._commitObjChanges( parent, REQUEST, forced, do_history, do_delete)
# Unset Request-Flags.
request = {'ZMS_INSERT': None, 'preview': None}
[(operator.setitem(request, x, REQUEST.get(x)), REQUEST.set(x, None)) for x in request]
# Synchronize access.
self.synchronizePublicAccess()
# Synchronize search.
self.getCatalogAdapter().reindex_node(self)
# Reset Request-Flags.
[REQUEST.set(x, request.get(x)) for x in request]
self.getCatalogAdapter().reindex_node(self, insync=False)
# Return flag for deleted objects.
standard.writeLog( self, '[commitObjChanges]: done (in '+str(int((time.time()-t0)*100.0)/100.0)+' secs.)')
return delete



"""
############################################################################
#
Expand Down
44 changes: 31 additions & 13 deletions Products/zms/standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@

security = ModuleSecurityInfo('Products.zms.standard')

LOGGER = logging.getLogger("ZMS.standard")

security.declarePublic('pybool')
def pybool(v):
return v in [True,'true','True',1]
Expand Down Expand Up @@ -1080,14 +1082,22 @@ def getLog(context):
"""
Get zms_log.
"""
request = context.REQUEST
if 'ZMSLOG' in request:
zms_log = request.get('ZMSLOG')
else:
zms_log = getattr(context, 'zms_log', None)
if zms_log is None:
zms_log = getattr(context.getPortalMaster(), 'zms_log', None)
request.set('ZMSLOG', zms_log)
zms_log = None
if context is not None:
request = getattr(context, 'REQUEST', None)
if request and 'ZMSLOG' in request:
zms_log = request.get('ZMSLOG')
else:
zms_log = getattr(context, 'zms_log', None)
if zms_log is None:
zms_log = getattr(context.getPortalMaster(), 'zms_log', None)
if request:
request.set('ZMSLOG', zms_log)
if zms_log is None:
class DefaulLog(object):
def hasSeverity(self, severity): return True
def LOG(self, severity, message): LOGGER.log( severity, message)
zms_log = DefaulLog()
return zms_log

security.declarePublic('writeStdout')
Expand Down Expand Up @@ -1115,14 +1125,20 @@ def writeLog(context, info):
zms_log = getLog(context)
severity = logging.DEBUG
if zms_log.hasSeverity(severity):
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
if context:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
zms_log.LOG( severity, info)
except:
pass
return info

security.declarePublic('writeBlock')
def writeBlock(context, info):
def writeBlock(context=None, info='?'):
return writeInfo(context, info)


security.declarePublic('writeInfo')
def writeInfo(context=None, info='?'):
"""
Log information.
@param info: Information
Expand All @@ -1135,14 +1151,15 @@ def writeBlock(context, info):
zms_log = getLog(context)
severity = logging.INFO
if zms_log.hasSeverity(severity):
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
if context:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
zms_log.LOG( severity, info)
except:
pass
return info

security.declarePublic('writeError')
def writeError(context, info):
def writeError(context=None, info='?'):
"""
Log error.
@param info: Information
Expand All @@ -1154,7 +1171,8 @@ def writeError(context, info):
info = info.decode('utf-8')
info += '\n'.join(traceback.format_tb(tb))
try:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
if context:
info = "[%s@%s] "%(context.meta_id, '/'.join(context.getPhysicalPath())) + info
zms_log = getLog(context)
severity = logging.ERROR
if zms_log.hasSeverity(severity):
Expand Down
Loading