diff --git a/baseline1.py b/baseline1.py index dafe804..532c73f 100644 --- a/baseline1.py +++ b/baseline1.py @@ -1,5 +1,5 @@ from config import ATTRIBUTE, FILE_SIZE -from util import getData, createStream, validate +from util import getData, createStream, validate, ENCODE_FORMAT STREAM = 'data' DO_VALIDATION = False @@ -11,7 +11,7 @@ def createStreams(api): def insert(api, data): for line in data: - hexstr = line.encode('utf-8').hex() + hexstr = line.encode(ENCODE_FORMAT).hex() attributes = line.split(" ") for i in range(len(ATTRIBUTE)): api.publish(STREAM, ATTRIBUTE[i] + attributes[i], hexstr) diff --git a/baseline3.py b/baseline3.py new file mode 100644 index 0000000..9f6f50d --- /dev/null +++ b/baseline3.py @@ -0,0 +1,66 @@ +from config import ATTRIBUTE, ATTRIBUTE_NAME, FILE_SIZE, NUM_NODE +from util import getData, createStream, validate, ENCODE_FORMAT +from sortedcontainers import SortedList + + +DO_VALIDATION = False + +att_dict = {key: value for key, value in zip(ATTRIBUTE, ATTRIBUTE_NAME)} +att_name_index = {value: counter for counter, + value in enumerate(ATTRIBUTE_NAME)} + + +def createStreams(api): + for att in ATTRIBUTE_NAME: + createStream(api, att) + + +def insert(api, data): + result = api.listunspent(0) + txid = result["result"][0]["txid"] + vout = result["result"][0]["vout"] + address = api.getaddresses()["result"][0] + for line in data: + hexstr = line.encode(ENCODE_FORMAT).hex() + values = line.split(" ") + data = [] + for att, v in zip(ATTRIBUTE_NAME, values): + data.append({"for": att, "key": v, "data": hexstr}) + txid = api.createrawtransaction( + [{'txid': txid, 'vout': vout}], {address: 0}, data, 'send')["result"] + vout = 0 + + +def pointQuery(api, attribute, sort=False, reverse=False): + result = api.liststreamkeyitems( + att_dict[attribute[0]], attribute[1:], False, FILE_SIZE) + if DO_VALIDATION: + validate(getData(result["result"]), attribute[1:]) + + # input(getData(result["result"])) + return getData(result["result"]) + + +def rangeQuery(api, start, end): + result = [] + stream = att_dict['T'] + timestamps = api.liststreamkeys(stream)["result"] + sl = SortedList(list(map(int, [key['key'] for key in timestamps]))) + for timestamp in sl.irange(start, end): + result += getData(api.liststreamkeyitems(stream, + str(timestamp))['result']) + return result + + +def andQuery(api, attributes): + resultSet = [] + for attr in attributes: + resultSet.append(set(pointQuery(api, attr))) + result = resultSet[0] + for i in range(1, len(resultSet)): + result &= resultSet[i] + return list(result) + + +def sortResult(results, attribute, reverse=False): + return results.sort(reverse=reverse, key=lambda line: int(line.split(" ")[att_name_index[attribute]])) diff --git a/baseline3sort.py b/baseline3sort.py new file mode 100644 index 0000000..8cbb008 --- /dev/null +++ b/baseline3sort.py @@ -0,0 +1,71 @@ +from config import ATTRIBUTE, ATTRIBUTE_NAME, FILE_SIZE, NUM_NODE, ATTRIBUTE_TYPE +from util import getData, createStream, validate, ENCODE_FORMAT +from sortedcontainers import SortedList + + +DO_VALIDATION = False + +att_dict = {key: value for key, value in zip(ATTRIBUTE, ATTRIBUTE_NAME)} +att_name_index = {value: counter for counter, + value in enumerate(ATTRIBUTE_NAME)} + + +def createStreams(api): + for att in ATTRIBUTE_NAME: + createStream(api, att) + + +def insert(api, data): + result = api.listunspent(0) + txid = result["result"][0]["txid"] + vout = result["result"][0]["vout"] + address = api.getaddresses()["result"][0] + for line in data: + hexstr = line.encode(ENCODE_FORMAT).hex() + values = line.split(" ") + data = [] + for att, v in zip(ATTRIBUTE_NAME, values): + data.append({"for": att, "key": v, "data": hexstr}) + txid = api.createrawtransaction( + [{'txid': txid, 'vout': vout}], {address: 0}, data, 'send')["result"] + vout = 0 + + +def pointQuery(api, attribute, sort=True, reverse=False): + result = api.liststreamkeyitems( + att_dict[attribute[0]], attribute[1:], False, FILE_SIZE) + if DO_VALIDATION: + validate(getData(result["result"]), attribute[1:]) + result = getData(result["result"]) + att_name = att_dict[attribute[0]] + if type(ATTRIBUTE_TYPE[att_name]) is int and sort: + result = sortResult(result, att_name, reverse) + # print(*result, sep='\n') + # input() + return result + + +def rangeQuery(api, start, end, reverse=False): + result = [] + stream = att_dict['T'] + timestamps = api.liststreamkeys(stream)["result"] + sl = SortedList(list(map(int, [key['key'] for key in timestamps]))) + for timestamp in sl.irange(start, end): + result += getData(api.liststreamkeyitems(stream, + str(timestamp))['result']) + result = sortResult(result, stream, reverse) + return result + + +def andQuery(api, attributes): + resultSet = [] + for attr in attributes: + resultSet.append(set(pointQuery(api, attr, sort=False))) + result = resultSet[0] + for i in range(1, len(resultSet)): + result &= resultSet[i] + return list(result) + + +def sortResult(results, attribute, reverse=False): + return results.sort(reverse=reverse, key=lambda line: int(line.split(" ")[att_name_index[attribute]])) diff --git a/benchmark.py b/benchmark.py index 3a60889..7660523 100644 --- a/benchmark.py +++ b/benchmark.py @@ -132,13 +132,13 @@ def insertionTest(): output_json['insertion'] = total/NUM_NODE -def getAverageNodeRound(func, *args): +def getAverageNodeRound(func, *args, rounds=MAX_ROUND): elapsed = 0 # log.debug(args) - for i in range(MAX_ROUND): + for i in range(rounds): for j in range(NUM_NODE): elapsed += measure(func, nodes[j], *args) - return elapsed / (MAX_ROUND * NUM_NODE) + return elapsed / (rounds * NUM_NODE) def pointQueryTest(): @@ -149,26 +149,15 @@ def pointQueryTest(): elapsed = 0 fields = testcases['pointQuery'][i].split(" ") qtime = getAverageNodeRound(baseline.pointQuery, - ATTRIBUTE[i] + fields[i]) - # for j in range(MAX_ROUND): - # for k in range(NUM_NODE): - # elapsed += measure(baseline.pointQuery, - # (nodes[k], ATTRIBUTE[i] + fields[i])) - # qtime = elapsed / (MAX_ROUND * NUM_NODE) + ATTRIBUTE[i] + fields[i], rounds=50) total += qtime log.info('Q%d[%s]: %f' % (i+1, ATTRIBUTE_NAME[i], qtime)) output_json['point_query'][ATTRIBUTE_NAME[i]] = qtime - # additional test is for querying nonexistent record - elapsed = 0 - for j in range(MAX_ROUND): - for k in range(NUM_NODE): - elapsed += measure(baseline.pointQuery, nodes[k], " ") + qtime = elapsed / (MAX_ROUND * NUM_NODE) total += qtime - log.info('Q%d[Empty]: %f' % (i+2, qtime)) log.info('Average Query Time: %f' % (total / TESTCASE_CONFIG['pointQuery'])) - # output_json['point_query'] = total / (sampleNum * NUM_NODE) def rangeQueryTest(): @@ -180,11 +169,10 @@ def rangeQueryTest(): total = 0 for scale in RANGE_SCALE: qtime = getAverageNodeRound( - baseline.rangeQuery, int(start), int(start) + scale) + baseline.rangeQuery, int(start), int(start) + scale, rounds=1) total += qtime log.info('Range %d: %f' % (scale, qtime)) output_json['range_query'][scale] = qtime - # output_json['rangeQuery'] = total/NUM_NODE def andQueryTest(): diff --git a/benchmark_img/benchmark3.png b/benchmark_img/benchmark3.png new file mode 100644 index 0000000..5b7d482 Binary files /dev/null and b/benchmark_img/benchmark3.png differ diff --git a/config.py b/config.py index 6dc8651..2740185 100644 --- a/config.py +++ b/config.py @@ -12,6 +12,8 @@ ATTRIBUTE = ['T', 'N', 'I', 'r', 'U', 'A', 'R'] ATTRIBUTE_NAME = ['Timestamp', 'Node', 'ID', 'Ref-ID', 'User', 'Activity', 'Resource'] +ATTRIBUTE_TYPE = {'Timestamp': int, 'Node': int, 'ID': int, + 'Ref-ID': int, 'User': int, 'Activity': str, 'Resource': str} datadir = './testData/' diff --git a/util.py b/util.py index c1f97c3..b205210 100644 --- a/util.py +++ b/util.py @@ -1,7 +1,7 @@ from Savoir import Savoir from timeit import default_timer as timer from config import NUM_NODE -import matplotlib.pyplot as plt +from sortedcontainers import SortedList ENCODE_FORMAT = 'utf-8' @@ -42,8 +42,6 @@ def getData(result, isHex=False): if result is None: return [] for item in result: - # print("AAAAA", item) - # if item is not None: if isHex: data.append(item['data']) else: @@ -65,7 +63,3 @@ def validate(lines: str, *attributes, verbose=False): if flag: result += line return result - - -def draw(files=None): - pass