-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbulk_operation.py
executable file
·140 lines (115 loc) · 5 KB
/
bulk_operation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
"""
Copyright (c) 2018, salesforce.com, inc.
All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
For full license text, see LICENSE.txt file in the repo root or
https://opensource.org/licenses/BSD-3-Clause
This module authenticates with the bulk api service and uses the bulk api
to read QuoteLine__c data and insert custom product recommendation objects.
"""
import os
import json
import time
import logging
from salesforce_bulk.util import IteratorBytesIO
from salesforce_bulk import SalesforceBulk
from salesforce_bulk import CsvDictsAdapter
import recommend
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
RECOMMENDATION_OBJECT = 'ProductRecommendation__c'
PRODUCT_ID_FIELD = 'Product2Id__c'
RECOMMENDED_PRODUCT_ID_FIELD = 'RecommendedProduct2Id__c'
SCORE_FIELD = 'Score__c'
PRODUCT_OBJECT = 'SBQQ__Product__c'
QUOTE_OBJECT = 'SBQQ__Quote__c'
QUOTELINE_OBJECT = 'SBQQ__QuoteLine__c'
bulk = None
def get_recommendation_record(product_id, recommended_id, score):
return dict({PRODUCT_ID_FIELD: product_id,
RECOMMENDED_PRODUCT_ID_FIELD: recommended_id,
SCORE_FIELD: score})
def query_from_db(*fields, object_name=None, where_clause=None):
if object_name is None:
raise ValueError('object_name is not provided.')
job = bulk.create_query_job(object_name, contentType='JSON')
query = 'select ' + ', '.join(fields) + ' from ' + object_name
if where_clause is not None:
query += ' where ' + where_clause
logging.info(query)
batch = bulk.query(job, query)
bulk.close_job(job)
try:
while not bulk.is_batch_done(batch):
logging.info('batch status: %s', bulk.batch_status(batch_id=batch)['state'])
time.sleep(10)
finally:
if not bulk.is_batch_done(batch):
bulk.abort_job(job)
logging.info('aborted job')
records = []
for result in bulk.get_all_results_for_query_batch(batch):
result = json.load(IteratorBytesIO(result))
for row in result:
records.append(row)
return records
def get_all_quotes_from_db():
return query_from_db(PRODUCT_OBJECT, QUOTE_OBJECT, object_name=QUOTELINE_OBJECT)
def insert_recommendations_in_db(product_id_vs_recommended_ids):
if len(product_id_vs_recommended_ids) == 0:
return
job = bulk.create_insert_job(RECOMMENDATION_OBJECT, contentType='CSV',
concurrency='Parallel')
recommendations = []
for product_id in product_id_vs_recommended_ids:
recommended_id_vs_scores = product_id_vs_recommended_ids[product_id]
for recommended_id, score in recommended_id_vs_scores.items():
recommendations.append(get_recommendation_record(product_id, recommended_id, score))
if not recommendations:
return
csv_iter_recommendations = CsvDictsAdapter(iter(recommendations))
batch_insert_recommendations = bulk.post_batch(job, csv_iter_recommendations)
bulk.wait_for_batch(job, batch_insert_recommendations)
bulk.close_job(job)
logging.info("Done. Recommendations uploaded.")
def delete_recommendations_in_db(product_ids):
"""delete all recommendation object records for product_ids"""
if not product_ids:
return
where_clause = '{0} IN ({1})'.format(PRODUCT_ID_FIELD,
', '.join("'{0}'".format(w) for w in product_ids))
recommendation_object_ids = query_from_db('Id',
object_name=RECOMMENDATION_OBJECT,
where_clause=where_clause)
ids = [{'Id': row['Id']} for row in recommendation_object_ids]
if not ids:
logging.info("Done. No Recommendations to delete.")
return
job = bulk.create_delete_job(RECOMMENDATION_OBJECT,
contentType='CSV',
concurrency='Parallel')
csv_iter_recommendations = CsvDictsAdapter(iter(ids))
batch_delete_recommendations = bulk.post_batch(job, csv_iter_recommendations)
bulk.wait_for_batch(job, batch_delete_recommendations)
bulk.close_job(job)
logging.info("Done. Recommendations deleted.")
def login():
global bulk
logging.info('logging in...')
# domain passed to SalesforceBulk should be 'test' or 'login' or 'something.my'
bulk = SalesforceBulk(username=os.environ['ORG_USERNAME'], password=os.environ['ORG_PASSWORD'],
security_token=os.environ['ORG_SECURITY_TOKEN'], domain=os.environ['ORG_DOMAIN'])
logging.info('login successful !')
def run():
login()
quote_lines = get_all_quotes_from_db()
if not quote_lines:
return
product_id_vs_recommended_ids = recommend.get_product_recommendations(quote_lines)
if len(product_id_vs_recommended_ids) != 0:
delete_recommendations_in_db(product_id_vs_recommended_ids.keys())
insert_recommendations_in_db(product_id_vs_recommended_ids)
if __name__ == '__main__':
run()