update individual list rows #57

Open
wants to merge 9 commits into master
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,10 @@
# Changelog

## [Version 1.1.4](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.4) - Feature release - 2024-07-16
## [Version 1.1.5](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.5) - Feature release - 2024-10-15

- Update of individual list records

## [Version 1.1.4](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.4) - Feature and bugfix release - 2024-07-16

- Fix writing when using presets with no root folder defined
- Limit string length to the 255 characters SharePoint limit
@@ -10,6 +14,10 @@

- Add login with Azure AD app certificate

## [Version 1.1.3](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.3) - Feature release - 2024-06-04

- Add login with Azure AD app certificate

## [Version 1.1.2](https://github.com/dataiku/dss-plugin-sharepoint-online/releases/tag/v1.1.2) - Bugfix release - 2024-05-28

- Fix path creation inside read-only parent directory
15 changes: 13 additions & 2 deletions custom-recipes/sharepoint-online-append-list/recipe.json
@@ -142,14 +142,25 @@
"name": "write_mode",
"label": "Write mode",
"type": "SELECT",
"defaultValue": "append",
"selectChoices": [
{
"value": "append",
"label": "Append to existing list"
},
{
"value": "update",
"label": "Update items in existing list"
}
],
"visibilityCondition": false
"defaultValue": "append",
"visibilityCondition": "model.advanced_parameters == true"
},
{
"name": "columns_to_update",
"label": "Columns to update",
"type": "COLUMNS",
"columnRole": "input_dataset",
"visibilityCondition": "model.advanced_parameters == true && model.write_mode == 'update'"
},
{
"name": "max_workers",
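For context, a minimal sketch of how the new parameters reach the recipe code (the config shape mirrors recipe.py below; the sample values are hypothetical):

```python
# Sketch of the config dict produced by the parameters above (sample values).
config = {
    "advanced_parameters": True,
    "write_mode": "update",          # SELECT above; only visible with advanced parameters
    "columns_to_update": ["Title"],  # COLUMNS picker; only shown when write_mode == 'update'
}

write_mode = config.get("write_mode", "append")
columns_to_update = config.get("columns_to_update", [])
# The recipe forces the SharePoint item ID into the update set, since it is
# needed to address the row being updated (same logic as recipe.py below):
if columns_to_update and "ID" not in columns_to_update:
    columns_to_update.append("ID")
print(write_mode, columns_to_update)  # update ['Title', 'ID']
```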
25 changes: 19 additions & 6 deletions custom-recipes/sharepoint-online-append-list/recipe.py
@@ -21,6 +21,13 @@ def convert_date_format(json_row):
return json_row


def get_write_mode(config):
write_mode = config.get("write_mode", "append")
if write_mode == "create":
write_mode = "append"
return write_mode


input_dataset_names = get_input_names_for_role('input_dataset')
input_dataset = dataiku.Dataset(input_dataset_names[0])
input_dataframe = input_dataset.get_dataframe()
@@ -43,7 +50,11 @@ def convert_date_format(json_row):
expand_lookup = config.get("expand_lookup", False)
metadata_to_retrieve = config.get("metadata_to_retrieve", [])
advanced_parameters = config.get("advanced_parameters", False)
write_mode = "append"
write_mode = get_write_mode(config)
columns_to_update = config.get("columns_to_update", [])
if columns_to_update and "ID" not in columns_to_update:
columns_to_update.append("ID")

if not advanced_parameters:
max_workers = 1 # no multithread per default
batch_size = 100
@@ -57,11 +68,13 @@ def convert_date_format(json_row):
display_metadata = len(metadata_to_retrieve) > 0
client = SharePointClient(config)

sharepoint_writer = client.get_writer({"columns": input_schema}, None, None, max_workers, batch_size, write_mode)
sharepoint_writer = client.get_writer({"columns": input_schema}, None, None, max_workers,
batch_size, write_mode, columns_to_update)


with output_dataset.get_writer() as writer:
for index, input_parameters_row in input_dataframe.iterrows():
json_row = input_parameters_row.to_dict()
json_row = convert_date_format(json_row)
sharepoint_writer.write_row_dict(json_row)
writer.write_row_dict(json_row)
straighten_json_row = sharepoint_writer.pandas_row_to_json(input_parameters_row)
sharepoint_writer.write_row_dict(straighten_json_row)
writer.write_row_dict(sharepoint_writer.fix_dates_for_pandas_output(input_parameters_row))
sharepoint_writer.close()
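The write loop now converts each pandas row twice: `pandas_row_to_json` rebuilds the string payload sent to SharePoint, while `fix_dates_for_pandas_output` re-serializes dates for the local output dataset. A rough sketch of both outcomes for one row, assuming "ID" has an int schema and "Due" a date schema:

```python
import pandas

row = pandas.Series({"ID": 1.0, "Due": pandas.Timestamp("2024-10-15 08:30:00")})
# pandas_row_to_json: plain strings for SharePoint ("ID" was floatified by pandas):
print({"ID": str(int(row["ID"])), "Due": str(row["Due"])})
# {'ID': '1', 'Due': '2024-10-15 08:30:00'}
# fix_dates_for_pandas_output: ISO timestamps for the output dataset:
print({"ID": row["ID"], "Due": row["Due"].strftime("%Y-%m-%dT%H:%M:%S.%fZ")})
# {'ID': 1.0, 'Due': '2024-10-15T08:30:00.000000Z'}
```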
2 changes: 1 addition & 1 deletion plugin.json
@@ -1,6 +1,6 @@
{
"id": "sharepoint-online",
"version": "1.1.4",
"version": "1.1.5",
"meta": {
"label": "SharePoint Online",
"description": "Read and write data from/to your SharePoint Online account",
2 changes: 1 addition & 1 deletion python-lib/dss_constants.py
@@ -37,7 +37,7 @@ class DSSConstants(object):
"sharepoint_oauth": "The access token is missing"
}
PATH = 'path'
PLUGIN_VERSION = "1.1.4"
PLUGIN_VERSION = "1.1.5"
SECRET_PARAMETERS_KEYS = ["Authorization", "sharepoint_username", "sharepoint_password", "client_secret", "client_certificate", "passphrase"]
SITE_APP_DETAILS = {
"sharepoint_tenant": "The tenant name is missing",
44 changes: 41 additions & 3 deletions python-lib/sharepoint_client.py
@@ -542,6 +542,21 @@ def get_item_structure(self, list_title, item):
"checkInComment": None
}

def get_update_list_item_kwargs(self, list_title, list_item_entity_type_full_name, item_id, item_update):
# https://sharepoint.stackexchange.com/questions/250659/how-implement-rest-queries-in-sharepoint
list_item_update_info = self.get_list_item_update(item_update, list_item_entity_type_full_name)
headers = dict(DSSConstants.JSON_HEADERS)  # copy so the shared constant dict is not mutated
headers["X-HTTP-Method"] = "MERGE"
headers["If-Match"] = "*"
update_item_url = self.get_update_item_url(list_title, item_id)
kwargs = {
"verb": "PATCH",
"url": update_item_url,
"headers": headers,
"json": list_item_update_info
}
return kwargs
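For reference, this is roughly the request envelope the method assembles, here for a hypothetical list "Tasks" and item 42. The `X-HTTP-Method: MERGE` / `If-Match: *` pair is SharePoint REST's idiom for an unconditional partial update; the host, site, entity type, and the exact JSON header values below are illustrative:

```python
kwargs = {
    "verb": "PATCH",
    "url": (
        "https://tenant.sharepoint.com/sites/demo/_api/Web"
        "/GetList(@a1)/items(42)?@a1='/sites/demo/Lists/Tasks'"
    ),
    "headers": {
        "Content-Type": "application/json; odata=verbose",  # assumed JSON_HEADERS content
        "Accept": "application/json; odata=verbose",
        "X-HTTP-Method": "MERGE",  # partial update tunneled through the request
        "If-Match": "*",           # skip etag checking and overwrite unconditionally
    },
    "json": {
        "__metadata": {"type": "SP.Data.TasksListItem"},  # the list's ListItemEntityTypeFullName
        "Title": "Updated title",
    },
}
```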

@staticmethod
def get_form_value(field_name, field_value):
return {
@@ -564,6 +579,17 @@ def get_list_item_create_info(self, list_title):
}
}

@staticmethod
def get_list_item_update(item, list_item_entity_type_full_name):
# https://learn.microsoft.com/en-us/sharepoint/dev/sp-add-ins/working-with-lists-and-list-items-with-rest
ret = {}
ret["__metadata"] = {
"type": "{}".format(list_item_entity_type_full_name)
}
for field_name in item:
ret[field_name] = item.get(field_name)
return ret

def process_batch(self, kwargs_array):
batch_id = self.get_random_guid()
change_set_id = self.get_random_guid()
@@ -628,6 +654,7 @@ def log_batch_errors(self, response, kwargs_array):
logger.info("Batch error analysis")
statuses = re.findall('HTTP/1.1 (.*?) ', str(response.content))
dump_response_content = False
reason_to_raise = None
for status, kwarg in zip(statuses, kwargs_array):
if not status.startswith("20"):
if dump_response_content:
@@ -644,6 +671,7 @@
error_messages = re.findall('"ErrorMessage":"(.*?)}', str(response.content))
for error_message in error_messages:
logger.warning("Error:'{}'".format(error_message))
reason_to_raise = error_message
if dump_response_content:
if self.number_dumped_logs == 0:
logger.warning("response.content={}".format(response.content))
@@ -652,6 +680,8 @@
self.number_dumped_logs += 1
else:
logger.info("Batch error analysis OK")
if reason_to_raise:
raise SharePointClientError("There was at least one issue during batch processing ({}). Look into the logs for more details.".format(reason_to_raise))
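With this change, a failed row inside a `$batch` is no longer just logged: assuming `process_batch` routes responses through `log_batch_errors` (as the surrounding code suggests), callers now see an exception. A hedged usage sketch with a stand-in error class:

```python
import logging

logger = logging.getLogger(__name__)

class SharePointClientError(Exception):
    """Stand-in for the plugin's own error class."""

def write_batch_or_fail(client, kwargs_array):
    # Surface batch failures to the caller instead of losing them in the logs.
    try:
        client.process_batch(kwargs_array)
    except SharePointClientError as error:
        logger.error("Aborting write: %s", error)
        raise
```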

def get_base_url(self):
return "{}/{}/_api/Web".format(
@@ -689,6 +719,13 @@ def get_list_add_item_using_path_url(self, list_title):
self.escape_path(list_title)
)

def get_update_item_url(self, list_title, item_id):
return self.get_base_url() + "/GetList(@a1)/items({})?@a1='/{}/Lists/{}'".format(
item_id,
self.sharepoint_site,
self.escape_path(list_title)
)
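A tiny worked example of the URL template above (host and site are hypothetical; `escape_path`, shown further down in this diff, doubles single quotes per OData string-literal rules):

```python
def escape_path(path):
    # same logic as SharePointClient.escape_path
    return path.replace("'", "''")

base_url = "https://tenant.sharepoint.com/sites/demo/_api/Web"  # stand-in for get_base_url()
url = base_url + "/GetList(@a1)/items({})?@a1='/{}/Lists/{}'".format(
    42, "sites/demo", escape_path("Bob's list")
)
print(url)
# .../_api/Web/GetList(@a1)/items(42)?@a1='/sites/demo/Lists/Bob''s list'
```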

def get_list_fields_url(self, list_title):
return self.get_lists_by_title_url(list_title) + "/fields"

@@ -716,7 +753,7 @@ def get_file_url(self, full_path):
)

def get_file_content_url(self, full_path):
return self.get_file_url(full_path) + "/$value"
return self.get_file_url(full_path.replace("#", "%23")) + "/$value"

def get_move_url(self, from_path, to_path):
# Using the new method leads to 403.
@@ -918,7 +955,7 @@ def escape_path(path):
return path.replace("'", "''")

def get_writer(self, dataset_schema, dataset_partitioning,
partition_id, max_workers, batch_size, write_mode):
partition_id, max_workers, batch_size, write_mode, columns_to_update=[]):
return SharePointListWriter(
self.config,
self,
@@ -927,7 +964,8 @@ def get_writer(self, dataset_schema, dataset_partitioning,
partition_id,
max_workers=max_workers,
batch_size=batch_size,
write_mode=write_mode
write_mode=write_mode,
columns_to_update=columns_to_update
)

def get_read_schema(self, display_metadata=False, metadata_to_retrieve=[]):
82 changes: 76 additions & 6 deletions python-lib/sharepoint_lists.py
@@ -1,4 +1,5 @@
import datetime
import pandas
from concurrent.futures import ThreadPoolExecutor, as_completed
from sharepoint_constants import SharePointConstants
from dss_constants import DSSConstants
@@ -54,6 +55,8 @@ def assert_list_title(list_title):


def dss_to_sharepoint_date(date):
if "T" not in date or "Z" not in date:
return date
return format_date(date, DSSConstants.DATE_FORMAT, SharePointConstants.DATE_FORMAT)
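The new guard means only values that look like DSS ISO timestamps are reformatted; everything else passes through untouched. A self-contained sketch of the behaviour, with the two format strings assumed from the constant names (the real `format_date` also falls back to returning its input on parse errors):

```python
import datetime

DSS_DATE_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"      # assumed DSSConstants.DATE_FORMAT
SHAREPOINT_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed SharePointConstants.DATE_FORMAT

def dss_to_sharepoint_date(date):
    if "T" not in date or "Z" not in date:
        return date  # not a DSS ISO timestamp: pass through unchanged
    try:
        parsed = datetime.datetime.strptime(date, DSS_DATE_FORMAT)
    except ValueError:
        return date
    return parsed.strftime(SHAREPOINT_DATE_FORMAT)

print(dss_to_sharepoint_date("2024-10-15T08:30:00.000000Z"))  # 2024-10-15T08:30:00Z
print(dss_to_sharepoint_date("not a date"))                   # unchanged
```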


@@ -78,9 +81,18 @@ def format_date(date, from_format, to_format):
return date


def get_columns_types(input_schema):
columns_types = {}
for column_schema in input_schema:
column_name = column_schema.get("name", "")
column_type = column_schema.get("type", "string")
columns_types[column_name] = column_type
return columns_types


class SharePointListWriter(object):

def __init__(self, config, client, dataset_schema, dataset_partitioning, partition_id, max_workers=5, batch_size=100, write_mode="create"):
def __init__(self, config, client, dataset_schema, dataset_partitioning, partition_id, max_workers=5, batch_size=100, write_mode="create", columns_to_update=[]):
self.client = client
self.config = config
self.dataset_schema = dataset_schema
@@ -89,10 +101,13 @@ def __init__(self, config, client, dataset_schema, dataset_partitioning, partiti
self.buffer = []
logger.info('init SharepointListWriter with {} workers and batch size of {}'.format(max_workers, batch_size))
self.columns = dataset_schema[SharePointConstants.COLUMNS]
self.dss_columns_types = get_columns_types(self.columns)
self.sharepoint_column_ids = {}
self.sharepoint_existing_column_names = {}
self.sharepoint_existing_column_entity_property_names = {}
self.web_name = self.client.sharepoint_list_title
self.write_mode = write_mode
self.columns_to_update = columns_to_update

if write_mode == SharePointConstants.WRITE_MODE_CREATE:
logger.info('flush:recycle_list "{}"'.format(self.client.sharepoint_list_title))
@@ -132,14 +147,52 @@ def write_row(self, row):
def write_row_dict(self, row_dict):
row = []
for element in row_dict:
row.append(str(row_dict.get(element)))
row.append(row_dict.get(element))
self.write_row(row)

def pandas_row_to_json(self, input_pandas_row):
input_row = input_pandas_row.to_dict()
# Now let's fix what has been savaged by the panda
output_row = {}
for key in input_row:
target_type = self.dss_columns_types.get(key)
value = input_row.get(key)
if not value or pandas.isna(value):
straighten_value = None
elif target_type in ["int", "bigint"]:
if isinstance(value, int):
straighten_value = str(value)
else:
# If there was a single NaN in an int column, pandas has converted the whole column to float
# Because, obviously...
straighten_value = str(int(value))
else:
straighten_value = str(value)
output_row[key] = straighten_value
return output_row
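The float detour the comments describe is easy to reproduce; this is exactly what `pandas_row_to_json` undoes for int/bigint columns:

```python
import pandas

df = pandas.DataFrame({"ID": [1, 2, None], "Title": ["a", "b", "c"]})
print(df["ID"].dtype)    # float64: a single NaN promotes the whole int column
value = df.iloc[0]["ID"]
print(value)             # 1.0, not 1
print(str(int(value)))   # '1': the round trip applied above for int/bigint columns
```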

def fix_dates_for_pandas_output(self, input_pandas_row):
input_row = input_pandas_row.to_dict()
fixed_output_row = {}
for key in input_row:
target_type = self.dss_columns_types.get(key)
value = input_row.get(key)
if pandas.isna(value):
fixed_output_row[key] = None
elif target_type == "date":
fixed_output_row[key] = value.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
else:
fixed_output_row[key] = value
return fixed_output_row
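And the date branch in action, with the format string taken verbatim from the code above:

```python
import pandas

value = pandas.Timestamp("2024-10-15 08:30:00")
print(value.strftime("%Y-%m-%dT%H:%M:%S.%fZ"))  # 2024-10-15T08:30:00.000000Z
```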

def flush(self):
if self.max_workers > 1:
self.upload_rows_multithreaded()
if self.write_mode == "update":
self.update_rows()
else:
self.upload_rows()
if self.max_workers > 1:
self.upload_rows_multithreaded()
else:
self.upload_rows()

def upload_rows_multithreaded(self):
logger.info("Starting multithreaded rows adding")
@@ -171,6 +224,20 @@ def upload_rows(self):
self.client.process_batch(kwargs)
logger.info("{} items written".format(len(kwargs)))

def update_rows(self):
logger.info("Starting updating items")
kwargs = []
for row in self.buffer:
item = self.build_row_dictionary(row, self.columns_to_update)
item_id = item.pop("ID", None)
if item_id is None:
kwargs.append(self.client.get_add_list_item_kwargs(self.web_name, item))
# raise Exception("Item in column 'ID' cannot be left empty")
else:
kwargs.append(self.client.get_update_list_item_kwargs(self.web_name, self.list_item_entity_type_full_name, item_id, item))
self.client.process_batch(kwargs)
logger.info("{} items written".format(len(kwargs)))
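A hedged sketch of the routing above: rows carrying an ID become MERGE updates, rows without one currently fall back to plain adds, and the whole buffer ships as a single `$batch` (the buffer data is made up; the method names are the real ones from this diff):

```python
buffer = [
    {"ID": "42", "Title": "rename me"},  # -> client.get_update_list_item_kwargs(...)
    {"Title": "brand new row"},          # -> client.get_add_list_item_kwargs(...)
]
for item in buffer:
    item_id = item.pop("ID", None)
    print("update" if item_id is not None else "add", item)
# update {'Title': 'rename me'}
# add {'Title': 'brand new row'}
```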

def create_sharepoint_columns(self):
""" Create the list's columns on SP, retrieve their SP id and map it to their DSS column name """
logger.info("create_sharepoint_columns")
@@ -198,9 +265,12 @@ def create_sharepoint_columns(self):
else:
self.sharepoint_column_ids[dss_column_name] = dss_column_name

def build_row_dictionary(self, row):
def build_row_dictionary(self, row, columns_to_update=None):
ret = {}
for column, structure in zip(row, self.columns):
if columns_to_update:
if structure[SharePointConstants.NAME_COLUMN] not in columns_to_update:
continue
key_to_use = self.sharepoint_existing_column_names.get(
structure[SharePointConstants.NAME_COLUMN],
self.sharepoint_column_ids[structure[SharePointConstants.NAME_COLUMN]]
4 changes: 4 additions & 0 deletions tests/python/integration/test_scenario.py
@@ -39,6 +39,10 @@ def test_run_sharepoint_online_append_to_list_recipe(user_dss_clients):
dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="APPENDTOLISTRECIPE")


def test_run_sharepoint_online_update_individual_list_rows(user_dss_clients):
dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="UPDATEINDIVIDUALLISTROWS")


def test_run_sharepoint_online_write_file_in_path_w_ro_parent(user_dss_clients):
dss_scenario.run(user_dss_clients, project_key=TEST_PROJECT_KEY, scenario_id="SC169288_WRITE_FILE_WITH_RO_PARENT_FOLDER")
