Skip to content

Commit

Permalink
SNOW-1728000 TestExecutor and TestSelector for e2e tests (#963)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-mbobowski authored Oct 17, 2024
1 parent b6ffff3 commit bc8e824
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 79 deletions.
36 changes: 36 additions & 0 deletions test/test_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from datetime import datetime
import sys
import traceback

# TestExecutor is responsible for running a given subset of tests
class TestExecutor:

def execute(self, testSuitList, driver, nameSalt, round=1):
try:
for test in testSuitList:
driver.createConnector(test.getConfigFileName(), nameSalt)

driver.startConnectorWaitTime()

for r in range(round):
print(datetime.now().strftime("\n%H:%M:%S "), "=== round {} ===".format(r))
for test in testSuitList:
print(datetime.now().strftime("\n%H:%M:%S "),
"=== Sending " + test.__class__.__name__ + " data ===")
test.send()
print(datetime.now().strftime("%H:%M:%S "), "=== Done " + test.__class__.__name__ + " ===", flush=True)


driver.verifyWaitTime()

for test in testSuitList:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===", flush=True)

print(datetime.now().strftime("\n%H:%M:%S "), "=== All test passed ===")
except Exception as e:
print(datetime.now().strftime("%H:%M:%S "), e)
traceback.print_tb(e.__traceback__)
print(datetime.now().strftime("%H:%M:%S "), "Error: ", sys.exc_info()[0], driver.connectorParameters)
exit(1)
18 changes: 18 additions & 0 deletions test/test_selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from test_suites import create_end_to_end_test_suites
import test_suit

# TestSelector is responsible for selecting a subset of tests to be run
# It is meant to filter tests by platform, cloud vendor or any other predicate needed
class TestSelector:

def selectTestsToBeRun(self, driver, nameSalt, schemaRegistryAddress, testPlatform, allowedTestsCsv):
test_suites = create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testPlatform, allowedTestsCsv)

if testPlatform == "apache":
return [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values() if single_end_to_end_test.run_in_apache == True]
elif testPlatform == "confluent":
return [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values() if single_end_to_end_test.run_in_confluent == True]
elif testPlatform == "clean":
return [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values()]
else:
raise test_suit.test_utils.NonRetryableError("unknown testPlatform={}".format(testPlatform))
91 changes: 12 additions & 79 deletions test/test_verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource, NewPartitions
from confluent_kafka.avro import AvroProducer
from test_suites import create_end_to_end_test_suites
from test_executor import TestExecutor
from test_selector import TestSelector
import time

import test_suit
Expand Down Expand Up @@ -457,56 +459,26 @@ def runStressTests(driver, testSet, nameSalt):
############################ Stress Tests Round 1 ############################
# TestPressure and TestPressureRestart will only run when Running StressTests
print(datetime.now().strftime("\n%H:%M:%S "), "=== Stress Tests Round 1 ===")
testSuitList = [testPressureRestart]

testSuitEnableList = []
if testSet == "confluent":
testSuitEnableList = [True]
elif testSet == "apache":
testSuitEnableList = [True]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1)
execution(testSet, [testPressureRestart], driver, nameSalt, round=1)
############################ Stress Tests Round 1 ############################

############################ Stress Tests Round 2 ############################
print(datetime.now().strftime("\n%H:%M:%S "), "=== Stress Tests Round 2 ===")
testSuitList = [testPressure]

testSuitEnableList = []
if testSet == "confluent":
testSuitEnableList = [True]
elif testSet == "apache":
testSuitEnableList = [True]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1)
execution(testSet, [testPressure], driver, nameSalt, round=1)
############################ Stress Tests Round 2 ############################


def runTestSet(driver, testSet, nameSalt, enable_stress_test, skipProxy, allowedTestsCsv):
if enable_stress_test:
runStressTests(driver, testSet, nameSalt)
else:
test_suites = create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testSet, allowedTestsCsv)

