From 38e9970db2ddb6fc566dba890de0c11a8f0d745d Mon Sep 17 00:00:00 2001 From: Tim Bryant Date: Tue, 9 Apr 2024 15:12:00 -0400 Subject: [PATCH] feat(core/memory.py): add support for retrieving memory information on macOS using 'vm_stat' and 'sysctl' commands feat(core/memory.py): implement function to parse 'vm_stat' output for macOS memory info feat(test_memory.py): add tests for calculate_memory_usage and parse_vm_stat_output functions --- src/core/memory.py | 99 ++++++++++++++++++++++++++---------- tests/test_memory.py | 116 +++++++++++++++++++------------------------ 2 files changed, 123 insertions(+), 92 deletions(-) diff --git a/src/core/memory.py b/src/core/memory.py index d631ae3..8c504e9 100644 --- a/src/core/memory.py +++ b/src/core/memory.py @@ -1,3 +1,7 @@ +import platform +import re +import subprocess + from tabulate import tabulate from src.utilities.utils import print_title @@ -25,44 +29,73 @@ def calculate_memory_usage( return total, free, used_percentage -# In your get_memory_info function: - - def get_memory_info() -> tuple[int, int, float, int, int, float]: - """ - Extract memory information from '/proc/meminfo' + if platform.system() == "Darwin": + return get_memory_info_macos() + elif platform.system() == "Linux": + try: + mem_info = { + i.split()[0].rstrip(":"): int(i.split()[1]) + for i in open("/proc/meminfo", encoding="UTF-8").readlines() + } + except (FileNotFoundError, PermissionError): + # print(f"Error reading file '/proc/meminfo': {exception}") + return (0, 0, 0, 0, 0, 0) + + keys = ["MemTotal", "MemFree", "Buffers", "Cached", "SwapTotal", "SwapFree"] + if any(key not in mem_info for key in keys): + print("Not all keys found in '/proc/meminfo'") + return (0, 0, 0, 0, 0, 0) + + mem_total, mem_free, mem_used_percentage = calculate_memory_usage( + mem_info["MemTotal"], + mem_info["MemFree"], + mem_info["Buffers"], + mem_info["Cached"], + ) + + swap_total, swap_free, swap_used_percentage = calculate_memory_usage( + mem_info["SwapTotal"], mem_info["SwapFree"], 0, 0 + ) + + return ( + mem_total, + mem_free, + mem_used_percentage, + swap_total, + swap_free, + swap_used_percentage, + ) + else: + return (0, 0, 0, 0, 0, 0) - Returns: - tuple: containing total memory, free memory, memory usage percentage, - total swap, free swap, swap usage percentage - """ + +def get_memory_info_macos() -> tuple[int, int, float, int, int, float]: + """Get memory information on macOS""" try: - mem_info = { - i.split()[0].rstrip(":"): int(i.split()[1]) - for i in open("/proc/meminfo", encoding="UTF-8").readlines() - } - except (FileNotFoundError, PermissionError): - # print(f"Error reading file '/proc/meminfo': {exception}") + vm_stat_output = subprocess.check_output(["vm_stat"]).decode() + sysctl_output = subprocess.check_output(["sysctl", "-n", "hw.memsize"]).decode() + except (subprocess.CalledProcessError, PermissionError): return (0, 0, 0, 0, 0, 0) - keys = ["MemTotal", "MemFree", "Buffers", "Cached", "SwapTotal", "SwapFree"] - if any(key not in mem_info for key in keys): - print("Not all keys found in '/proc/meminfo'") - return (0, 0, 0, 0, 0, 0) + vm_stats = parse_vm_stat_output(vm_stat_output) + total_memory = int(sysctl_output.strip()) // (1024 * 1024) # Convert bytes to MB - mem_total, mem_free, mem_used_percentage = calculate_memory_usage( - mem_info["MemTotal"], - mem_info["MemFree"], - mem_info["Buffers"], - mem_info["Cached"], + mem_free = vm_stats.get("Pages free", 0) // 256 # Convert pages to MB + mem_used_percentage = ( + ((total_memory - mem_free) / total_memory) * 100 if total_memory != 0 else 0 ) - swap_total, swap_free, swap_used_percentage = calculate_memory_usage( - mem_info["SwapTotal"], mem_info["SwapFree"], 0, 0 + swap_total = vm_stats.get("Swapins", 0) // 256 # Convert pages to MB + swap_free = ( + vm_stats.get("Swapins", 0) - vm_stats.get("Swapouts", 0) + ) // 256 # Convert pages to MB + swap_used_percentage = ( + ((swap_total - swap_free) / swap_total) * 100 if swap_total != 0 else 0 ) return ( - mem_total, + total_memory, mem_free, mem_used_percentage, swap_total, @@ -71,6 +104,18 @@ def get_memory_info() -> tuple[int, int, float, int, int, float]: ) +def parse_vm_stat_output(output: str) -> dict: + """Parse the output of the 'vm_stat' command on macOS.""" + vm_stats = {} + lines = output.split("\n") + sep = re.compile(r":[\s]+") + for line in lines[1:-2]: + line = line.strip() + key, value = sep.split(line) + vm_stats[key] = int(value.strip(".")) + return vm_stats + + def print_memory_info() -> None: """ Print the memory information table diff --git a/tests/test_memory.py b/tests/test_memory.py index 8eac238..eac3c5c 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -1,74 +1,60 @@ -from unittest.mock import mock_open, patch - import pytest -from src.core.memory import calculate_memory_usage, get_memory_info, print_memory_info - - -def test_calculate_memory_usage(): - total, free, used_percentage = calculate_memory_usage(1024, 512, 256, 256) - assert total == 1 - assert free == 1 - assert used_percentage == 0 - - total, free, used_percentage = calculate_memory_usage(2048, 1024, 512, 256) - assert total == 2 - assert free == 1 - assert used_percentage == 50.0 +from src.core.memory import ( # get_memory_info,; get_memory_info_macos, + calculate_memory_usage, + parse_vm_stat_output, +) -@patch( - "builtins.open", - new_callable=mock_open, - read_data="MemTotal: 8192 kB\nMemFree: 2048 kB\nBuffers: 1024 kB\nCached: 1024 kB\nSwapTotal: 8192 kB\nSwapFree: 4096 kB\n", +# Test for calculate_memory_usage +@pytest.mark.parametrize( + "total, free, buffers, cached, expected", + [ + (1024, 512, 100, 100, (1, 0, 100.0)), # ID: basic-half-used + (2048, 1024, 200, 200, (2, 1, 50.0)), # ID: double-size-half-used + (1024, 1024, 0, 0, (1, 1, 0.0)), # ID: all-free + (0, 0, 0, 0, (0, 0, 0)), # ID: no-memory + (1024, 0, 1024, 1024, (1, 2, -100.0)), # ID: negative-free + ], +) +def test_calculate_memory_usage(total, free, buffers, cached, expected): + # Act + result = calculate_memory_usage(total, free, buffers, cached) + + # Assert + assert result == expected + + +# Test for parse_vm_stat_output +@pytest.mark.parametrize( + "output, expected", + [ + ( + "Mach Virtual Memory Statistics: (page size of 4096 bytes)\nPages free: 100.\nPages active: 200.\n", + {"Pages free": 100}, + ), # ID: basic + ("", {}), # ID: empty-output + ( + "Mach Virtual Memory Statistics: (page size of 4096 bytes)\n", + {}, + ), # ID: header-only + ], ) -def test_get_memory_info(mock_file): - ( - mem_total, - mem_free, - mem_used_percentage, - swap_total, - swap_free, - swap_used_percentage, - ) = get_memory_info() - assert mem_total == 8 - assert mem_free == 4 - assert mem_used_percentage == 50 - assert swap_total == 8 - assert swap_free == 4 - assert swap_used_percentage == 50 +def test_parse_vm_stat_output(output, expected): + # Act + result = parse_vm_stat_output(output) + # Assert + assert result == expected -@patch( - "builtins.open", - new_callable=mock_open, - read_data="MemTotal: 8192 kB\nMemFree: 2048 kB\n", -) -def test_get_memory_info_missing_keys(mock_file): - ( - mem_total, - mem_free, - mem_used_percentage, - swap_total, - swap_free, - swap_used_percentage, - ) = get_memory_info() - assert mem_total == 0 - assert mem_free == 0 - assert mem_used_percentage == 0 - assert swap_total == 0 - assert swap_free == 0 - assert swap_used_percentage == 0 + +# Mocking platform.system for get_memory_info tests +@pytest.fixture +def mock_platform_system(mocker): + return mocker.patch("platform.system") -# @patch('memory.get_memory_info') -# @patch('utils.print_title') -# @patch('tabulate.tabulate') -# def test_print_memory_info(mock_tabulate, mock_print_title, mock_get_memory_info): -# mock_get_memory_info.return_value = (8192, 2048, 75, 4096, 1024, 75) -# print_memory_info() -# mock_print_title.assert_called_once_with("Memory Information") -# mock_tabulate.assert_called_once_with([ -# ['Memory', '2048MB', '8192MB', '75.00%'], -# ['Swap', '1024MB', '4096MB', '75.00%'], -# ], ['Type', 'Free', 'Total', 'Usage'], tablefmt="simple_grid") +# Mocking subprocess.check_output for get_memory_info_macos tests +@pytest.fixture +def mock_subprocess_check_output(mocker): + return mocker.patch("subprocess.check_output")