SNOW-1728000 TestExecutor and TestSelector for e2e tests #963

Merged · 2 commits · Oct 17, 2024
36 changes: 36 additions & 0 deletions test/test_executor.py
@@ -0,0 +1,36 @@
from datetime import datetime
import sys
import traceback

# TestExecutor is responsible for running a given subset of tests
class TestExecutor:

    def execute(self, testSuitList, driver, nameSalt, round=1):
        try:
            for test in testSuitList:
                driver.createConnector(test.getConfigFileName(), nameSalt)

            driver.startConnectorWaitTime()

            for r in range(round):
                print(datetime.now().strftime("\n%H:%M:%S "), "=== round {} ===".format(r))
                for test in testSuitList:
                    print(datetime.now().strftime("\n%H:%M:%S "),
                          "=== Sending " + test.__class__.__name__ + " data ===")
                    test.send()
                    print(datetime.now().strftime("%H:%M:%S "), "=== Done " + test.__class__.__name__ + " ===", flush=True)


                driver.verifyWaitTime()

                for test in testSuitList:
                    print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
                    driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
                    print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===", flush=True)

            print(datetime.now().strftime("\n%H:%M:%S "), "=== All test passed ===")
        except Exception as e:
            print(datetime.now().strftime("%H:%M:%S "), e)
            traceback.print_tb(e.__traceback__)
            print(datetime.now().strftime("%H:%M:%S "), "Error: ", sys.exc_info()[0], driver.connectorParameters)
            exit(1)
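
For orientation, a minimal usage sketch of how TestExecutor is intended to be driven (not part of this diff): the driver, nameSalt, and schemaRegistryAddress objects are supplied by the e2e harness in test_verify.py, and the wiring mirrors what the refactored execution() helper does later in this PR.

    # Usage sketch only; driver, nameSalt and schemaRegistryAddress are supplied by the e2e harness.
    from test_selector import TestSelector
    from test_executor import TestExecutor

    selected_tests = TestSelector().selectTestsToBeRun(driver, nameSalt, schemaRegistryAddress,
                                                       testPlatform="apache", allowedTestsCsv=None)
    TestExecutor().execute(selected_tests, driver, nameSalt, round=1)
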
18 changes: 18 additions & 0 deletions test/test_selector.py
@@ -0,0 +1,18 @@
from test_suites import create_end_to_end_test_suites
import test_suit

# TestSelector is responsible for selecting a subset of tests to be run
# It is meant to filter tests by platform, cloud vendor or any other predicate needed
class TestSelector:

    def selectTestsToBeRun(self, driver, nameSalt, schemaRegistryAddress, testPlatform, allowedTestsCsv):
        test_suites = create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testPlatform, allowedTestsCsv)

[Review comment from the Contributor Author]
allowedTestCsv filtering will be moved here as well.

        if testPlatform == "apache":
            return [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values() if single_end_to_end_test.run_in_apache == True]
        elif testPlatform == "confluent":
            return [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values() if single_end_to_end_test.run_in_confluent == True]
        elif testPlatform == "clean":

[Review comment from the Contributor Author]
The clean handling here is known to be wrong, but separating it out would require a lot of work.

            return [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values()]
        else:
            raise test_suit.test_utils.NonRetryableError("unknown testPlatform={}".format(testPlatform))
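
As the review comment above notes, the allowedTestsCsv filtering is expected to move into TestSelector in a follow-up. A minimal sketch of how that filtering could look (not part of this diff), assuming allowedTestsCsv is a comma-separated list of test class names; the helper names below are hypothetical.

    # Hypothetical follow-up sketch, not part of this PR.
    def parse_allowed_tests(allowedTestsCsv):
        # An empty or missing CSV means "run every selected test".
        if not allowedTestsCsv:
            return None
        return {name.strip() for name in allowedTestsCsv.split(",") if name.strip()}

    def filter_by_allowed_tests(test_instances, allowedTestsCsv):
        allowed = parse_allowed_tests(allowedTestsCsv)
        if allowed is None:
            return test_instances
        # Keep only tests whose class name appears on the allow-list.
        return [test for test in test_instances if test.__class__.__name__ in allowed]
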
91 changes: 12 additions & 79 deletions test/test_verify.py
@@ -14,6 +14,8 @@
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource, NewPartitions
from confluent_kafka.avro import AvroProducer
from test_suites import create_end_to_end_test_suites
+from test_executor import TestExecutor
+from test_selector import TestSelector
import time

import test_suit
@@ -457,56 +459,26 @@ def runStressTests(driver, testSet, nameSalt):
    ############################ Stress Tests Round 1 ############################
    # TestPressure and TestPressureRestart will only run when Running StressTests
    print(datetime.now().strftime("\n%H:%M:%S "), "=== Stress Tests Round 1 ===")
-    testSuitList = [testPressureRestart]
-
-    testSuitEnableList = []
-    if testSet == "confluent":
-        testSuitEnableList = [True]
-    elif testSet == "apache":
-        testSuitEnableList = [True]
-    elif testSet != "clean":
-        errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))
-
-    execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1)
+    execution(testSet, [testPressureRestart], driver, nameSalt, round=1)
    ############################ Stress Tests Round 1 ############################

    ############################ Stress Tests Round 2 ############################
    print(datetime.now().strftime("\n%H:%M:%S "), "=== Stress Tests Round 2 ===")
-    testSuitList = [testPressure]
-
-    testSuitEnableList = []
-    if testSet == "confluent":
-        testSuitEnableList = [True]
-    elif testSet == "apache":
-        testSuitEnableList = [True]
-    elif testSet != "clean":
-        errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))
-
-    execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1)
+    execution(testSet, [testPressure], driver, nameSalt, round=1)
    ############################ Stress Tests Round 2 ############################


def runTestSet(driver, testSet, nameSalt, enable_stress_test, skipProxy, allowedTestsCsv):
    if enable_stress_test:
        runStressTests(driver, testSet, nameSalt)
    else:
-        test_suites = create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testSet, allowedTestsCsv)
-
        ############################ round 1 ############################
        print(datetime.now().strftime("\n%H:%M:%S "), "=== Round 1 ===")

-        end_to_end_tests_suite = [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values()]
+        testSelector = TestSelector()
+        end_to_end_tests_suite = testSelector.selectTestsToBeRun(driver, nameSalt, schemaRegistryAddress, testSet, allowedTestsCsv)

-        end_to_end_tests_suite_runner = []
-
-        if testSet == "confluent":
-            end_to_end_tests_suite_runner = [single_end_to_end_test.run_in_confluent for single_end_to_end_test in test_suites.values()]
-        elif testSet == "apache":
-            end_to_end_tests_suite_runner = [single_end_to_end_test.run_in_apache for single_end_to_end_test in test_suites.values()]
-        elif testSet != "clean":
-            errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))
-
-        execution(testSet, end_to_end_tests_suite, end_to_end_tests_suite_runner, driver, nameSalt)
+        execution(testSet, end_to_end_tests_suite, driver, nameSalt)

############################ Always run Proxy tests in the end ############################

@@ -529,57 +501,18 @@ def runTestSet(driver, testSet, nameSalt, enable_stress_test, skipProxy, allowedTestsCsv):

        end_to_end_proxy_tests_suite = [single_end_to_end_test.test_instance for single_end_to_end_test in proxy_tests_suite]

-        proxy_suite_runner = []
-
-        if testSet == "confluent":
-            proxy_suite_runner = [single_end_to_end_test.run_in_confluent for single_end_to_end_test in proxy_tests_suite]
-        elif testSet == "apache":
-            proxy_suite_runner = [single_end_to_end_test.run_in_apache for single_end_to_end_test in proxy_tests_suite]
-        elif testSet != "clean":
-            errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))
-
-        execution(testSet, end_to_end_proxy_tests_suite, proxy_suite_runner, driver, nameSalt)
+        execution(testSet, end_to_end_proxy_tests_suite, driver, nameSalt)
        ############################ Proxy End To End Test End ############################


-def execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1):
+def execution(testSet, testSuitList, driver, nameSalt, round=1):
    if testSet == "clean":
-        for i, test in enumerate(testSuitList):
+        for test in testSuitList:
            test.clean()
        print(datetime.now().strftime("\n%H:%M:%S "), "=== All clean done ===")
    else:
-        try:
-            for i, test in enumerate(testSuitList):
-                if testSuitEnableList[i]:
-                    driver.createConnector(test.getConfigFileName(), nameSalt)
-
-            driver.startConnectorWaitTime()
-
-            for r in range(round):
-                print(datetime.now().strftime("\n%H:%M:%S "), "=== round {} ===".format(r))
-                for i, test in enumerate(testSuitList):
-                    if testSuitEnableList[i]:
-                        print(datetime.now().strftime("\n%H:%M:%S "),
-                              "=== Sending " + test.__class__.__name__ + " data ===")
-                        test.send()
-                        print(datetime.now().strftime("%H:%M:%S "), "=== Done " + test.__class__.__name__ + " ===",
-                              flush=True)
-
-                driver.verifyWaitTime()
-
-                for i, test in enumerate(testSuitList):
-                    if testSuitEnableList[i]:
-                        print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
-                        driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
-                        print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===",
-                              flush=True)
-
-            print(datetime.now().strftime("\n%H:%M:%S "), "=== All test passed ===")
-        except Exception as e:
-            print(datetime.now().strftime("%H:%M:%S "), e)
-            traceback.print_tb(e.__traceback__)
-            print(datetime.now().strftime("%H:%M:%S "), "Error: ", sys.exc_info()[0], driver.connectorParameters)
-            exit(1)
+        testExecutor = TestExecutor()
+        testExecutor.execute(testSuitList, driver, nameSalt, round)


def run_test_set_with_parameters(kafka_test: KafkaTest, testSet, nameSalt, pressure, skipProxy, allowedTestsCsv):