-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #48 from olegeech-me/fetcher/load-tests
Fetcher/load tests
- Loading branch information
Showing
8 changed files
with
296 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
from selenium import webdriver | ||
from selenium.webdriver.support.wait import WebDriverWait | ||
from selenium.webdriver.common.by import By | ||
from selenium.webdriver.common.action_chains import ActionChains | ||
import time | ||
import json | ||
import pytest | ||
from selenium.common.exceptions import ( | ||
ElementClickInterceptedException, | ||
) | ||
import random | ||
import fake_useragent | ||
import logging | ||
|
||
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") | ||
logger = logging.getLogger(__name__) | ||
|
||
|
||
test_data = [ | ||
{"number": 27628, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 43075, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 57638, "type": "ZM", "suffix": "5", "year": 2023}, | ||
{"number": 17271, "type": "DP", "suffix": "4", "year": 2023}, | ||
{"number": 40465, "type": "DP", "suffix": "4", "year": 2023}, | ||
] | ||
|
||
|
||
class TestMVCR: | ||
def random_sleep(self, min_seconds=0.5, max_seconds=1.5): | ||
"""Sleep for a random amount of time""" | ||
time.sleep(random.uniform(min_seconds, max_seconds)) | ||
|
||
def type_with_delay(self, element, text, min_delay=0.05, max_delay=0.15): | ||
"""Type text into an element one character at a time with a delay""" | ||
for char in text: | ||
element.send_keys(char) | ||
self.random_sleep(min_delay, max_delay) | ||
|
||
def set_random_resolution(self): | ||
"""Pick one of popular resolutions""" | ||
resolutions = [(1936, 1056), (1920, 1080), (1366, 768), (1440, 900), (1420, 1080), (1600, 900)] | ||
chosen_resolution = random.choice(resolutions) | ||
self.driver.set_window_size(chosen_resolution[0], chosen_resolution[1]) | ||
|
||
def clean_cookies(self, cookies): | ||
for cookie in cookies: | ||
samesite = cookie.get("sameSite", "None") | ||
if samesite: | ||
samesite = samesite.capitalize() | ||
if samesite not in ["None", "Lax", "Strict"]: | ||
cookie["sameSite"] = "None" | ||
else: | ||
cookie["sameSite"] = samesite | ||
logger.debug(f"Cookies: {cookies}") | ||
return cookies | ||
|
||
def load_cookies(self, filepath="all_cookies.json"): | ||
"""Load cookies from a file and add them to the Selenium browser""" | ||
with open(filepath, "r") as file: | ||
cookies = json.load(file) | ||
cleaned_cookies = self.clean_cookies(cookies) | ||
for cookie in cleaned_cookies: | ||
self.driver.add_cookie(cookie) | ||
|
||
def setup_method(self, method): | ||
useragent = fake_useragent.UserAgent(browsers=["firefox"]).random | ||
# chrome_options = webdriver.ChromeOptions() | ||
# chrome_options.add_argument(f"user-agent={user_agent}") | ||
# self.driver = webdriver.Chrome(options=chrome_options) | ||
|
||
# configure display & options | ||
options = webdriver.firefox.options.Options() | ||
options.set_preference("intl.accept_languages", "cs-CZ") | ||
options.set_preference("http.response.timeout", 10) | ||
options.set_preference("general.useragent.override", useragent) | ||
options.set_preference("dom.webdriver.enabled", False) | ||
options.set_preference("useAutomationExtension", False) | ||
self.driver = webdriver.Firefox(options=options) | ||
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})") | ||
self.vars = {} | ||
|
||
def teardown_method(self, method): | ||
self.driver.quit() | ||
|
||
@pytest.mark.parametrize("data", test_data) | ||
def test_MVCR(self, data): | ||
number = str(data["number"]) | ||
suffix = data["suffix"] | ||
type = data["type"] | ||
year = str(data["year"]) | ||
|
||
self.set_random_resolution() | ||
self.driver.get("https://frs.gov.cz/informace-o-stavu-rizeni/") | ||
self.load_cookies("mvcr_cookies.json") | ||
WebDriverWait(self.driver, 10).until( | ||
lambda x: x.find_element(By.CLASS_NAME, "wrapper__form"), | ||
message="Body didn't load in time", | ||
) | ||
cookies = self.driver.find_element_by_xpath('//button[@class="button button__primary" and text()="Souhlasím se všemi"]') | ||
try: | ||
cookies.click() | ||
except ElementClickInterceptedException: | ||
pass | ||
|
||
input_field = self.driver.find_element(By.XPATH, "//input[@placeholder='12345']") | ||
input_field.click() | ||
self.random_sleep() | ||
self.type_with_delay(input_field, number) | ||
|
||
input_field_xx = self.driver.find_element(By.XPATH, "//input[@placeholder='XX']") | ||
input_field_xx.click() | ||
self.random_sleep() | ||
self.type_with_delay(input_field_xx, suffix) | ||
|
||
# Trigger type dropdown menu to appear | ||
menu1 = self.driver.find_element_by_xpath( | ||
"//div[contains(@class, 'react-select') and ancestor::div[contains(@style, 'width: 140px;')]]" | ||
) | ||
menu1.find_element_by_xpath("//div[contains(@class, 'react-select__control')]").click() | ||
|
||
time.sleep(0.5) | ||
# Locate and select the type dropdown by placeholder | ||
scroll1 = self.driver.find_element_by_xpath("//div[contains(@class, 'react-select__menu')]") | ||
scroll1.find_element_by_xpath(f".//div[text()='{type}']").click() | ||
|
||
# Trigger year dropdown menu to appear | ||
menu2 = self.driver.find_element_by_xpath( | ||
"//div[contains(@class, 'react-select') and ancestor::div[contains(@style, 'width: 100px;')]]" | ||
) | ||
menu2.find_element_by_xpath(".//div[contains(@class, 'react-select__control')]").click() | ||
|
||
self.random_sleep() | ||
# Locate and select the year dropdown by placeholder | ||
scroll2 = menu2.find_element_by_xpath(".//div[contains(@class, 'react-select__menu')]") | ||
scroll2.find_element_by_xpath(f".//div[text()='{year}']").click() | ||
|
||
# Click submit | ||
submit_button = self.driver.find_element(By.CSS_SELECTOR, ".button--large") | ||
self.driver.execute_script("arguments[0].scrollIntoView();", submit_button) | ||
actions = ActionChains(self.driver) | ||
actions.move_to_element(submit_button).perform() | ||
self.random_sleep() | ||
self.driver.execute_script("arguments[0].click();", submit_button) | ||
self.driver.execute_script("window.scrollTo(0, 0);") | ||
|
||
# submit_button.click() | ||
|
||
WebDriverWait(self.driver, 5).until( | ||
lambda x: x.find_element(By.CLASS_NAME, "alert__content"), | ||
message="Status field wasn't found", | ||
) | ||
self.driver.execute_script("window.scrollBy(0, -document.body.scrollHeight);") | ||
time.sleep(5) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
import unittest | ||
import asyncio | ||
import logging | ||
from fetcher.browser import Browser | ||
|
||
URL = "https://frs.gov.cz/informace-o-stavu-rizeni/" | ||
|
||
# Setting up the logger | ||
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") | ||
logger = logging.getLogger(__name__) | ||
|
||
# Test case data | ||
test_data = [ | ||
{"number": 27628, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 43075, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 57638, "type": "ZM", "suffix": "5", "year": 2023}, | ||
{"number": 17271, "type": "DP", "suffix": "4", "year": 2023}, | ||
{"number": 40465, "type": "DP", "suffix": "4", "year": 2023}, | ||
{"number": 40018, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 15904, "type": "TP", "suffix": "0", "year": 2023}, | ||
{"number": 40139, "type": "DP", "suffix": "2", "year": 2023}, | ||
{"number": 20174, "type": "DP", "suffix": "3", "year": 2023}, | ||
{"number": 26394, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 6996, "type": "DV", "suffix": "0", "year": 2023}, | ||
{"number": 27774, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 24574, "type": "DP", "suffix": "3", "year": 2023}, | ||
{"number": 65199, "type": "ZM", "suffix": "5", "year": 2023}, | ||
{"number": 40110, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 19506, "type": "TP", "suffix": "0", "year": 2023}, | ||
{"number": 28743, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 23155, "type": "TP", "suffix": "0", "year": 2023}, | ||
{"number": 34208, "type": "DP", "suffix": "0", "year": 2023}, | ||
{"number": 38703, "type": "DP", "suffix": "0", "year": 2023}, | ||
] | ||
|
||
|
||
class TestBrowserLoad(unittest.TestCase): | ||
def setUp(self): | ||
# Create browser object | ||
self.browser = Browser() | ||
|
||
def test_load(self): | ||
# Define an async function to handle the load test | ||
async def run_test(): | ||
tasks = [] | ||
for app_details in test_data: | ||
task = asyncio.ensure_future(self.browser.fetch(URL, app_details)) | ||
tasks.append(task) | ||
|
||
results = await asyncio.gather(*tasks) | ||
|
||
# Here, results is a list of all outputs from the browser.fetch method for each app_details | ||
for result, app_details in zip(results, test_data): | ||
logger.info(f"Result for {app_details['number']}: {result}") | ||
# Add checks here to see if you get the expected results or behaviors | ||
self.assertIsNotNone(result) | ||
|
||
# Run the async function | ||
asyncio.run(run_test()) | ||
|
||
def tearDown(self): | ||
self.browser.close() | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
[tox] | ||
isolated_build = True | ||
envlist = bot,fetcher | ||
|
||
[testenv] | ||
basepython = python3.8 | ||
deps = | ||
pytest | ||
pytest-asyncio | ||
pytest-mock | ||
|
||
[testenv:bot] | ||
deps = | ||
{[testenv]deps} | ||
-rrequirements-bot.txt | ||
commands = pytest -v src/tests/test_bot.py | ||
#setenv = | ||
# PYTEST_ADDOPTS = --log-cli-level=DEBUG | ||
|
||
[testenv:fetcher] | ||
deps = | ||
{[testenv]deps} | ||
-rrequirements-fetcher.txt | ||
commands = pytest -vvv src/tests/test_fetcher_browser_load.py | ||
setenv = | ||
PYTEST_ADDOPTS = --log-cli-level=DEBUG | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
import sqlite3 | ||
import json | ||
|
||
cookies_path = "/home/olegeech/git/mvcr-application-checker/chrome-cookies.sql" | ||
|
||
|
||
def extract_cookies_from_chrome_db(filepath): | ||
conn = sqlite3.connect(filepath) | ||
cursor = conn.cursor() | ||
|
||
# Extract cookies from the database | ||
cursor.execute("SELECT host_key, name, value, path, expires_utc, is_secure, is_httponly FROM cookies") | ||
cookies = cursor.fetchall() | ||
|
||
# Convert cookies into a list of dictionaries | ||
cookies_list = [ | ||
{ | ||
"domain": host_key, | ||
"name": name, | ||
"value": value, | ||
"path": path, | ||
"expiry": expires_utc, | ||
"secure": bool(is_secure), | ||
"httpOnly": bool(is_httponly), | ||
} | ||
for host_key, name, value, path, expires_utc, is_secure, is_httponly in cookies | ||
] | ||
|
||
return cookies_list | ||
|
||
|
||
all_cookies = extract_cookies_from_chrome_db(cookies_path) | ||
|
||
with open("all_cookies.json", "w") as f: | ||
json.dump(all_cookies, f) |