Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syscall_server: fix epoll_wait implementation #101

Merged
merged 4 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 84 additions & 90 deletions .github/script/run_example.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,104 @@
import sys
import subprocess
import datetime
import select
from io import StringIO
import asyncio
import typing
import signal
import fcntl
import os
import time

SERVER_TIMEOUT = 30
AGENT_TIMEOUT = 30
SERVER_START_SIGNAL = "bpftime-syscall-server started"


def set_non_blocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
async def handle_stdout(
stdout: asyncio.StreamReader,
notify: asyncio.Event,
title: str,
callback_all: typing.List[typing.Tuple[asyncio.Event, str]] = [],
):
while True:
t1 = asyncio.create_task(notify.wait())
t2 = asyncio.create_task(stdout.readline())
done, pending = await asyncio.wait(
[t1, t2],
return_when=asyncio.FIRST_COMPLETED,
)
for item in pending:
item.cancel()
if t2 in done:
s = t2.result().decode()
print(f"{title}:", s, end="")
for callback in callback_all:
evt, sig = callback
if sig in s:
evt.set()
print("Callback triggered")
if t1 in done:
break
if stdout.at_eof():
break


def main():
async def main():
(
executable,
victim,
expected_str,
bpftime_cli,
syscall_trace,
) = sys.argv[1:]
bashreadline_patch = "readline" in executable
# Run the syscall-server
server = subprocess.Popen(
" ".join([bpftime_cli, "load", executable]),
stdout=subprocess.PIPE,
text=False,
stderr=sys.stderr,
bufsize=0,
shell=True,
)
set_non_blocking(server.stdout)
server_ok = False
server_start_time = datetime.datetime.now()
while (
datetime.datetime.now() - server_start_time
).total_seconds() < SERVER_TIMEOUT:
if server.poll() is not None:
break
ready, _, _ = select.select([server.stdout], [], [], 0.01)
if ready:
line = server.stdout.readline().decode()
print("SERVER:", line, end="")
if SERVER_START_SIGNAL in line:
print("MONITOR: Server started!")
server_ok = True
break
if not server_ok:
print("Failed to start server!")
server.kill()
server.wait()
exit(1)
time.sleep(10)
# Start the agent
agent = subprocess.Popen(
" ".join(
[bpftime_cli, "start"]
+ (["-s", victim] if syscall_trace == "1" else [victim])
),
stdout=sys.stdout,
text=False,
stderr=sys.stderr,
stdin=subprocess.PIPE,
env={"SPDLOG_LEVEL": "info"},
shell=True,
)
agent_start_time = datetime.datetime.now()
agent_ok = False
buf = StringIO()
if bashreadline_patch:
# Currently it's difficult to test bashreadline
exit(0)
while (datetime.datetime.now() - agent_start_time).total_seconds() < AGENT_TIMEOUT:
# Check if server has expected output
if server.poll() is not None:
break
ready, _, _ = select.select([server.stdout], [], [], 0.01)
c = server.stdout.read()
if c:
c = c.decode()
buf.write(c)
print(c, end="")
if c == "\n":
buf.seek(0)
if expected_str in buf.getvalue():
# print("SERVER:", line, end="")
# if expected_str in line:
print(f"MONITOR: string `{expected_str}` found!")
agent_ok = True
break
agent.kill()
agent.wait()
server.kill()
server.wait()
if not agent_ok:
print("Failed to test, expected string not found!")
exit(1)
else:
exit(0)
try:
bashreadline_patch = "readline" in executable
should_exit = asyncio.Event()
# Run the syscall-server
server = await asyncio.subprocess.create_subprocess_exec(
*(" ".join([bpftime_cli, "load", executable]).split()),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
server_started_signal = asyncio.Event()
expected_str_signal = asyncio.Event()

server_out = asyncio.create_task(
handle_stdout(
server.stdout,
should_exit,
"SERVER",
[
(server_started_signal, SERVER_START_SIGNAL),
(expected_str_signal, expected_str),
],
)
)

await asyncio.wait_for(server_started_signal.wait(), SERVER_TIMEOUT)
await asyncio.sleep(2)
print("Server started!")

# Start the agent
agent = await asyncio.subprocess.create_subprocess_exec(
*(
" ".join(
[bpftime_cli, "start"]
+ (["-s", victim] if syscall_trace == "1" else [victim])
).split()
),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
env={"SPDLOG_LEVEL": "info"},
)
agent_out = asyncio.create_task(
handle_stdout(agent.stdout, should_exit, "AGENT", [])
)
if bashreadline_patch:
return
await asyncio.wait_for(expected_str_signal.wait(), AGENT_TIMEOUT)
print("Test successfully")
finally:
should_exit.set()
server.send_signal(signal.SIGINT)
agent.send_signal(signal.SIGINT)
await asyncio.gather(server_out, agent_out)
await asyncio.gather(server.communicate(), agent.communicate())


if __name__ == "__main__":
main()
asyncio.run(main())
68 changes: 64 additions & 4 deletions runtime/src/bpftime_shm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include "handler/map_handler.hpp"
#include "handler/perf_event_handler.hpp"
#include "spdlog/spdlog.h"
#include <csignal>
#include <signal.h>
#include <cerrno>
#include <errno.h>
#include <bpftime_shm_internal.hpp>
Expand Down Expand Up @@ -162,13 +164,15 @@ int bpftime_is_ringbuf_map(int fd)
return shm_holder.global_shared_memory.is_ringbuf_map_fd(fd);
}

