-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcustomers.py
83 lines (59 loc) · 2.38 KB
/
customers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Provides tasks for downloading all gomus customers into the database."""
import datetime as dt
import os
import luigi
from luigi.format import UTF8
import pandas as pd
from _utils import CsvToDb, DataPreparationTask
from ._utils.cleanse_data import CleansePostalCodes
from ._utils.extract_customers import hash_id
from ._utils.fetch_report import FetchGomusReport
class CustomersToDb(CsvToDb):
    """CsvToDb task targeting the ``gomus_customer`` table.

    Its input is produced by ``CleansePostalCodes``, which is requested with
    this task's ``amount`` and ``today`` values.
    """

    # Forwarded unchanged to CleansePostalCodes.
    amount = luigi.parameter.Parameter(default='regular')
    # NOTE: the default is evaluated once, when this module is imported.
    today = luigi.parameter.DateParameter(default=dt.datetime.today())

    table = 'gomus_customer'

    def requires(self):
        """Return the ``CleansePostalCodes`` task this task depends on."""
        # Only the first element of every ``self.columns`` entry is passed
        # on (presumably the column name -- confirm against CsvToDb).
        column_names = [column[0] for column in self.columns]
        return CleansePostalCodes(
            amount=self.amount,
            columns=column_names,
            today=self.today,
        )
class GomusToCustomerMappingToDb(CsvToDb):
    """CsvToDb task targeting the ``gomus_to_customer_mapping`` table.

    Its input is produced by ``ExtractGomusToCustomerMapping``.
    """

    table = 'gomus_to_customer_mapping'

    # NOTE: the default is evaluated once, when this module is imported.
    today = luigi.parameter.DateParameter(default=dt.datetime.today())

    def requires(self):
        """Return the ``ExtractGomusToCustomerMapping`` task to read from."""
        # The extraction task receives only the first element of every
        # ``self.columns`` entry, together with the target table and date.
        column_names = [column[0] for column in self.columns]
        return ExtractGomusToCustomerMapping(
            columns=column_names,
            table=self.table,
            today=self.today,
        )
class ExtractGomusToCustomerMapping(DataPreparationTask):
    """Build the mapping from gomus IDs to hashed customer IDs.

    Reads the gomus ``customers`` report, keeps its ``Nummer`` and ``E-Mail``
    columns, renames them to ``self.columns``, replaces the e-mail with the
    result of ``hash_id`` and writes the mapping to
    ``gomus_to_customers_mapping.csv`` below ``self.output_dir``.

    ``self.columns`` must consist of ``gomus_id`` and ``customer_id``, in the
    order matching ``Nummer`` and ``E-Mail``, because ``run`` addresses the
    renamed columns by these names.
    """

    columns = luigi.parameter.ListParameter(description="Column names")
    # NOTE: the default is evaluated once, when this module is imported.
    today = luigi.parameter.DateParameter(default=dt.datetime.today())

    def _requires(self):
        """Return the inherited ``_requires()`` plus ``CustomersToDb``.

        NOTE(review): presumably this makes sure the customers are stored
        before ``filter_fkey_violations`` runs -- confirm against
        DataPreparationTask.
        """
        return luigi.task.flatten([
            CustomersToDb(today=self.today),
            super()._requires()
        ])

    def requires(self):
        """Return the ``FetchGomusReport`` task for the customers report.

        The report suffix is ``_1day`` in minimal mode. Otherwise it is
        ``_<n>days``, where ``<n>`` is taken from the environment variable
        ``GOMUS_HISTORIC_DAYS`` and falls back to 7.
        """
        suffix = '_1day' if self.minimal_mode else \
            f'_{os.getenv("GOMUS_HISTORIC_DAYS", 7)}days'

        return FetchGomusReport(report='customers',
                                today=self.today,
                                suffix=suffix)

    def output(self):
        """Return the local UTF-8 CSV target the mapping is written to."""
        return luigi.LocalTarget(
            f'{self.output_dir}/gomus/gomus_to_customers_mapping.csv',
            format=UTF8
        )

    def run(self):
        """Read the report, compute the customer IDs and write the CSV."""
        # self.input() is consumed as an iterator; only its first target is
        # read.
        with next(self.input()).open('r') as input_csv:
            df = pd.read_csv(input_csv)

        df = df.filter(['Nummer', 'E-Mail'])
        df.columns = self.columns

        df['gomus_id'] = df['gomus_id'].apply(int)

        # Walk both columns in lockstep instead of using a row-wise
        # ``df.apply(..., axis=1)``. A row-wise apply hands each row over as
        # a Series with the common dtype of all columns, so whenever the
        # e-mail column holds only NaN (float64) the integer gomus ID would
        # reach ``hash_id`` as a float. Depending on the pandas version, it
        # can also return a DataFrame instead of a Series for an empty
        # frame, which cannot be assigned to a single column.
        df['customer_id'] = [
            hash_id(email, alternative=gomus_id)
            for email, gomus_id in zip(df['customer_id'], df['gomus_id'])
        ]

        df = self.filter_fkey_violations(df)

        with self.output().open('w') as output_csv:
            df.to_csv(output_csv, index=False, header=True)