Skip to content

Commit

Permalink
fix callback have no page context
Browse files Browse the repository at this point in the history
  • Loading branch information
wang0618 committed Jul 6, 2022
1 parent 0834386 commit 0b9ace1
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pywebio/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -1867,12 +1867,12 @@ def __call__(self, func):

@wraps(func)
def wrapper(*args, **kwargs):
with self:
with page_(): # can't use `with self:`, it will use same object in different calls to same decorated func
return func(*args, **kwargs)

@wraps(func)
async def coro_wrapper(*args, **kwargs):
with self:
with page_():
return await func(*args, **kwargs)

if iscoroutinefunction(func):
Expand Down
5 changes: 3 additions & 2 deletions pywebio/session/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ def pop_page(self):
pass
return page_id

def push_page(self, page_id):
def push_page(self, page_id, task_id=None):
self.push_scope(ROOT_SCOPE)
task_id = type(self).get_current_task_id()
if task_id is None:
task_id = type(self).get_current_task_id()
self.page_stack[task_id].append(page_id)
self.active_page[task_id].add(page_id)

Expand Down
10 changes: 8 additions & 2 deletions pywebio/session/coroutinebased.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def register_callback(self, callback, mutex_mode=False):
:param bool mutex_mode: 互斥模式。若为 ``True`` ,则在运行回调函数过程中,无法响应同一组件(callback_id相同)的新点击事件,仅当 ``callback`` 为协程函数时有效
:return str: 回调id.
"""
page_id = self.get_page_id()

async def callback_coro():
while True:
Expand Down Expand Up @@ -204,7 +205,7 @@ async def callback_coro():
if mutex_mode:
await coro
else:
self.run_async(coro)
self._run_async(coro, page_id=page_id)

cls = type(self)
callback_task = Task(callback_coro(), cls.get_current_session())
Expand All @@ -224,12 +225,17 @@ def run_async(self, coro_obj):
:param coro_obj: 协程对象
:return: An instance of `TaskHandler` is returned, which can be used later to close the task.
"""
return self._run_async(coro_obj)

def _run_async(self, coro_obj, page_id=None):
assert asyncio.iscoroutine(coro_obj), '`run_async()` only accept coroutine object'
if page_id is None:
page_id = self.get_page_id()

self._alive_coro_cnt += 1

task = Task(coro_obj, session=self, on_coro_stop=self._on_task_finish)
self.coros[task.coro_id] = task
self.push_page(page_id, task_id=task.coro_id)
asyncio.get_event_loop().call_soon_threadsafe(task.step)
return task.task_handle()

Expand Down
12 changes: 9 additions & 3 deletions pywebio/session/threadbased.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def _dispatch_callback_event(self):
if not callback_info:
logger.error("No callback for callback_id:%s", event['task_id'])
return
callback, mutex = callback_info
callback, mutex, page_id = callback_info

@wraps(callback)
def run(callback):
Expand All @@ -270,7 +270,7 @@ def run(callback):
else:
t = threading.Thread(target=run, kwargs=dict(callback=callback),
daemon=True)
self.register_thread(t)
self._register_thread(t, page_id)
t.start()

def register_callback(self, callback, serial_mode=False):
Expand All @@ -285,7 +285,7 @@ def register_callback(self, callback, serial_mode=False):

self._activate_callback_env()
callback_id = 'CB-%s-%s' % (get_function_name(callback, 'callback'), random_str(10))
self.callbacks[callback_id] = (callback, serial_mode)
self.callbacks[callback_id] = (callback, serial_mode, self.get_page_id())
return callback_id

def register_thread(self, t: threading.Thread):
Expand All @@ -294,10 +294,16 @@ def register_thread(self, t: threading.Thread):
:param threading.Thread thread: 线程对象
"""
return self._register_thread(t)

def _register_thread(self, t: threading.Thread, page_id=None):
if page_id is None:
page_id = self.get_page_id()
self.threads.append(t) # 保存 registered thread,用于主任务线程退出后等待注册线程结束
self.thread2session[id(t)] = self # 用于在线程内获取会话
event_mq = queue.Queue(maxsize=self.event_mq_maxsize) # 线程内的用户事件队列
self.task_mqs[self._get_task_id(t)] = event_mq
self.push_page(page_id, task_id=self._get_task_id(t))

def need_keep_alive(self) -> bool:
# if callback thread is activated, then the session need to keep alive
Expand Down
3 changes: 2 additions & 1 deletion webiojs/src/handlers/base.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {Command, Session} from "../session";
import {DeliverMessage} from "../models/page";
import {PAGE_COMMANDS} from "./page";


export interface CommandHandler {
Expand Down Expand Up @@ -36,7 +37,7 @@ export class CommandDispatcher {
}

dispatch_message(msg: Command): boolean {
if (msg.page !== undefined && msg.page) {
if (msg.page !== undefined && msg.page && PAGE_COMMANDS.indexOf(msg.command) == -1) {
DeliverMessage(msg);
} else if (msg.command in this.command2handler) {
this.command2handler[msg.command].handle_message(msg);
Expand Down
4 changes: 3 additions & 1 deletion webiojs/src/handlers/page.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import {Command} from "../session";
import {CommandHandler} from "./base";
import {ClosePage, OpenPage} from "../models/page";

export const PAGE_COMMANDS = ['open_page', 'close_page']

export class PageHandler implements CommandHandler {
accept_command: string[] = ['open_page', 'close_page'];
accept_command: string[] = PAGE_COMMANDS;

constructor() {
}
Expand Down
3 changes: 2 additions & 1 deletion webiojs/src/models/page.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ export function OpenPage(page_id: string, task_id: string) {
}

export function ClosePage(page_id: string) {
if (!(page_id in subpages))
if (!(page_id in subpages)) {
throw `Can't close page, the page (id "${page_id}") is not found`;
}
subpages[page_id].page.close();
delete subpages[page_id];
}
Expand Down

0 comments on commit 0b9ace1

Please sign in to comment.