void bpftime_protect_enable() {
void bpftime_protect_enable()
{
#if BPFTIME_ENABLE_MPK
return shm_holder.global_shared_memory.enable_mpk();
#endif
}

void bpftime_protect_disable() {
void bpftime_protect_disable()
{
#if BPFTIME_ENABLE_MPK
return shm_holder.global_shared_memory.disable_mpk();
#endif
Expand Down Expand Up @@ -274,6 +278,24 @@ int bpftime_epoll_wait(int fd, struct epoll_event *out_evts, int max_evt,
std::get<epoll_handler>(shm.get_manager()->get_handler(fd));
auto start_time = high_resolution_clock::now();
int next_id = 0;
sigset_t orig_sigset;
sigset_t to_block;
sigemptyset(&to_block);
sigaddset(&to_block, SIGINT);
sigaddset(&to_block, SIGTERM);

// Block the develivery of some signals, so we would be able to catch them when sleeping
if (int err = sigprocmask(SIG_BLOCK, &to_block, &orig_sigset);
err == -1) {
spdlog::error(
"sigprocmask failed to block sigint & sigterm, errno={}. this SHOULD NOT HAPPEN",
errno);
errno = EINVAL;
return -1;
}
// timeout for waiting..
timespec ts{ .tv_sec = 0, .tv_nsec = 1000 * 1000 };
bool failed_with_intr = false;
while (next_id < max_evt) {
auto now_time = high_resolution_clock::now();
auto elasped =
Expand Down Expand Up @@ -317,7 +339,44 @@ int bpftime_epoll_wait(int fd, struct epoll_event *out_evts, int max_evt,
}
}
}
std::this_thread::sleep_for(milliseconds(1));
if (next_id > 0) {
// According to man epoll_wait(2), epoll_wait can't be
// interrupted once at least one event was received
std::this_thread::sleep_for(milliseconds(1));
} else {
// Nothing has been received, so allow the interruption
// of epoll_wait
// First, unblock the signals
sigprocmask(SIG_UNBLOCK, &to_block, nullptr);
siginfo_t sig_info;
// Second, wait for interruptable signals
if (int sig = sigtimedwait(&to_block, &sig_info, &ts);
sig > 0) {
spdlog::debug(
"epoll_wait interrupted by signal {}",
sig);
// Invoke the original signal handler
struct sigaction act;
sigaction(sig, nullptr, &act);
if ((act.sa_flags & SA_SIGINFO) &&
act.sa_sigaction) {
act.sa_sigaction(sig, &sig_info,
nullptr);
} else if (auto f = act.sa_handler) {
f(sig);
}
failed_with_intr = true;
break;
}
// If not catched, just block them again
sigprocmask(SIG_BLOCK, &to_block, nullptr);
}
}
// Restore the original sigmask
sigprocmask(SIG_SETMASK, &orig_sigset, nullptr);
if (failed_with_intr) {
errno = EINTR;
return -1;
}
return next_id;
}
Expand All @@ -329,7 +388,8 @@ int bpftime_add_software_perf_event(int cpu, int32_t sample_type,
return shm.add_software_perf_event(cpu, sample_type, config);
}

int bpftime_add_ureplace_filter(int fd, int pid, const char *name, uint64_t offset, bool is_replace)
int bpftime_add_ureplace_filter(int fd, int pid, const char *name,
uint64_t offset, bool is_replace)
{
auto &shm = shm_holder.global_shared_memory;
return shm.add_ureplace_filter(fd, pid, name, offset, is_replace);
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/handler/perf_event_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,14 @@ struct bpf_perf_event_handler {
// If This is a server, should inject the agent into the target
// process.

spdlog::info(
spdlog::debug(
"Enabling perf event for module name: {}, offset {:x}",
_module_name.c_str(), offset);
return 0;
}
int disable() const
{
spdlog::info(
spdlog::debug(
"Disabling perf event for module name: {}, offset {:x}",
_module_name.c_str(), offset);
enabled = false;
Expand Down
Loading