Refactor schedule parser #119

Merged
merged 1 commit into from Jan 27, 2025
358 changes: 184 additions & 174 deletions src/ferry_planner/schedule.py
@@ -146,7 +146,7 @@ def _get_download_url(
*,
date: datetime,
) -> str:
return self.base_url + f"{origin_id}-{destination_id}?&scheduleDate={date.strftime('%m/%d/%Y')}"
return f"{self.base_url}{origin_id}-{destination_id}?&scheduleDate={date.strftime('%m/%d/%Y')}"

def _get_filepath(
self,
@@ -228,7 +228,7 @@ async def _download_schedule_async(
msg = f"status {response.status_code}"
raise ScheduleDownloadError(msg, url=url)
self._log(f"fetched schedule: {route}:{date.date()}")
result = parse_schedule_html(response, date)
result = ScheduleParser.parse_schedule_html(response, date)
if result.redirect_url:
if len(redirects) > max_redirects_count:
msg = "too many redirects"
@@ -310,180 +310,190 @@ def _refresh_task(self) -> None:
time.sleep(self.refresh_interval)


def parse_schedule_html(response: httpx.Response, date: datetime) -> HtmlParseResult:
html = response.text.replace("\u2060", "")
soup = BeautifulSoup(markup=html, features="html.parser")
table_tag = soup.find("table", id="dailyScheduleTableOnward")
daterange_tag = soup.find("div", id="dateRangeModal") # for seasonal
rows: Sequence[Tag] = []
if table_tag and isinstance(table_tag, Tag) and table_tag.tbody:
rows = table_tag.tbody.find_all("tr")
elif daterange_tag and isinstance(daterange_tag, Tag):
hrefs = [a["href"] for a in daterange_tag.find_all("a")]
index = get_seasonal_schedule_daterange_index(hrefs, date)
if index < 0:
msg = f"date {date} is out of seasonal schedules range"
raise ScheduleParseError(msg, url=str(response.url))
url = response.url.scheme + "://" + response.url.host + hrefs[index]
if index > 0 and url != str(response.url):
return HtmlParseResult.redirect(url)
rows = get_seasonal_schedule_rows(str(response.url), soup, date)
sailings = parse_sailings_from_html_rows(rows, date)
notes = []
if not sailings:
err = "No sailings found"
for msg in NO_SAILINGS_MESSAGES:
if msg in html:
err = msg
break
notes.append(err)
print(f"{err} at {response.url}")
return HtmlParseResult.from_sailings(sailings, notes)


def parse_sailings_from_html_rows(rows: Sequence[Tag], date: datetime) -> Sequence[FerrySailing]:
sailing_row_min_td_count = 3
sailings = []
for row in rows:
class ScheduleParser:
@staticmethod
def _log(message: str, /, *, level: str = "INFO") -> None:
print(f"[{ScheduleParser.__name__}:{level}] {message}")

@staticmethod
def parse_schedule_html(response: httpx.Response, date: datetime) -> HtmlParseResult:
html = response.text.replace("\u2060", "")
soup = BeautifulSoup(markup=html, features="html.parser")
table_tag = soup.find("table", id="dailyScheduleTableOnward")
daterange_tag = soup.find("div", id="dateRangeModal") # for seasonal
rows: Sequence[Tag] = []
if table_tag and isinstance(table_tag, Tag) and table_tag.tbody:
rows = table_tag.tbody.find_all("tr")
elif daterange_tag and isinstance(daterange_tag, Tag):
hrefs = [a["href"] for a in daterange_tag.find_all("a")]
index = ScheduleParser.get_seasonal_schedule_daterange_index(hrefs, date)
if index < 0:
msg = f"date {date} is out of seasonal schedules range"
raise ScheduleParseError(msg, url=str(response.url))
url = f"{response.url.scheme}://{response.url.host}{hrefs[index]}"
if index > 0 and url != str(response.url):
return HtmlParseResult.redirect(url)
rows = ScheduleParser.get_seasonal_schedule_rows(str(response.url), soup, date)
sailings = ScheduleParser.parse_sailings_from_html_rows(rows, date)
notes = []
tds = row.find_all("td")
if (
len(tds) < sailing_row_min_td_count
or "No sailings available" in tds[1].text
or "No passengers permitted" in tds[1].text
):
continue
td1 = tds[1].text.strip().split("\n", maxsplit=1)
if len(td1) > 1:
notes = parse_sailing_comment(td1[1])
# assuming dates are always in the first note
if is_schedule_excluded_on_date(notes[0], date):
if not sailings:
err = "No sailings found"
for msg in NO_SAILINGS_MESSAGES:
if msg in html:
err = msg
break
notes.append(err)
ScheduleParser._log(f"{err} at {response.url}", level="WARNING")
return HtmlParseResult.from_sailings(sailings, notes)

@staticmethod
def parse_sailings_from_html_rows(rows: Sequence[Tag], date: datetime) -> Sequence[FerrySailing]:
sailing_row_min_td_count = 3
sailings = []
for row in rows:
tds = row.find_all("td")
if (
len(tds) < sailing_row_min_td_count
or "No sailings available" in tds[1].text
or "No passengers permitted" in tds[1].text
):
continue
notes = [n for n in notes if n]
departure = datetime.strptime(
td1[0].strip(),
"%I:%M %p",
).replace(year=date.year, month=date.month, day=date.day)
arrival = datetime.strptime(
row.find_all("td")[2].text.strip(),
"%I:%M %p",
).replace(year=date.year, month=date.month, day=date.day)
td3 = tds[3].text.strip()
if "h " in td3 and "m" in td3:
td3format = "%Hh %Mm"
elif "m" in td3:
td3format = "%Mm"
elif "h" in td3:
td3format = "%Hh"
else:
td3format = "%H:%M"
duration = int(
datetime_to_timedelta(
datetime.strptime(
td3,
td3format,
),
).total_seconds(),
)
sailing = FerrySailing(
departure=departure,
arrival=arrival,
duration=duration,
notes=tuple(notes),
)
sailings.append(sailing)
return sailings


def parse_sailing_comment(comment: str) -> list[str]:
notes: list[str] = []
comment = comment.strip()
notes.append(comment)
pos = comment.find("Note:")
if pos > 0:
notes.append(comment[pos:])
comment = comment[:pos].strip()
if comment.startswith("Last "):
notes.append(comment)
comment = ""
notes[0] = comment # replace original with truncated
return notes


def get_seasonal_schedule_rows(url: str, soup: BeautifulSoup, date: datetime) -> Sequence[Tag]:
rows: Sequence[Tag] = []
form = soup.find("form", id="seasonalSchedulesForm")
if not isinstance(form, Tag):
msg = "'seasonalSchedulesForm' not found"
raise ScheduleParseError(msg, url=url)
weekday = WEEKDAY_NAMES[date.weekday()]
for thead in form.find_all("thead"):
if thead.text.lower().strip().startswith(weekday):
rows = [x for x in itertools.takewhile(lambda t: t.name != "thead", thead.next_siblings) if x.name == "tr"]
break
return rows


def get_seasonal_schedule_daterange_index(hrefs: Sequence[str], date: datetime) -> int:
for i, href in enumerate(hrefs):
dates = get_seasonal_schedule_daterange_from_url(href)
if dates and date.date() >= dates[0].date() and date.date() <= dates[1].date():
return i
return -1


def get_seasonal_schedule_daterange_from_url(href: str) -> tuple[datetime, datetime] | None:
dates = href.replace("=", "-").replace("_", "-").split("-")[-2:]
expected_dates_count = 2
if (len(dates)) != expected_dates_count:
return None
date_from = datetime.strptime(dates[0], "%Y%m%d")
date_to = datetime.strptime(dates[1], "%Y%m%d")
return (date_from, date_to)


def is_schedule_excluded_on_date(schedule_comment: str, date: datetime) -> bool:
if not schedule_comment:
return False
schedule_comment = schedule_comment.strip()
if schedule_comment.upper().startswith("ONLY"):
return not match_specific_schedule_date(schedule_comment, date)
if schedule_comment.upper().startswith(("EXCEPT", "NOT AVAILABLE")):
return match_specific_schedule_date(schedule_comment, date)
print("Unknown comment: " + schedule_comment)
return False


def match_specific_schedule_date(schedule_dates: str, date: datetime) -> bool:
month: int | None = None
schedule_dates = schedule_dates.upper()
for c in [".", "&", " ON ", " ON:"]:
schedule_dates = schedule_dates.replace(c, ",")
tokens = [x.strip() for x in schedule_dates.split(",")]
tokens = [x for x in tokens if x and x not in ["ONLY", "EXCEPT", "NOT AVAILABLE"]]
for token in tokens:
if token in MONTHS:
month = MONTHS.index(token) + 1
continue
_date: datetime
if token.isnumeric():
if not month:
print(f"Failed to parse schedule dates: No month for {token} in '{schedule_dates}")
return False
_date = datetime(year=date.year, month=month, day=int(token))
else:
dt = token.split(" ")
expected_tokens_count = 2
if len(dt) == expected_tokens_count and dt[0].isnumeric() and dt[1] in MONTHS:
# 01 JAN, 02 JAN, 05 FEB, 06 FEB
_date = datetime(year=date.year, month=MONTHS.index(dt[1]) + 1, day=int(dt[0]))
elif len(dt) == expected_tokens_count and dt[1].isnumeric() and dt[0] in MONTHS:
# Jan 1, 2, Feb 5 & 6
month = MONTHS.index(dt[0]) + 1
_date = datetime(year=date.year, month=month, day=int(dt[1]))
td1 = tds[1].text.strip().split("\n", maxsplit=1)
departure_time, comments = td1 if len(td1) > 1 else (td1[0], "")
if comments:
notes = ScheduleParser.parse_sailing_comments(comments)
if any(ScheduleParser.is_sailing_excluded_on_date(note, date) for note in notes):
continue
else:
notes = []
departure = datetime.strptime(
departure_time.strip(),
"%I:%M %p",
).replace(year=date.year, month=date.month, day=date.day)
arrival = datetime.strptime(
row.find_all("td")[2].text.strip(),
"%I:%M %p",
).replace(year=date.year, month=date.month, day=date.day)
td3 = tds[3].text.strip()
if "h " in td3 and "m" in td3:
td3format = "%Hh %Mm"
elif "m" in td3:
td3format = "%Mm"
elif "h" in td3:
td3format = "%Hh"
else:
print(f"Failed to parse schedule dates: Unknown word '{token}' in '{schedule_dates}")
td3format = "%H:%M"
duration = int(
datetime_to_timedelta(
datetime.strptime(
td3,
td3format,
),
).total_seconds(),
)
sailing = FerrySailing(
departure=departure,
arrival=arrival,
duration=duration,
notes=tuple(notes),
)
sailings.append(sailing)
return sailings

@staticmethod
def parse_sailing_comments(comments: str) -> list[str]:
comments = comments.strip()
notes = comments.splitlines()
for i, note in enumerate(notes):
if note.startswith("Note:"):
notes[i] = note.lstrip("Note:").strip()
return [note.strip() for note in notes if note]

@staticmethod
def get_seasonal_schedule_rows(url: str, soup: BeautifulSoup, date: datetime) -> Sequence[Tag]:
rows: Sequence[Tag] = []
form = soup.find("form", id="seasonalSchedulesForm")
if not isinstance(form, Tag):
msg = "'seasonalSchedulesForm' not found"
raise ScheduleParseError(msg, url=url)
weekday = WEEKDAY_NAMES[date.weekday()]
for thead in form.find_all("thead"):
if thead.text.lower().strip().startswith(weekday):
rows = [
x for x in itertools.takewhile(lambda t: t.name != "thead", thead.next_siblings) if x.name == "tr"
]
break
if date.month == _date.month and date.day == _date.day:
return rows

@staticmethod
def get_seasonal_schedule_daterange_index(hrefs: Sequence[str], date: datetime) -> int:
for i, href in enumerate(hrefs):
dates = ScheduleParser.get_seasonal_schedule_daterange_from_url(href)
if dates and date.date() >= dates[0].date() and date.date() <= dates[1].date():
return i
return -1

@staticmethod
def get_seasonal_schedule_daterange_from_url(href: str) -> tuple[datetime, datetime] | None:
dates = href.replace("=", "-").replace("_", "-").split("-")[-2:]
expected_dates_count = 2
if (len(dates)) != expected_dates_count:
return None
date_from = datetime.strptime(dates[0], "%Y%m%d")
date_to = datetime.strptime(dates[1], "%Y%m%d")
return (date_from, date_to)

@staticmethod
def is_sailing_excluded_on_date(schedule_comment: str, date: datetime) -> bool:
if not schedule_comment:
return False
schedule_comment = schedule_comment.strip()
if schedule_comment.upper() == "FOOT PASSENGERS ONLY":
return True
return False
if schedule_comment.upper().startswith("ONLY"):
return not ScheduleParser.match_specific_sailing_date(schedule_comment, date)
if schedule_comment.upper().startswith(("EXCEPT", "NOT AVAILABLE")):
return ScheduleParser.match_specific_sailing_date(schedule_comment, date)
ScheduleParser._log(f"unknown sailing comment: '{schedule_comment}'", level="WARNING")
return False

@staticmethod
def match_specific_sailing_date(schedule_dates: str, date: datetime) -> bool:
month: int | None = None
schedule_dates = schedule_dates.upper()
for c in [".", "&", " ON ", " ON:"]:
schedule_dates = schedule_dates.replace(c, ",")
tokens = [x.strip() for x in schedule_dates.split(",")]
tokens = [x for x in tokens if x and x not in ["ONLY", "EXCEPT", "NOT AVAILABLE", "FOOT PASSENGERS ONLY"]]
for token in tokens:
if token in MONTHS:
month = MONTHS.index(token) + 1
continue
_date: datetime
if token.isnumeric():
if not month:
ScheduleParser._log(
f"failed to parse schedule dates: No month for '{token}' in '{schedule_dates}'",
level="WARNING",
)
return False
_date = datetime(year=date.year, month=month, day=int(token))
else:
dt = token.split(" ")
expected_tokens_count = 2
if len(dt) == expected_tokens_count and dt[0].isnumeric() and dt[1] in MONTHS:
# 01 JAN, 02 JAN, 05 FEB, 06 FEB
_date = datetime(year=date.year, month=MONTHS.index(dt[1]) + 1, day=int(dt[0]))
elif len(dt) == expected_tokens_count and dt[1].isnumeric() and dt[0] in MONTHS:
# Jan 1, 2, Feb 5 & 6
month = MONTHS.index(dt[0]) + 1
_date = datetime(year=date.year, month=month, day=int(dt[1]))
else:
ScheduleParser._log(
f"failed to parse schedule dates: Unknown word '{token}' in '{schedule_dates}'",
level="WARNING",
)
break
if date.month == _date.month and date.day == _date.day:
return True
return False
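
For anyone updating call sites: the parsing helpers are now static methods on `ScheduleParser`, so `parse_schedule_html(response, date)` becomes `ScheduleParser.parse_schedule_html(response, date)`, as in the `_download_schedule_async` hunk above. Below is a minimal usage sketch, not taken from this PR; the fetched URL is a placeholder, and the `result.sailings` attribute is an assumption for illustration (only `redirect_url` and `from_sailings` appear in this diff).

```python
# Minimal usage sketch of the refactored parser API (illustrative only).
from datetime import datetime

import httpx

from ferry_planner.schedule import ScheduleParser  # module path taken from the diff's file path

date = datetime(2025, 1, 27)
# Placeholder URL; real schedule URLs are built by the _get_download_url helper shown above.
response = httpx.get("https://example.com/schedule-page")

result = ScheduleParser.parse_schedule_html(response, date)
if result.redirect_url:
    # Seasonal schedules can point at a different date-range page; the
    # downloader follows these redirects (see _download_schedule_async).
    print(f"redirected to {result.redirect_url}")
else:
    # Assumes HtmlParseResult exposes the sailings it was built from via
    # from_sailings(); that attribute name is not shown in this diff.
    for sailing in result.sailings:
        print(sailing.departure, sailing.arrival, sailing.duration, sailing.notes)
```

Grouping the helpers as static methods keeps them stateless while namespacing them under `ScheduleParser` and letting them share the `_log` helper instead of bare `print` calls.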