
Remake input data placement upon site list changes #12040

Open
amaltaro opened this issue Jul 11, 2024 · 9 comments · May be fixed by #12155
Comments

@amaltaro
Contributor

Impact of the new feature
MSTransferor

Is your feature request related to a problem? Please describe.
This is a sub-task of this meta-issue: #8323
Towards providing a feature that allows workflow site lists to be changed while workflows are active in the system.

Describe the solution you'd like
Whenever the site lists of an active workflow change, we need to revisit the input data placement rules - if any - and update their RSE expressions accordingly. That will involve at least the following (a sketch of steps 2 and 3 is given below):

  1. accessing the workflow transfer document to find out the rule IDs
  2. if a site was added to the SiteBlacklist, that site needs to be removed from those rules' RSE expressions
  3. if a site was added to the SiteWhitelist, that site needs to be added to those rules' RSE expressions
  4. the new rule IDs need to be persisted in the transfer document, replacing the rules that are no longer valid.

This can be implemented either:
a) synchronously with the status transition, though that might make the client HTTP request too expensive (ReqMgr2 + Global WorkQueue + Rucio changes all in a single client call); or
b) asynchronously, but then we need a mechanism to flag/identify workflows that need data re-placement.
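
For illustration, a minimal sketch of steps 2 and 3 above, assuming the rule's RSE expression is a simple union of RSE names (e.g. "T1_US_FNAL_Disk|T2_CH_CERN"); the helper name is hypothetical, not WMCore code:

```python
# A minimal sketch of steps 2 and 3: rewriting a rule's RSE expression after
# a site list change. Assumes the expression is a plain OR of RSE names;
# updateRSEExpression() is a hypothetical helper, not WMCore code.
def updateRSEExpression(rseExpression, addedToWhitelist, addedToBlacklist):
    """Return a new RSE expression with blacklisted sites dropped and
    newly whitelisted sites added."""
    rses = set(rseExpression.split("|"))
    rses |= set(addedToWhitelist)   # step 3: add sites from the SiteWhitelist
    rses -= set(addedToBlacklist)   # step 2: drop sites from the SiteBlacklist
    if not rses:
        raise ValueError("Site list change left no valid RSE for this rule")
    return "|".join(sorted(rses))


if __name__ == "__main__":
    expr = "T1_US_FNAL_Disk|T2_CH_CERN"
    # T2_CH_CERN was blacklisted, T2_DE_DESY was whitelisted
    print(updateRSEExpression(expr, ["T2_DE_DESY"], ["T2_CH_CERN"]))
    # -> T1_US_FNAL_Disk|T2_DE_DESY
```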

Describe alternatives you've considered
There are a few ways to implement this feature, like:
a) trigger a new data placement with the new site lists
b) once the new rule creation is successful, we could delete the superseded rule
c) similar to a), but instead of making a new rule, we could consume the rule IDs already persisted in the database (via MSTransferor/MSMonitor) and update their RSE expression accordingly.

Hasan made the following observation for case c): "You cannot update (update-rule) the rse expression of a rule and keep the same rule id. You can change the rse expression by "moving" (move-rule) a rule which creates a new rule."
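
Given Hasan's note, case c) would have to go through move-rule rather than update-rule. A minimal sketch, assuming the standard Rucio Python client (the exact move_replication_rule signature has varied across Rucio releases, so treat this as illustrative):

```python
# A minimal sketch of case c) under Hasan's constraint: "moving" a rule to a
# new RSE expression creates a new rule with a new rule ID. Assumes a
# configured Rucio client environment.
from rucio.client.ruleclient import RuleClient

def moveRuleToNewExpression(ruleId, newRseExpression):
    """Move an existing rule to a new RSE expression; returns the new rule ID."""
    client = RuleClient()
    newRuleId = client.move_replication_rule(ruleId, newRseExpression, override={})
    # step 4 from the description above: the new rule ID must now replace the
    # superseded one in the workflow transfer document (persistence not shown)
    return newRuleId
```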

Additional context
NOTE: ideally, MSTransferor would take care of this data re-placement, such that it considers everything (campaign configuration, pileup configuration, RSE quota, etc.).

@amaltaro
Contributor Author

PS: based on implementation details, we might have to break this down into 2 or 3 issues.

@todor-ivanov
Contributor

@amaltaro as a logical continuation of the effort on delivering the dynamic SiteList changes, I decided it would be good to take this issue as well and start looking into the eventual logic while we go through the review process of #12099. Please let me know if you think this is not a good idea and I will step down.

@amaltaro
Contributor Author

amaltaro commented Sep 23, 2024

@todor-ivanov there are a couple of decisions that are still not clear to me on this development, namely:

  • where to make the actual data placement (I would definitely be inclined to let MSTransferor deal with this)
  • and how we notify that a given workflow needs to go through this data placement process once again.

I wonder what your thoughts are on this?

UPDATE: just to add, we have a very similar challenge with this ticket #12039, where we are also considering an external messaging queue (NATS).

@todor-ivanov
Contributor

Hi @amaltaro,

Indeed, those are really important questions. On:

  • where to make the actual data placement (I would definitely be inclined to let MSTransferor deal with this)

I can 100% agree this is the place. No need for yet another service to deal with this.

and how do we notify that a given workflow needs to go through this data placement process once again.

This is tricky. But let's think logically. We have basically two options:

  • Regularly watch for changes of a workflow's (respectively dataset's) input placement conditions and react on that basis.
  • Act on the user request itself, which is the initial moment those conditions change.

NOTE: by change of conditions here I refer to the change of input data location based on the update of the SiteWhitelist and SiteBlacklist at the workflow level.

Now, weighing both options, I'd definitely be in favor of a single action triggered on a user request - this is a moment in time we already know. Following the path of regular updates would, to me, sooner or later lead to spawning yet another service (on which I already expressed my skepticism).

So following this line of thought, I'd suggest we develop a dedicated API call in the MSTransferor microservice, through which we trigger the proper transfer rule updates based on the user request to update the SiteLists at the workflow level, with the rest of the code living in MSTransferor. This would of course mean we may have to rethink the transfer rule retention policy, in case it turns out we do not keep the information long enough. If this were to lead to a change in the concept of MSTransferor, or to changing/adding a different backend database (we are not using MongoDB in MSTransferor, which would be the perfect database for this), I am OK with that too, but it may cost some more work. So if we are not ready to pay the price of a conceptual change in MSTransferor (or, even better, if we find a way to avoid one), we must think of all of this in the perspective of the current MSTransferor design and add any functionality or workaround there as needed. Honestly, I hope the current design is already flexible enough that we avoid such drastic changes.
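
For illustration only, the trigger for such a dedicated API call, as seen from the caller's side, could look like the sketch below; the endpoint path, payload shape, and use of the requests library are assumptions, not the actual WMCore HTTP client:

```python
# A sketch of triggering a hypothetical MSTransferor endpoint upon a site
# list update; /data/update_sites and the payload fields are placeholders.
import requests

def notifyTransferor(msTransferorUrl, workflowName):
    """Ask MSTransferor to redo the input data placement for one workflow."""
    resp = requests.post("%s/data/update_sites" % msTransferorUrl,
                         json={"workflow": workflowName},
                         timeout=30)
    resp.raise_for_status()   # surface HTTP errors to the caller
    return resp.json()
```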

@todor-ivanov
Contributor

Just to add here:

There is a minor difference between what we need to do here and what needs to be done in: #12039

The difference is simple: we do not need to preserve previous state, because at this stage everything is still retained at the central services, and we also know the exact moment of a SiteList change (and whether it was an actual change or identical to the old value). So we can develop a mechanism based on a set of REST calls between the central services to trigger the proper actions. This is in contrast to how we propagate information between the agents and central services, where everything goes mostly through CouchDB, which implies the need to know and compare previous and current states.

@vkuznet
Contributor

vkuznet commented Oct 15, 2024

I spoke with Todor via the MM channel and will post his suggestion here. @amaltaro, at this moment I need your feedback on the following questions before I assign this issue to myself and work on it:

  • in the issue description you listed two solutions, (a) synchronous with the status transition and (b) asynchronous; which one should be implemented? I saw in the discussion threads here that NATS was mentioned, but my conclusion from reading all the details is that we prefer the (a) implementation, right?
  • Please review Todor's proposal, which he sent to me via MM, and let me know whether you agree with it
  • Todor mentioned that in order to proceed with this issue we need dmwm/WMCore#12099; please confirm and provide an estimate of when it will be merged so that I can proceed.

Todor's proposal:

  • we know the exact moment when a change to the site lists is made, through this method here:

```python
def _handleNoStatusUpdate(self, workload, request_args, dn):
    """
    For no-status update, we only support the following parameters:
    1. RequestPriority
    2. SiteWhitelist
    3. SiteBlacklist
    4. Global workqueue statistics, while acquiring a workflow
    As Global workqueue statistics updates are exclusive to the rest of the
    parameters. Meaning if it is about to be updated all the rest of the
    request_args will be ignored.
    """
    reqArgs = deepcopy(request_args)
    if not reqArgs:
        cherrypy.log("Nothing to be changed at this stage")
        return 'OK'
    if workqueue_stat_validation(reqArgs):
        report = self.reqmgr_db_service.updateRequestStats(workload.name(), reqArgs)
        cherrypy.log('Updated workqueue statistics of "{}", with: {}'.format(workload.name(), reqArgs))
        return report
    # Update all workload parameters based on the full reqArgs dictionary
    workload.updateWorkloadArgs(reqArgs)
    # Commit the changes of the current workload object to the database:
    workload.saveCouchUrl(workload.specUrl())
    # Commit all Global WorkQueue changes per workflow in a single go:
    self.gq_service.updateElementsByWorkflow(workload.name(), reqArgs, status=['Available', 'Negotiating', 'Acquired'])
    # Finally update ReqMgr Database
    report = self.reqmgr_db_service.updateRequestProperty(workload.name(), reqArgs, dn)
    return report
```
  • The HTTP calls that reach this method have been previously validated, so you do not need to worry about that part.
  • What needs to happen: in this very same method, add a line at the bottom (upon the workload and workqueue elements update) to call MSTransferor and ask it to trigger the whole data transfer mechanism once again for the given workflow with the new site lists (no change to the MSTransferor algorithm of how to manage site lists is needed). Basically, we need to ask MSTransferor to repeat this sequence of steps:
```python
# execute data discovery
reqResults = self.reqInfo(reqSlice, self.pileupDocs)
self.logger.info("%d requests information completely processed.", len(reqResults))

for wflow in reqResults:
    if not self.verifyCampaignExist(wflow):
        counterProblematicRequests += 1
        continue
    if not self.passSecondaryCheck(wflow):
        self.alertPUMisconfig(wflow.getName())
        counterProblematicRequests += 1
        continue
    # find accepted RSEs for the workflow
    rseList = self.getAcceptedRSEs(wflow)
    # now check where input primary and parent blocks will need to go
    self.checkDataLocation(wflow, rseList)
    try:
        success, transfers = self.makeTransferRequest(wflow, rseList)
    except Exception as ex:
        success = False
        self.alertUnknownTransferError(wflow.getName())
        msg = "Unknown exception while making transfer request for %s " % wflow.getName()
        msg += "\tError: %s" % str(ex)
        self.logger.exception(msg)
    if success:
        # then create a document in ReqMgr Aux DB
        self.logger.info("Transfer requests successful for %s. Summary: %s",
                         wflow.getName(), pformat(transfers))
        if self.createTransferDoc(wflow.getName(), transfers):
            self.logger.info("Transfer document successfully created in CouchDB for: %s", wflow.getName())
            # then move this request to staging status
            self.change(wflow.getName(), 'staging', self.__class__.__name__)
            counterSuccessRequests += 1
        else:
            counterFailedRequests += 1
            self.alertTransferCouchDBError(wflow.getName())
    else:
        counterFailedRequests += 1
```
  • But the problem with all our microservices is that they first fetch a set of workflows in a given status and then work on that list. What needs to happen in this case: you'll need to develop a short wrapper method to trigger this algorithm for a single workflow name given as an argument - it shouldn't be more than a few lines (a sketch is given after this list).
  • We also need the proper REST interface referring to this wrapper method, similar to: https://cmsweb-testbed.cern.ch/ms-transferor/data/info?request=tivanov_ReReco_Parents_Feb2024_Val_v2_240215_150900_8733. Here is an example of what one such method, called from a REST API, may look like (taken from MSUnmerged though):
```python
def getStatsFromMongoDB(self, detail=False, **kwargs):
    """
    Auxiliary method to serve the APIs of the REST calls for the service.
    Implements various queries to MongoDB based on the parameters passed.
    :param detail: Bool marking if additional details must be queried from the database
                   (e.g. putting the 'dirs' field into the mongo projection)
    :param rse=rseName: String representing the RSE name to query the database for
    :return: Dictionary with the type of query + the data returned from the database
    """
    data = {}
    if kwargs.get('rse'):
        data["query"] = 'rse=%s&detail=%s' % (kwargs['rse'], detail)
        allDocs = (kwargs['rse'].lower() == "all_docs") if isinstance(kwargs['rse'], str) else False
        if allDocs:
            mongoProjection = {
                "_id": False,
                "name": True,
                "isClean": True,
                "rucioConMonStatus": True,
                "timestamps": True,
                "counters": {
                    "gfalErrors": True,
                    "dirsToDelete": True,
                    "dirsDeletedSuccess": True,
                    "dirsDeletedFail": True}}
            mongoFilter = {}
            data["rseData"] = list(self.msUnmergedColl.find(mongoFilter, projection=mongoProjection))
        else:
            mongoProjection = {
                "_id": False,
                "name": True,
                "isClean": True,
                "pfnPrefix": True,
                "rucioConMonStatus": True,
                "timestamps": True,
                "counters": True}
            if detail:
                mongoProjection["dirs"] = True
            rseList = kwargs['rse'] if isinstance(kwargs['rse'], list) else [kwargs['rse']]
            data["rseData"] = []
            for rseName in rseList:
                mongoFilter = {'name': rseName}
                data["rseData"].append(self.msUnmergedColl.find_one(mongoFilter, projection=mongoProjection))
        # Rewrite all timestamps in ISO 8601 format
        for rse in data['rseData']:
            if 'timestamps' in rse:
                for dateField, dateValue in rse['timestamps'].items():
                    dateValue = datetime.utcfromtimestamp(dateValue)
                    dateValue = dateValue.isoformat()
                    rse['timestamps'][dateField] = dateValue
    return data
```
  • And as I said in the very first bullet above - one just needs to make this call at the last line of _handleNoStatusUpdate (with the workflow name in it); a sketch of such a wrapper follows below.
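
For illustration, such a single-workflow wrapper might look like the sketch below; updateSites() and getRequestByName() are hypothetical names, while reqInfo(), getAcceptedRSEs(), checkDataLocation(), makeTransferRequest() and createTransferDoc() come from the MSTransferor code quoted above:

```python
# A sketch of the single-workflow wrapper suggested above. Only the calls
# quoted from MSTransferor are real; updateSites() and getRequestByName()
# are hypothetical placeholders.
def updateSites(self, workflowName):
    """Re-execute the input data placement for a single workflow,
    using its up-to-date site lists."""
    # fetch the (already updated) workflow spec; hypothetical helper
    reqSlice = self.getRequestByName(workflowName)
    # same data-discovery entry point as the main loop, for one workflow only
    reqResults = self.reqInfo(reqSlice, self.pileupDocs)
    for wflow in reqResults:
        rseList = self.getAcceptedRSEs(wflow)
        self.checkDataLocation(wflow, rseList)
        success, transfers = self.makeTransferRequest(wflow, rseList)
        if success and self.createTransferDoc(wflow.getName(), transfers):
            self.logger.info("Data re-placement done for %s", wflow.getName())
            return True
    return False
```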

But in order to avoid code conflicts we must converge with Alan on: dmwm/WMCore#12099

@vkuznet vkuznet self-assigned this Oct 21, 2024
@amaltaro
Contributor Author

Hi @vkuznet

in the issue description you listed two solutions, (a) synchronous with the status transition and (b) asynchronous; which one should be implemented? I saw in the discussion threads here that NATS was mentioned, but my conclusion from reading all the details is that we prefer the (a) implementation, right?

As recently discussed, NATS is out of the game.
My suggestion is to implement a hybrid solution here, which will rely on a new REST endpoint for MSTransferor. What I envision is:
a) ReqMgr2 will be handling a site lists update request
b) it will then make a MSTransferor call to this new REST endpoint (synchronous to the change in ReqMgr2)
c) MSTransferor will acknowledge this request and ReqMgr2 will move on (non-blocking call)
d) MSTransferor will then fetch the up-to-date workflow spec from ReqMgr2 and execute again the input data placement logic

With this approach, we need to be mindful of concurrency between updating the spec in ReqMgr2 and fetching an up-to-date spec in MSTransferor. If we have that covered, then the REST endpoint can be "static", where we simply provide the name of the workflow for which we want to trigger a data (re-)placement.

An alternative to that would be to have an endpoint that would receive 3 parameters: workflow name, site whitelist, site blacklist.
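
For illustration, the two payload options might look as follows; the field names and the workflow name are hypothetical placeholders, not an agreed API:

```python
# A sketch of the two payload options; field names and the workflow name
# are hypothetical placeholders.
import json

# option 1: "static" endpoint - MSTransferor fetches the up-to-date spec itself
payloadNameOnly = {"workflow": "user_TaskChain_Example_v1"}

# option 2: the endpoint receives the new site lists explicitly, which avoids
# the concurrency concern between the ReqMgr2 update and the spec fetch
payloadWithSiteLists = {
    "workflow": "user_TaskChain_Example_v1",
    "SiteWhitelist": ["T1_US_FNAL", "T2_CH_CERN"],
    "SiteBlacklist": ["T2_US_Vanderbilt"],
}

print(json.dumps(payloadWithSiteLists, indent=2))
```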

Todor mentioned that in order to proceed with this issue we need #12099; please confirm and provide an estimate of when it will be merged so that I can proceed.

Yes, it is correct that this feature depends on ReqMgr2. However, most of the implementation of this feature relies solely on the MSTransferor codebase, so there is no need to wait for anything and development can be done concurrently.
Once we have the MSTransferor feature covered, the ReqMgr2 change will be a matter of 1 or 2 lines of code.

To summarize, we can start working on the MSTransferor code right away; development-level validation can also be performed without any changes in ReqMgr2. Once we are happy with that, we can look into integrating this feature into ReqMgr2 (still within this same ticket).

@vkuznet
Contributor

vkuznet commented Oct 21, 2024

@amaltaro , if I understand you correctly, the following changes should be made outside ReqMgr2:

  • change MSManager.py to include a transferor handler for a POST API, i.e. such that ReqMgr2 will have a POST API to call
  • we need to decide what name to assign to this end-point, e.g. HTTP POST /data/updateSiteList; is it appropriate?
  • we need to decide which parameters ReqMgr2 will send via the HTTP POST request, i.e. decide on the payload data-structure
  • we need to decide how this end-point will behave
    • it will either synchronously process this request, i.e. make the necessary changes, or
    • it will process this request asynchronously, i.e. it will reply to ReqMgr2 with some response (to be defined) and persist the provided data somewhere (where?). Once the data is preserved, the internal daemon (the def execute method) will process it somehow.

Please provide these details to proceed.

@amaltaro
Contributor Author

we need to decide what name to assign to this end-point, e.g. HTTP POST /data/updateSiteList; is it appropriate?

Perhaps update_sites, execute_transfer or something like this.

we need to decide which parameters ReqMgr2 will send via the HTTP POST request, i.e. decide on the payload data-structure

I would be in favor of providing solely the workflow name. But it really depends on how we can re-trigger the data placement, which is in your next question.

we need to decide how this end-point will behave

I would be in favor of having a model similar to the data migration in DBS, where the workflow would be:
a) ReqMgr2 makes an HTTP request to MSTransferor
b) MSTransferor acknowledges it and persists this information somewhere
c) MSTransferor sends back an HTTP response to ReqMgr2
d) ReqMgr2 moves over with the user request
e) lastly, MSTransferor asynchronously re-executes the data placement for that given workflow.

The reason I prefer an asynchronous data placement is that this call can take many seconds, as we have to re-execute the data discovery (determining the final blocks that need to be transferred) for the workflow in question. A minimal sketch of this acknowledge-then-process pattern follows below.
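
A toy model of steps a)-e), using an in-memory queue and a background thread; in MSTransferor the persistence layer and daemon cycle would of course differ, and all names here are hypothetical:

```python
# A toy model of steps b), c) and e): persist the request, acknowledge
# immediately, and re-run the placement asynchronously. All names are
# hypothetical; MSTransferor would persist to a real backend, not a queue.
import queue
import threading

replacementQueue = queue.Queue()

def handleUpdateSites(workflowName):
    """REST-handler side: persist the request and acknowledge at once."""
    replacementQueue.put(workflowName)                  # step b): persist
    return {"result": "OK", "workflow": workflowName}   # step c): non-blocking ack

def placementDaemon():
    """Daemon side: step e), asynchronously re-execute the data placement."""
    while True:
        workflowName = replacementQueue.get()
        print("Re-executing data discovery and placement for %s" % workflowName)
        # ... call the MSTransferor placement logic here ...
        replacementQueue.task_done()

threading.Thread(target=placementDaemon, daemon=True).start()
print(handleUpdateSites("user_TaskChain_Example_v1"))
replacementQueue.join()   # in this toy example, wait for the daemon to finish
```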

An alternative to this data discovery would be to fetch the Rucio rule IDs from the previous data placement, grab the DIDs in those rules, and use those for the next data placement. However, I fear that blocks could be invalidated during the lifetime of a workflow, which would lead Rucio to return a DIDNotFound exception (see the sketch below).
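
A sketch of that alternative, guarding against invalidated blocks; it assumes the standard Rucio Python client and that each rule is attached to a container whose content is the blocks:

```python
# A sketch of reusing the DIDs from the previous rules instead of re-running
# data discovery, skipping DIDs that were invalidated in the meantime.
from rucio.client.didclient import DIDClient
from rucio.client.ruleclient import RuleClient
from rucio.common.exception import DIDNotFound

def didsFromRules(ruleIds):
    """Collect the DIDs covered by existing rules, skipping invalidated ones."""
    ruleClient = RuleClient()
    didClient = DIDClient()
    dids = []
    for ruleId in ruleIds:
        rule = ruleClient.get_replication_rule(ruleId)
        try:
            # assumes the rule's DID is a container of blocks
            for child in didClient.list_content(rule['scope'], rule['name']):
                dids.append((child['scope'], child['name']))
        except DIDNotFound:
            # block was invalidated during the workflow lifetime; skip it
            continue
    return dids
```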

@vkuznet vkuznet linked a pull request Oct 22, 2024 that will close this issue