-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Import transactions by quarter, in chunks of a few hundred #212
Changes from all commits
2b8dd5e
17b9b27
9bf8ad3
422d309
45d6ab6
2eb539d
33f3c33
c5645ce
c453cb5
0c189ca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,26 @@ | ||
THIS_YEAR=$(shell date +"%Y") | ||
NIGHTLY_YEARS=$(shell seq 2023 $(THIS_YEAR)) | ||
QUARTERLY_YEARS=$(shell seq 2020 $(THIS_YEAR)) | ||
|
||
define quarterly_target | ||
$(foreach YEAR,$(1),$(patsubst %,import/$(2)_%_$(YEAR),1 2 3 4)) | ||
endef | ||
Comment on lines
+5
to
+7
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Create a target for each year ( |
||
|
||
.PHONY : quarterly | ||
quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2020 import/EXP_2020 import/CON_2021 import/EXP_2021 import/CON_2022 import/EXP_2022 import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 | ||
quarterly: import/candidates import/pacs import/candidate_filings import/pac_filings \ | ||
$(call quarterly_target,$(QUARTERLY_YEARS),CON) $(call quarterly_target,$(QUARTERLY_YEARS),EXP) | ||
python manage.py make_search_index | ||
|
||
.PHONY : nightly | ||
nightly: import/candidates import/pacs import/candidate_filings import/pac_filings import/CON_2023 import/EXP_2023 import/CON_2024 import/EXP_2024 | ||
nightly: import/candidates import/pacs import/candidate_filings import/pac_filings \ | ||
$(call quarterly_target,$(NIGHTLY_YEARS),CON) $(call quarterly_target,$(NIGHTLY_YEARS),EXP) | ||
python manage.py make_search_index | ||
|
||
import/% : _data/sorted/%.csv | ||
.SECONDEXPANSION: | ||
import/% : _data/sorted/$$(word 1, $$(subst _, , $$*))_$$(word 3, $$(subst _, , $$*)).csv | ||
Comment on lines
+19
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Parse the transaction type and year out of a pattern like |
||
python manage.py import_transactions --transaction-type $(word 1, $(subst _, , $*)) \ | ||
--year $(word 2, $(subst _, , $*)) \ | ||
--quarters $(word 2, $(subst _, , $*)) \ | ||
--year $(word 3, $(subst _, , $*)) \ | ||
--file $< | ||
|
||
import/pac_filings : _data/raw/pac_committee_filings.csv | ||
|
@@ -30,7 +41,6 @@ _data/raw/%_committees.csv : | |
_data/raw/%_committee_filings.csv : | ||
wget --no-check-certificate --no-use-server-timestamps -O $@ "https://openness-project-nmid.s3.amazonaws.com/$*_committee_filings.csv" | ||
|
||
|
||
_data/sorted/%.csv : _data/raw/%.csv | ||
xsv fixlengths $< | xsv sort -s OrgID,"Report Name","Start of Period","End of Period" > $@ | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
import csv | ||
import math | ||
import re | ||
from itertools import groupby | ||
|
||
|
@@ -13,17 +14,35 @@ | |
|
||
|
||
def filing_key(record): | ||
start_date = parse_date(record["Start of Period"]) | ||
end_date = parse_date(record["End of Period"]) | ||
|
||
return ( | ||
record["OrgID"], | ||
record["Report Name"], | ||
start_date.year if start_date else None, | ||
end_date.year if end_date else None, | ||
parse_date(record["Start of Period"]), | ||
parse_date(record["End of Period"]), | ||
) | ||
|
||
|
||
def get_quarter(date_str):
    """
    Return the calendar quarter (1-4) containing the given date string.

    The string is parsed with parse_date. When parsing yields no date, return
    None instead of failing on the missing ``month`` attribute, so a caller
    testing membership in a collection of quarters treats the record as not
    matching rather than aborting the whole run.
    """
    date = parse_date(date_str)

    if not date:
        # NOTE(review): assumes parse_date signals a missing or unparseable
        # value with a falsy result such as None -- confirm against its
        # definition.
        return None

    return math.ceil(date.month / 3.0)
|
||
|
||
def get_month_range(quarters):
    """
    Return the (first month, last month) pair spanning the given quarters.

    The range runs from the earliest starting month to the latest ending
    month among the quarters, so any quarter lying between two requested
    quarters is covered as well: quarters 1 and 3 give (1, 9).

    Raises KeyError for a quarter outside 1-4 and ValueError when no
    quarters are given.
    """
    bounds_by_quarter = {1: (1, 3), 2: (4, 6), 3: (7, 9), 4: (10, 12)}

    spans = [bounds_by_quarter[quarter] for quarter in quarters]

    first_month = min(start for start, _ in spans)
    last_month = max(end for _, end in spans)

    return first_month, last_month
|
||
|
||
class Command(BaseCommand): | ||
help = """ | ||
Import data from the New Mexico Campaign Finance System: | ||
|
@@ -39,12 +58,24 @@ def add_arguments(self, parser): | |
default="CON", | ||
help="Type of transaction to import: CON, EXP (Default: CON)", | ||
) | ||
parser.add_argument( | ||
"--quarters", | ||
dest="quarters", | ||
default="1,2,3,4", | ||
help="Comma-separated list of quarters to import (Default: 1,2,3,4)", | ||
) | ||
parser.add_argument( | ||
"--year", | ||
dest="year", | ||
default="2023", | ||
help="Year to import (Default: 2023)", | ||
) | ||
parser.add_argument( | ||
"--batch-size", | ||
dest="batch_size", | ||
default=500, | ||
help="Number of transaction records to bulk create at once (Default: 500)", | ||
) | ||
parser.add_argument( | ||
"--file", | ||
dest="file", | ||
|
@@ -53,38 +84,83 @@ def add_arguments(self, parser): | |
) | ||
|
||
def handle(self, *args, **options): | ||
if options["transaction_type"] not in ("EXP", "CON"): | ||
transaction_type = options["transaction_type"] | ||
|
||
if transaction_type not in ("EXP", "CON"): | ||
raise ValueError("Transaction type must be one of: EXP, CON") | ||
|
||
year = options["year"] | ||
|
||
self.stdout.write(f"Loading data from {transaction_type}_{year}.csv") | ||
|
||
quarters = {int(q) for q in options["quarters"].split(",")} | ||
quarter_string = ", ".join(f"Q{q}" for q in quarters) | ||
|
||
with open(options["file"]) as f: | ||
if options["transaction_type"] == "CON": | ||
self.import_contributions(f, year) | ||
self.stdout.write( | ||
f"Importing transactions from filing periods beginning in {quarter_string}" | ||
) | ||
|
||
if transaction_type == "CON": | ||
self.import_contributions(f, quarters, year, options["batch_size"]) | ||
|
||
elif transaction_type == "EXP": | ||
self.import_expenditures(f, quarters, year, options["batch_size"]) | ||
|
||
elif options["transaction_type"] == "EXP": | ||
self.import_expenditures(f, year) | ||
self.stdout.write(self.style.SUCCESS("Transactions imported!")) | ||
|
||
self.stdout.write( | ||
f"Totaling filings from periods beginning in {quarter_string}" | ||
) | ||
self.total_filings(quarters, year) | ||
self.stdout.write(self.style.SUCCESS("Filings totaled!")) | ||
|
||
self.total_filings(year) | ||
call_command("aggregate_data") | ||
|
||
def import_contributions(self, f, year): | ||
def _records_by_filing(self, records, filing_quarters):
    """
    Return an iterator of (filing key, records) groups, limited to filings
    whose reporting period starts in one of the given quarters.

    Only the quarter of "Start of Period" is checked, never its year. That is
    deliberate: transaction files are organized by year, so a filing whose
    period begins in one year and ends in the next has its transactions
    split across two files. Matching on the quarter alone picks such a filing
    up from whichever file is being read. For example, a period running from
    December 2023 to February 2024 has transactions in both the 2023 and the
    2024 file; running the Q4 import against each file collects them all.

    Grouping is done with groupby, which only merges consecutive records
    that share a filing key.
    """
    matching_records = (
        record
        for record in records
        if get_quarter(record["Start of Period"]) in filing_quarters
    )

    return groupby(tqdm(matching_records), key=filing_key)
|
||
def _save_batch(self, batch): | ||
""" | ||
Contributions are represented by several different types of models. Sort | ||
then group them by class, then save each group of records. | ||
""" | ||
for cls, cls_records in groupby( | ||
sorted(batch, key=lambda x: str(type(x))), key=lambda x: type(x) | ||
hancush marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
yield cls.objects.bulk_create(cls_records) | ||
|
||
def import_contributions(self, f, quarters, year, batch_size): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Such a clean function. |
||
reader = csv.DictReader(f) | ||
batch = [] | ||
|
||
for filing_group, records in groupby(tqdm(reader), key=filing_key): | ||
for _, records in self._records_by_filing(reader, quarters): | ||
for i, record in enumerate(records): | ||
if i == 0: | ||
try: | ||
filing = self._get_filing(record) | ||
except ValueError: | ||
break | ||
|
||
# the contributions file are organized by the year | ||
# of a transaction date not the date of the | ||
# The contributions files are organized by the year | ||
# of the transaction date, not the date of the | ||
# filing, so transactions from the same filing can | ||
# appear in multiple contribution files. | ||
# | ||
# we need to make sure we just clear out the | ||
# We need to make sure we just clear out the | ||
# contributions in a file that were purportedly made | ||
# in a given year. | ||
models.Loan.objects.filter( | ||
|
@@ -105,17 +181,26 @@ def import_contributions(self, f, year): | |
record["Contribution Type"] in {"Loans Received", "Special Event"} | ||
or "Contribution" in record["Contribution Type"] | ||
): | ||
self.make_contribution(record, contributor, filing).save() | ||
contribution = self.make_contribution(record, contributor, filing) | ||
batch.append(contribution) | ||
|
||
else: | ||
self.stderr.write( | ||
f"Could not determine contribution type from record: {record['Contribution Type']}" | ||
) | ||
|
||
def import_expenditures(self, f, year): | ||
if len(batch) % batch_size == 0: | ||
self._save_batch(batch) | ||
batch = [] | ||
|
||
if len(batch) > 0: | ||
self._save_batch(batch) | ||
|
||
def import_expenditures(self, f, quarters, year, batch_size): | ||
reader = csv.DictReader(f) | ||
batch = [] | ||
|
||
for filing_group, records in groupby(tqdm(reader), key=filing_key): | ||
for _, records in self._records_by_filing(reader, quarters): | ||
for i, record in enumerate(records): | ||
if i == 0: | ||
try: | ||
|
@@ -129,7 +214,12 @@ def import_expenditures(self, f, year): | |
received_date__year=year, | ||
).delete() | ||
|
||
self.make_contribution(record, None, filing).save() | ||
contribution = self.make_contribution(record, None, filing) | ||
batch.append(contribution) | ||
|
||
if not len(batch) % batch_size: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when doing modulo, i think it's better to have the form that
i think it's just a touch more explicit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Love it. Done. |
||
self._save_batch(batch) | ||
batch = [] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You also need to handle the case where you have iterated through all the records but the number of records left in the batch isn't a multiple of batch_size, so the final partial batch still gets saved. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Great catch, thanks. |
||
|
||
def make_contributor(self, record): | ||
state, _ = models.State.objects.get_or_create( | ||
|
@@ -268,7 +358,10 @@ def _get_filing(self, record): | |
"filing_period__initial_date", | ||
"filing_period__end_date", | ||
) | ||
msg = f"{filings.count()} filings found for PAC {pac} from record {record}:\n{filing_meta}\n\nUsing most recent filing matching query..." | ||
msg = ( | ||
f"{filings.count()} filings found for PAC {pac} from record " | ||
f"{record}:\n{filing_meta}\n\nUsing most recent filing matching query..." | ||
) | ||
self.stderr.write(msg) | ||
|
||
return filing | ||
|
@@ -410,12 +503,16 @@ def make_contribution(self, record, contributor, filing): | |
|
||
return contribution | ||
|
||
def total_filings(self, year): | ||
for filing in models.Filing.objects.filter( | ||
final=True, | ||
filing_period__initial_date__year__lte=year, | ||
filing_period__end_date__year__gte=year, | ||
).iterator(): | ||
def total_filings(self, quarters, year): | ||
start, end = get_month_range(quarters) | ||
|
||
for filing in tqdm( | ||
models.Filing.objects.filter( | ||
final=True, | ||
filing_period__initial_date__month__gte=start, | ||
filing_period__initial_date__month__lte=end, | ||
).iterator() | ||
): | ||
contributions = filing.contributions().aggregate(total=Sum("amount")) | ||
expenditures = filing.expenditures().aggregate(total=Sum("amount")) | ||
loans = filing.loans().aggregate(total=Sum("amount")) | ||
|
@@ -425,5 +522,3 @@ def total_filings(self, year): | |
filing.total_loans = loans["total"] or 0 | ||
|
||
filing.save() | ||
|
||
self.stdout.write(f"Totalled {filing}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,3 @@ | ||
version: "2.4" | ||
|
||
services: | ||
app: | ||
# Don't restart the service when the command exits | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,7 @@ | ||
version: '2.4' | ||
|
||
services: | ||
app: | ||
image: nmid | ||
build: . | ||
build: . | ||
container_name: nmid | ||
stdin_open: true | ||
tty: true | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: Change back to deploy before merge.