Chore: Benchmarking #1028

Draft · wants to merge 3 commits into main · Changes from 2 commits
128 changes: 128 additions & 0 deletions samples/samples/benchmarking.py
@@ -0,0 +1,128 @@
import math
import threading
import time

from google.cloud import spanner_v1
from google.cloud.spanner_v1 import pool as spanner_pool

# Define your Spanner instance and database information
project_id = "your_project_id"
instance_id = "your_instance_id"
database_id = "your_database_id"
spanner_client = spanner_v1.Client(project=project_id)

# Get a reference to the Spanner instance and create a session pool
instance = spanner_client.instance(instance_id)
pool = spanner_pool.FixedSizePool(size=10, logging_enabled=True)
Review comment:
  • What's the default size of FixedSizePool? Can we avoid passing size=10 and use the default? That would take us closer to what a typical customer uses (a sketch follows below).
  • @surbhigarg92 Are we using logging_enabled=True as a global option? I thought we had refactored this to be a pool option, similar to close_inactive_transactions.
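A minimal sketch of the suggested shape, assuming FixedSizePool keeps its released default of size=10, and noting that logging_enabled and close_inactive_transactions are options from this PR branch, not the released client, so their exact spelling here is an assumption:

pool = spanner_pool.FixedSizePool(logging_enabled=True)  # size defaults to 10
database = instance.database(database_id, pool=pool, close_inactive_transactions=True)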

database = instance.database(database_id, pool=pool, close_inactive_transactions=True)

transaction_time = []
Review comment:
A global transaction_time variable mixes the results of different kinds of transactions. There are three types here: executeSql, batch transaction, and DML (insert). It would be good to profile the latency of each type separately so we understand whether any of them is regressing (see the sketch below).
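A minimal sketch of per-type latency buckets, as suggested; the bucket names and the record_latency helper are illustrative, not part of this PR:

latency_by_type = {"execute_sql": [], "batch": [], "dml_insert": []}

def record_latency(kind, seconds):
    # Append one latency sample to the bucket for its operation type.
    latency_by_type[kind].append(seconds)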



def calculatePercentile(latencies):
    # sort the latencies array
    latencies.sort()

    # calculate p50 (50th percentile)
    p50Index = math.floor(0.5 * len(latencies))
    p50Latency = latencies[p50Index]

    # calculate p90 (90th percentile)
    p90Index = math.floor(0.9 * len(latencies))
    p90Latency = latencies[p90Index]

    return [p50Latency, p90Latency]
Review comment:
Also evaluate p99.
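A sketch of a generalized helper that also reports p99; the pct closure is illustrative:

def calculate_percentiles(latencies):
    ordered = sorted(latencies)

    def pct(p):
        # Clamp the index so p99 stays in range for small sample counts.
        return ordered[min(math.floor(p * len(ordered)), len(ordered) - 1)]

    return pct(0.5), pct(0.9), pct(0.99)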


# [START spanner_query_data]
def query_data(thread_id):
    print("running thread ", thread_id)
    start_time = time.time()
    time.sleep(10)
Review comment:
Remove this sleep; instead, increase the number of transactions per run if you want the benchmark to execute over a longer period (see the sketch below).
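A sketch of that suggestion, dropping the sleep and timing a fixed number of queries per thread; the n_txns parameter and the list() drain are illustrative:

def query_data(thread_id, n_txns=100):
    for _ in range(n_txns):
        start_time = time.time()
        with database.snapshot() as snapshot:
            # Drain the stream so the timing covers actually fetching rows.
            list(snapshot.execute_sql("SELECT 1 FROM Singers"))
        transaction_time.append(time.time() - start_time)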

    with database.snapshot() as snapshot:
        results = snapshot.execute_sql(
            "SELECT 1 FROM Singers"
        )

    # for row in results:
Review comment:
Nitpick: could you remove the commented-out code?

    #     print(row)

    # for row in results:
    #     print("SingerId: {}, FirstName: {}, LastName: {}".format(*row))

    end_time = time.time()
    transaction_time.append(end_time - start_time)
# [END spanner_query_data]

# [START spanner_batch_transaction]
def batch_transaction(thread_id):
Review comment:
Where is this function batch_transaction invoked? Similarly, where is insert_with_dml invoked? (A dispatch sketch follows.)
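Only query_data is driven by the thread loop at the bottom of the file; a sketch of dispatching all three workloads, one thread per workload, might look like this (illustrative):

threads = []
for i, workload in enumerate([query_data, batch_transaction, insert_with_dml]):
    t = threading.Thread(target=workload, args=(i,))
    t.start()
    threads.append(t)
for t in threads:
    t.join()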

print("running thread ", thread_id)
start_time = time.time()
time.sleep(10)
batch_txn = database.batch_snapshot()
batches = batch_txn.execute_sql(
'SELECT * FROM Singers',
)
results = []
for batch in batches:
results.append("SingerId: {}, FirstName: {}, LastName: {}".format(*batch))

    # for batch in batches:
    #     for row in batch_txn.process_read_batch(batch):
    #         results.append("SingerId: {}, FirstName: {}, LastName: {}".format(*row))
    # for result in results:
    #     print(result)

    end_time = time.time()
    transaction_time.append(end_time - start_time)
# [END spanner_batch_transaction]

# [START insert_with_dml]
def insert_with_dml(i):
"""Inserts data with a DML statement into the database."""
print("running thread ", i)
start_time = time.time()
time.sleep(10)

def insert_singers(transaction):
row_ct = transaction.execute_update(
"INSERT Singers (SingerId, FirstName, LastName) VALUES ({}, 'Google{}', 'India{}')".format(i, i, i)
)
print("{} record(s) inserted.".format(row_ct))

database.run_in_transaction(insert_singers)
end_time = time.time()
transaction_time.append(end_time-start_time)
# [END insert_with_dml]

# Define the number of threads
num_threads = 20
Review comment:
  • At most 10 threads will make progress at a time, since there are only 10 sessions in the pool; the other 10 threads will always be waiting for a session.
  • Currently you run one query per thread. It would be good to increase the number of transactions per thread, for example 2000 transactions in total, which at 20 threads is 100 transactions per thread. This ensures the run executes for more than a second and also lets you remove the 10s sleep() (see the sketch below).
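A sketch of sizing the run that way, assuming query_data has been reworked (as sketched earlier) to accept a per-thread transaction count; the constants are illustrative:

TOTAL_TXNS = 2000
num_threads = 10  # match the FixedSizePool size so no thread starves waiting for a session
txns_per_thread = TOTAL_TXNS // num_threads  # 2000 / 10 = 200 per thread

threads = []
for i in range(num_threads):
    t = threading.Thread(target=query_data, args=(i, txns_per_thread))
    t.start()
    threads.append(t)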

starting = 1

# Create and start the threads
threads = []
start = time.time()
for i in range(starting, starting + num_threads):
    thread = threading.Thread(target=query_data, args=(i,))
    thread.start()
    threads.append(thread)


# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All threads have completed.")
end = time.time()

# Print the total execution time
print("total time taken by the execution: ", end-start)

# Writing transaction times to an output file (open the file once, not per line)
with open('output.txt', 'a') as file:
    for t in transaction_time:
        file.write(str(t) + "\n")
latency = calculatePercentile(transaction_time)
print("p50 latency is: ", latency[0])
print("p90 latency is: ", latency[1])