Update lxc_monitor.py
Some improvements:

- Refactored Helper Functions: Introduced helpers such as get_container_metric and parse_meminfo to remove duplicated pct exec plumbing.
- Asynchronous File Handling: Used aiofiles for asynchronous file reading and writing, improving performance of file I/O operations (see the sketch after this list).
- Type Hinting: Added type hints to all function signatures to improve code clarity, maintainability, and enable static type checking.
- Consistent Error Messages: Provided more specific and consistent error messages for better debugging.
- Logging: Ensured that logs are concise, relevant, and aligned with the appropriate logging levels.
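
A minimal sketch of the aiofiles pattern the refactor relies on (illustrative names, not part of the commit; assumes the third-party aiofiles package is installed):

import asyncio
import json

import aiofiles  # third-party: pip install aiofiles

async def read_json(path: str) -> list:
    """Read and decode a JSON file without blocking the event loop."""
    async with aiofiles.open(path, mode='r') as f:
        return json.loads(await f.read())

async def write_json(path: str, data: list) -> None:
    """Encode and write JSON without blocking the event loop."""
    async with aiofiles.open(path, mode='w') as f:
        await f.write(json.dumps(data, indent=4))

# Usage: asyncio.run(write_json("/tmp/example.json", [{"ok": True}]))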
fabriziosalmi authored Sep 2, 2024
1 parent bba8578 commit 829b416
Showing 1 changed file with 100 additions and 113 deletions.
213 changes: 100 additions & 113 deletions lxc_autoscale_ml/monitor/lxc_monitor.py
@@ -8,6 +8,8 @@
 from subprocess import check_output, CalledProcessError
 from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor
+from typing import Any, Dict, List, Optional, Tuple
+import aiofiles

 # Load configuration from YAML file
 with open("/etc/lxc_autoscale/lxc_monitor.yaml", 'r') as config_file:
@@ -31,7 +33,7 @@
 # Timed rotating file handler with log retention
 file_handler = TimedRotatingFileHandler(LOG_FILE, when="midnight", interval=1, backupCount=LOG_BACKUP_COUNT)
 file_handler.setFormatter(formatter)
-file_handler.suffix = "%Y-%m-%d"  # Filename will be suffixed with the date
+file_handler.suffix = "%Y-%m-%d"

 # Add handlers to the logger
 logger.addHandler(console_handler)
@@ -49,25 +51,24 @@
 RETRY_LIMIT = config['monitoring'].get('retry_limit', 3)  # Maximum retry attempts
 RETRY_DELAY = config['monitoring'].get('retry_delay', 2)  # Delay between retries in seconds

-def get_running_lxc_containers():
+def get_running_lxc_containers() -> List[str]:
     """Retrieve a list of running LXC containers."""
     try:
         pct_output = check_output(['pct', 'list'], text=True).splitlines()
-        running_containers = [line.split()[0] for line in pct_output if 'running' in line]
-        return running_containers
+        return [line.split()[0] for line in pct_output if 'running' in line]
     except CalledProcessError as e:
         logger.error(f"Error retrieving LXC containers: {e}")
         return []

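Note: pct list output resembles the table below (illustrative sample; exact columns vary by Proxmox version), which is why line.split()[0] yields the container ID and the 'running' substring selects running containers:

VMID       Status     Lock         Name
100        running                 web01
101        stopped                 db01
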
-def run_command(command):
+def run_command(command: List[str]) -> Optional[str]:
     """Run a shell command in a thread pool."""
     try:
         return check_output(command, text=True)
     except CalledProcessError as e:
-        logger.error(f"Command failed: {command}, error: {e}")
+        logger.error(f"Command failed: {' '.join(command)}, error: {e}")
         return None

-async def retry_on_failure(func, *args, **kwargs):
+async def retry_on_failure(func: Any, *args, **kwargs) -> Any:
     """Retry logic wrapper for transient failures."""
     for attempt in range(RETRY_LIMIT):
         try:
@@ -80,58 +81,45 @@ async def retry_on_failure(func, *args, **kwargs):
     logger.error(f"All {RETRY_LIMIT} attempts failed for {func.__name__}.")
     raise

-async def get_container_cpu_usage(container_id, executor):
+async def get_container_metric(command: List[str], executor: ThreadPoolExecutor) -> Optional[str]:
+    """Helper function to execute commands asynchronously in containers."""
+    return await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+
+async def parse_meminfo(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, float]:
+    """Retrieve memory and swap usage inside the container."""
+    mem_info = {}
+    for metric, key in [('MemTotal', 'memory_usage_mb'), ('MemAvailable', 'memory_free_mb'),
+                        ('SwapTotal', 'swap_total_mb'), ('SwapFree', 'swap_free_mb')]:
+        command = ['pct', 'exec', container_id, '--', 'grep', metric, '/proc/meminfo']
+        result = await get_container_metric(command, executor)
+        if result:
+            try:
+                mem_info[key] = int(result.split()[1]) / 1024  # Convert to MB
+            except ValueError:
+                logger.warning(f"Unexpected memory info format for container {container_id}: {result}")
+    # Calculate used memory and swap usage
+    mem_info['memory_usage_mb'] = mem_info.get('memory_usage_mb', 0.0) - mem_info.get('memory_free_mb', 0.0)
+    mem_info['swap_usage_mb'] = mem_info.get('swap_total_mb', 0.0) - mem_info.get('swap_free_mb', 0.0)
+    return mem_info

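Note: /proc/meminfo reports values in kB, so the division by 1024 yields MiB. After the subtraction step, the returned dict carries memory_usage_mb (MemTotal minus MemAvailable), memory_free_mb, swap_total_mb, swap_free_mb, and swap_usage_mb.
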
+async def get_container_cpu_usage(container_id: str, executor: ThreadPoolExecutor) -> float:
     """Use pct exec to retrieve CPU usage inside the container."""
     command = ['pct', 'exec', container_id, '--', 'grep', 'cpu ', '/proc/stat']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         fields = result.split()
         if len(fields) < 5:
             logger.warning(f"Unexpected CPU stat format for container {container_id}: {fields}")
             return 0.0
         idle_time = int(fields[4])
         total_time = sum(int(field) for field in fields[1:])
-        cpu_usage = 100 * (1 - (idle_time / total_time))
-        return cpu_usage
+        return 100 * (1 - (idle_time / total_time))
     return 0.0

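Note: the /proc/stat counters read above are cumulative since boot, so 100 * (1 - idle/total) is the average utilization since boot rather than the current load. A delta-based variant (an illustrative sketch, not part of this commit; the sample callable is a hypothetical stand-in for the pct exec grep) samples twice:

import time
from typing import Callable, Tuple

def _cpu_times(stat_line: str) -> Tuple[int, int]:
    """Return (idle, total) jiffies from an aggregate 'cpu ' line."""
    fields = stat_line.split()
    idle = int(fields[4])                    # idle jiffies
    total = sum(int(f) for f in fields[1:])  # all jiffies
    return idle, total

def cpu_percent_over_interval(sample: Callable[[], str], interval: float = 1.0) -> float:
    """Measure CPU utilization between two /proc/stat samples."""
    idle1, total1 = _cpu_times(sample())
    time.sleep(interval)
    idle2, total2 = _cpu_times(sample())
    delta_total = total2 - total1
    return 0.0 if delta_total == 0 else 100 * (1 - (idle2 - idle1) / delta_total)
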
-async def get_container_memory_usage(container_id, executor):
-    """Use pct exec to retrieve memory and swap usage inside the container."""
-    try:
-        command = ['pct', 'exec', container_id, '--', 'grep', 'MemTotal', '/proc/meminfo']
-        result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-        mem_total_kb = int(result.split()[1])
-
-        command = ['pct', 'exec', container_id, '--', 'grep', 'MemAvailable', '/proc/meminfo']
-        result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-        mem_available_kb = int(result.split()[1])
-
-        mem_used_kb = mem_total_kb - mem_available_kb
-
-        swap_used_kb = swap_total_kb = 0
-        if ENABLE_SWAP:
-            command = ['pct', 'exec', container_id, '--', 'grep', 'SwapTotal', '/proc/meminfo']
-            result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-            swap_total_kb = int(result.split()[1])
-
-            command = ['pct', 'exec', container_id, '--', 'grep', 'SwapFree', '/proc/meminfo']
-            result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-            swap_free_kb = int(result.split()[1])
-            swap_used_kb = swap_total_kb - swap_free_kb
-
-        return {
-            "memory_usage_mb": mem_used_kb / 1024,  # Convert to MB
-            "swap_usage_mb": swap_used_kb / 1024,  # Convert to MB
-            "swap_total_mb": swap_total_kb / 1024  # Convert to MB
-        }
-    except (CalledProcessError, ValueError) as e:
-        logger.warning(f"Failed to get memory/swap usage for container {container_id}: {e}")
-        return {"memory_usage_mb": 0.0, "swap_usage_mb": 0.0, "swap_total_mb": 0.0}

-async def get_container_io_stats(container_id, executor):
-    """Use pct exec to retrieve I/O statistics inside the container."""
+async def get_container_io_stats(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, int]:
+    """Retrieve I/O statistics inside the container."""
     command = ['pct', 'exec', container_id, '--', 'grep', '', '/proc/diskstats']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         io_stats_lines = result.splitlines()
         io_stats = {"reads": 0, "writes": 0}
@@ -143,20 +131,18 @@ async def get_container_io_stats(container_id, executor):
             device = fields[2]
             if any(device.startswith(exclude) for exclude in EXCLUDED_DEVICES):
                 continue
-            reads = int(fields[5])
-            writes = int(fields[9])
-            io_stats["reads"] += reads
-            io_stats["writes"] += writes
+            io_stats["reads"] += int(fields[5])
+            io_stats["writes"] += int(fields[9])
         return io_stats
     return {}

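Note: in /proc/diskstats the 0-indexed fields [5] and [9] are sectors read and sectors written; fields [3] and [7] hold the counts of completed read and write operations. The "reads"/"writes" totals above are therefore sector counts. A variant that counts operations instead (illustrative, not what this commit does):

def count_ops(diskstats_line: str) -> tuple:
    """Return (completed_reads, completed_writes) for one /proc/diskstats line."""
    fields = diskstats_line.split()
    return int(fields[3]), int(fields[7])  # rd_ios, wr_ios
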
-async def get_container_network_usage(container_id, executor):
-    """Use pct exec to retrieve network usage inside the container."""
+async def get_container_network_usage(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, int]:
+    """Retrieve network usage inside the container."""
     if not ENABLE_NETWORK:
         return {"rx_bytes": 0, "tx_bytes": 0}

     command = ['pct', 'exec', container_id, '--', 'cat', '/proc/net/dev']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         net_stats_lines = result.splitlines()[2:]  # Skip headers
         rx_bytes, tx_bytes = 0, 0
@@ -172,18 +158,14 @@ async def get_container_network_usage(container_id, executor):
         return {"rx_bytes": rx_bytes, "tx_bytes": tx_bytes}
     return {"rx_bytes": 0, "tx_bytes": 0}

-async def get_container_filesystem_usage(container_id, executor):
-    """Use pct exec to retrieve filesystem usage inside the container."""
+async def get_container_filesystem_usage(container_id: str, executor: ThreadPoolExecutor) -> Dict[str, float]:
+    """Retrieve filesystem usage inside the container."""
     if not ENABLE_FILESYSTEM:
         return {"filesystem_usage_gb": 0, "filesystem_total_gb": 0, "filesystem_free_gb": 0}

-    try:
-        command = ['pct', 'exec', container_id, '--', 'df', '-m', '/']
-        result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-        if not result:
-            command = ['pct', 'exec', container_id, '--', 'df', '-BM', '/']
-            result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
-
+    command = ['pct', 'exec', container_id, '--', 'df', '-m', '/']
+    result = await get_container_metric(command, executor)
+    if result:
         lines = result.splitlines()
         if len(lines) < 2:
             logger.warning(f"Unexpected filesystem stats format for container {container_id}: {lines}")
@@ -203,27 +185,26 @@
             "filesystem_total_gb": filesystem_total_gb,
             "filesystem_free_gb": filesystem_free_gb
         }
-    except CalledProcessError as e:
-        logger.warning(f"Failed to get filesystem usage for container {container_id}: {e}")
-        return {"filesystem_usage_gb": 0, "filesystem_total_gb": 0, "filesystem_free_gb": 0}
+    return {"filesystem_usage_gb": 0, "filesystem_total_gb": 0, "filesystem_free_gb": 0}

-async def get_container_process_count(container_id, executor):
-    """Use pct exec to retrieve the number of processes running inside the container."""
+async def get_container_process_count(container_id: str, executor: ThreadPoolExecutor) -> int:
+    """Retrieve the number of processes running inside the container."""
     command = ['pct', 'exec', container_id, '--', 'ps', '-e']
-    result = await asyncio.get_event_loop().run_in_executor(executor, run_command, command)
+    result = await get_container_metric(command, executor)
     if result:
         lines = result.splitlines()
-        # Check if the first line contains a header (e.g., "PID TTY TIME CMD") and remove it
         if lines and any(header in lines[0] for header in ["PID", "TTY", "TIME", "CMD"]):
-            lines = lines[1:]
+            lines = lines[1:]  # Remove header line
         return len(lines)
     return 0

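Note: ps -e runs inside the container, so the reported figure includes the ps process itself; treat the count as approximate.
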
-async def collect_metrics_for_container(container_id, executor):
+async def collect_metrics_for_container(container_id: str, executor: ThreadPoolExecutor) -> Tuple[str, Dict[str, Any]]:
     """Collect all metrics for a given container."""
     logger.info(f"Collecting metrics for container: {container_id}")

     # Use retry logic for each metric collection
     cpu_usage = await retry_on_failure(get_container_cpu_usage, container_id, executor)
-    memory_swap_usage = await retry_on_failure(get_container_memory_usage, container_id, executor)
+    memory_swap_usage = await retry_on_failure(parse_meminfo, container_id, executor)
     io_stats = await retry_on_failure(get_container_io_stats, container_id, executor)
     network_usage = await retry_on_failure(get_container_network_usage, container_id, executor)
     filesystem_usage = await retry_on_failure(get_container_filesystem_usage, container_id, executor)
@@ -232,9 +213,9 @@ async def collect_metrics_for_container(container_id, executor):
     container_metrics = {
         "timestamp": datetime.now().isoformat(),
         "cpu_usage_percent": cpu_usage,
-        "memory_usage_mb": memory_swap_usage["memory_usage_mb"],
-        "swap_usage_mb": memory_swap_usage["swap_usage_mb"],
-        "swap_total_mb": memory_swap_usage["swap_total_mb"],
+        "memory_usage_mb": memory_swap_usage.get("memory_usage_mb", 0.0),
+        "swap_usage_mb": memory_swap_usage.get("swap_usage_mb", 0.0),
+        "swap_total_mb": memory_swap_usage.get("swap_total_mb", 0.0),
         "process_count": process_count,
         "io_stats": io_stats,
         "network_usage": network_usage,
@@ -243,15 +224,12 @@ async def collect_metrics_for_container(container_id, executor):
         "filesystem_free_gb": filesystem_usage["filesystem_free_gb"]
     }

-    logger.info(f"Metrics for {container_id}: "
-                f"CPU: {cpu_usage:.2f}%, Memory: {memory_swap_usage['memory_usage_mb']:.2f} MB, "
-                f"Swap Used: {memory_swap_usage['swap_usage_mb']:.2f} MB, Processes: {process_count}, "
-                f"Network: {network_usage}, Filesystem Used: {filesystem_usage['filesystem_usage_gb']} GB, "
-                f"Filesystem Free: {filesystem_usage['filesystem_free_gb']} GB, I/O: {io_stats}")
+    logger.info(f"Metrics for {container_id}: {container_metrics}")

     return container_id, container_metrics

 async def collect_and_export_metrics():
+    """Collect and export metrics for all running LXC containers."""
     start_time = datetime.now()
     metrics = {}
     containers = get_running_lxc_containers()
@@ -262,16 +240,15 @@

     logger.debug(f"Found {len(containers)} running containers.")

-    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
-
-    if PARALLEL_PROCESSING:
-        tasks = [collect_metrics_for_container(container_id, executor) for container_id in containers]
-        results = await asyncio.gather(*tasks)
-    else:
-        results = []
-        for container_id in containers:
-            result = await collect_metrics_for_container(container_id, executor)
-            results.append(result)
+    # ThreadPoolExecutor is a synchronous context manager, so plain `with` is required here
+    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        if PARALLEL_PROCESSING:
+            tasks = [collect_metrics_for_container(container_id, executor) for container_id in containers]
+            results = await asyncio.gather(*tasks)
+        else:
+            results = []
+            for container_id in containers:
+                result = await collect_metrics_for_container(container_id, executor)
+                results.append(result)

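Note: asyncio.gather preserves input order, so each result lines up with its container. With the default return_exceptions=False, a single container failing past its retries aborts the whole cycle; a tolerant variant (illustrative, not part of this commit) would be:

results = await asyncio.gather(*tasks, return_exceptions=True)
results = [r for r in results if not isinstance(r, BaseException)]  # drop failed containers
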
     # Ensure results are processed correctly
     for container_id, container_metrics in results:
@@ -292,44 +269,54 @@
     metrics["summary"] = summary

     # Load existing data if the file exists
-    existing_data = []
-    if os.path.exists(EXPORT_FILE):
-        try:
-            with open(EXPORT_FILE, "r") as json_file:
-                existing_data = json.load(json_file)
-                if not isinstance(existing_data, list):
-                    logger.error(f"Data in {EXPORT_FILE} is not a list. Resetting data to an empty list.")
-                    existing_data = []
-            logger.debug(f"Loaded existing metrics from {EXPORT_FILE}.")
-        except (IOError, json.JSONDecodeError) as e:
-            logger.warning(f"Failed to read existing data from {EXPORT_FILE}: {e}")
-            existing_data = []
+    existing_data = await load_existing_data(EXPORT_FILE)

     # Append the new metrics
     existing_data.append(metrics)

     logger.debug(f"Appending new metrics: {metrics}")

-    # Write the updated data to a temporary file first
-    temp_file = f"{EXPORT_FILE}.tmp"
+    # Write the updated data to the file
+    await write_metrics_to_file(EXPORT_FILE, existing_data)
+
+async def load_existing_data(file_path: str) -> List[Dict[str, Any]]:
+    """Load existing data from the JSON file."""
+    if not os.path.exists(file_path):
+        return []
+
+    try:
+        async with aiofiles.open(file_path, mode='r') as json_file:
+            content = await json_file.read()
+            data = json.loads(content)
+            if not isinstance(data, list):
+                logger.error(f"Data in {file_path} is not a list. Resetting to an empty list.")
+                return []
+            logger.debug(f"Loaded existing metrics from {file_path}.")
+            return data
+    except (IOError, json.JSONDecodeError) as e:
+        logger.warning(f"Failed to read existing data from {file_path}: {e}")
+        return []
+
+async def write_metrics_to_file(file_path: str, data: List[Dict[str, Any]]):
+    """Write metrics data to a JSON file asynchronously."""
+    temp_file = f"{file_path}.tmp"
     try:
-        with open(temp_file, "w") as json_file:
-            json.dump(existing_data, json_file, indent=4)
-        # Rename the temp file to the original file, ensuring atomic operation
-        os.replace(temp_file, EXPORT_FILE)
-        logger.info(f"Metrics successfully exported to {EXPORT_FILE}")
+        async with aiofiles.open(temp_file, mode='w') as json_file:
+            await json_file.write(json.dumps(data, indent=4))
+        os.replace(temp_file, file_path)
+        logger.info(f"Metrics successfully exported to {file_path}")
     except IOError as e:
-        logger.error(f"Failed to write metrics to {EXPORT_FILE}: {e}")
-        # If there's an error, remove the temporary file to avoid partial writes
+        logger.error(f"Failed to write metrics to {file_path}: {e}")
         if os.path.exists(temp_file):
            os.remove(temp_file)

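Note: os.replace is an atomic rename on the same filesystem, but the temp file's contents reach disk through the page cache. If a freshly exported snapshot must survive a hard crash, one could fsync before renaming (an illustrative hardening, not part of this commit):

import os

def fsync_file(path: str) -> None:
    """Flush a file's contents to stable storage before an atomic rename."""
    fd = os.open(path, os.O_RDONLY)
    try:
        os.fsync(fd)
    finally:
        os.close(fd)
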
 async def monitor_and_export():
     """Continuously monitor and export metrics at the defined intervals."""
     try:
         while True:
             logger.info("Starting new metrics collection cycle.")
             await collect_and_export_metrics()
-            logger.info(f"Waiting for {CHECK_INTERVAL} seconds before next cycle.")
+            logger.info(f"Waiting for {CHECK_INTERVAL} seconds before the next cycle.")
             await asyncio.sleep(CHECK_INTERVAL)
     except KeyboardInterrupt:
         logger.info("Shutting down metrics collector due to KeyboardInterrupt.")