I can connect to my service hub and call a method with parameters, and the SignalR hub fires the corresponding trigger.
But how do I invoke a method and get the result returned from the hub directly? I can already do this in .NET and get the response, but I'm stuck in my Python application.
import threading
from threading import Event
from typing import Dict


class HubServerIndexer:
    """
    HubServer won't return the index of a message,
    but sometimes we need it.
    """

    def __init__(self, name, connection, hub):
        self.name = name
        self.__connection = connection
        self.__hub = hub
        # Maps invocation index -> event that is set once the response arrives.
        self.wait_for_result: Dict[int, Event] = {}

    def indexed_invoke(self, method, wait_for_result: bool, *data) -> int:
        """
        Invoke a method on the server and return the invocation index.
        """
        index = self.__connection.increment_send_counter()
        if wait_for_result:
            self.wait_for_result[index] = threading.Event()
        self.__connection.send({"H": self.name, "M": method, "A": data, "I": index})
        return index
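from typing import Any, Dict

# Assumes the signalr-client package, which matches the calls used here.
from requests import Session
from signalr import Connection

# Results keyed by invocation index, filled in by receive().
received_messages: Dict[int, Any] = {}


def receive(**kwargs) -> None:
    if "I" not in kwargs:
        return
    index = int(kwargs.get("I"))
    if index in hub.server.wait_for_result:
        # Store the result before setting the event, so the waiting
        # thread never wakes up to a missing entry.
        received_messages[index] = kwargs.get("R")
        hub.server.wait_for_result.pop(index).set()


# url and hub_name are your own endpoint and hub name.
session = Session()
connection = Connection(url, session)
connection.received += receive
hub = connection.register_hub(hub_name)
hub.server = HubServerIndexer(hub.name, connection, hub)


def invoke_and_get_result(method: str, *args) -> Any:
    """
    Invoke a method on the hub and return the server response
    (None if nothing arrives within 60 seconds).
    """
    index = hub.server.indexed_invoke(method, True, *args)
    event = hub.server.wait_for_result.get(index)
    if event:
        event.wait(timeout=60)
        # Drop the pending entry if the wait timed out.
        hub.server.wait_for_result.pop(index, None)
    return received_messages.pop(index, None)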
Call invoke_and_get_result(method_name, *method_args) to get the response.
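If it helps to see the idea in isolation, here is a minimal sketch of the same index-correlation pattern that runs without a server. `FakeConnection` and `ResultWaiter` are hypothetical names made up for this illustration and are not part of any SignalR library; the fake connection simply pretends the server sums the arguments and replies with the same "I" value a moment later on another thread.

```python
import threading
from typing import Any, Callable, Dict, List


class FakeConnection:
    """Hypothetical stand-in for a real connection; replies from another thread."""

    def __init__(self) -> None:
        self._counter = 0
        self._lock = threading.Lock()
        self.handlers: List[Callable[..., None]] = []

    def increment_send_counter(self) -> int:
        with self._lock:
            self._counter += 1
            return self._counter

    def send(self, message: Dict[str, Any]) -> None:
        # Pretend the server sums the arguments and echoes the index back.
        reply = {"I": str(message["I"]), "R": sum(message["A"])}
        threading.Timer(0.05, self._dispatch, kwargs=reply).start()

    def _dispatch(self, **reply: Any) -> None:
        for handler in self.handlers:
            handler(**reply)


class ResultWaiter:
    """Pairs each invocation index with an Event and waits for the reply."""

    def __init__(self, connection: FakeConnection, hub_name: str) -> None:
        self._connection = connection
        self._hub_name = hub_name
        self._pending: Dict[int, threading.Event] = {}
        self._results: Dict[int, Any] = {}
        connection.handlers.append(self._on_received)

    def _on_received(self, **kwargs: Any) -> None:
        if "I" not in kwargs:
            return
        index = int(kwargs["I"])
        event = self._pending.get(index)
        if event is None:
            return  # nobody is waiting for this index (any more)
        # Store the result first, then wake the waiter.
        self._results[index] = kwargs.get("R")
        event.set()

    def invoke_and_get_result(self, method: str, *args: Any, timeout: float = 5.0) -> Any:
        index = self._connection.increment_send_counter()
        event = threading.Event()
        self._pending[index] = event  # register before sending
        try:
            self._connection.send(
                {"H": self._hub_name, "M": method, "A": list(args), "I": index}
            )
            if not event.wait(timeout):
                raise TimeoutError(f"no response for invocation {index}")
            return self._results[index]
        finally:
            # Clean up whether we got a result or timed out.
            self._pending.pop(index, None)
            self._results.pop(index, None)


waiter = ResultWaiter(FakeConnection(), "demoHub")
total = waiter.invoke_and_get_result("Add", 2, 3)
print(total)
```

Registering the event before sending, and storing the result before setting the event, avoids the two races where the reply could arrive before the caller is ready for it. Raising on timeout instead of returning None is a design choice in this sketch; returning None would work too if None is never a valid response.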