############################ round 1 ############################
print(datetime.now().strftime("\n%H:%M:%S "), "=== Round 1 ===")

end_to_end_tests_suite = [single_end_to_end_test.test_instance for single_end_to_end_test in test_suites.values()]
testSelector = TestSelector()
end_to_end_tests_suite = testSelector.selectTestsToBeRun(driver, nameSalt, schemaRegistryAddress, testSet, allowedTestsCsv)

end_to_end_tests_suite_runner = []

if testSet == "confluent":
end_to_end_tests_suite_runner = [single_end_to_end_test.run_in_confluent for single_end_to_end_test in test_suites.values()]
elif testSet == "apache":
end_to_end_tests_suite_runner = [single_end_to_end_test.run_in_apache for single_end_to_end_test in test_suites.values()]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, end_to_end_tests_suite, end_to_end_tests_suite_runner, driver, nameSalt)
execution(testSet, end_to_end_tests_suite, driver, nameSalt)

############################ Always run Proxy tests in the end ############################

Expand All @@ -529,57 +501,18 @@ def runTestSet(driver, testSet, nameSalt, enable_stress_test, skipProxy, allowed

end_to_end_proxy_tests_suite = [single_end_to_end_test.test_instance for single_end_to_end_test in proxy_tests_suite]

proxy_suite_runner = []

if testSet == "confluent":
proxy_suite_runner = [single_end_to_end_test.run_in_confluent for single_end_to_end_test in proxy_tests_suite]
elif testSet == "apache":
proxy_suite_runner = [single_end_to_end_test.run_in_apache for single_end_to_end_test in proxy_tests_suite]
elif testSet != "clean":
errorExit("Unknown testSet option {}, please input confluent, apache or clean".format(testSet))

execution(testSet, end_to_end_proxy_tests_suite, proxy_suite_runner, driver, nameSalt)
execution(testSet, end_to_end_proxy_tests_suite, driver, nameSalt)
############################ Proxy End To End Test End ############################


def execution(testSet, testSuitList, testSuitEnableList, driver, nameSalt, round=1):
def execution(testSet, testSuitList, driver, nameSalt, round=1):
if testSet == "clean":
for i, test in enumerate(testSuitList):
for test in testSuitList:
test.clean()
print(datetime.now().strftime("\n%H:%M:%S "), "=== All clean done ===")
else:
try:
for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
driver.createConnector(test.getConfigFileName(), nameSalt)

driver.startConnectorWaitTime()

for r in range(round):
print(datetime.now().strftime("\n%H:%M:%S "), "=== round {} ===".format(r))
for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
print(datetime.now().strftime("\n%H:%M:%S "),
"=== Sending " + test.__class__.__name__ + " data ===")
test.send()
print(datetime.now().strftime("%H:%M:%S "), "=== Done " + test.__class__.__name__ + " ===",
flush=True)

driver.verifyWaitTime()

for i, test in enumerate(testSuitList):
if testSuitEnableList[i]:
print(datetime.now().strftime("\n%H:%M:%S "), "=== Verify " + test.__class__.__name__ + " ===")
driver.verifyWithRetry(test.verify, r, test.getConfigFileName())
print(datetime.now().strftime("%H:%M:%S "), "=== Passed " + test.__class__.__name__ + " ===",
flush=True)

print(datetime.now().strftime("\n%H:%M:%S "), "=== All test passed ===")
except Exception as e:
print(datetime.now().strftime("%H:%M:%S "), e)
traceback.print_tb(e.__traceback__)
print(datetime.now().strftime("%H:%M:%S "), "Error: ", sys.exc_info()[0], driver.connectorParameters)
exit(1)
testExecutor = TestExecutor()
testExecutor.execute(testSuitList, driver, nameSalt, round)


def run_test_set_with_parameters(kafka_test: KafkaTest, testSet, nameSalt, pressure, skipProxy, allowedTestsCsv):
Expand Down

0 comments on commit bc8e824

Please sign in to comment.