-
Notifications
You must be signed in to change notification settings - Fork 0
/
datetime_handle.py
181 lines (137 loc) · 6.24 KB
/
datetime_handle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import pandas as pd
import numpy as np
import sys
import os
import re
def get_fulltime(series, freq='D'):
    """
    Build a regularly spaced date range from the first to the last element of a sequence.

    Parameters:
    - series: Ordered sequence of datetime-like values (e.g. list, pandas Index, pandas Series).
    - freq (str): Frequency string forwarded to pandas.date_range. Default is 'D'.

    Returns:
    - pandas.DatetimeIndex: Range running from the first element to the last element.

    Raises:
    - ValueError: If the endpoints cannot be read (e.g. empty input) or the range cannot be built.
    """
    try:
        # Plain [] on a pandas Series is a label lookup, so series[-1] fails on a
        # default integer index; positional access returns the true endpoints.
        if isinstance(series, pd.Series):
            start_time = series.iloc[0]
            end_time = series.iloc[-1]
        else:
            start_time = series[0]
            end_time = series[-1]
        return pd.date_range(start_time, end_time, freq=freq)
    except Exception as e:
        # Callers rely on ValueError; chain the cause so the real failure stays visible.
        raise ValueError(f"{e}") from e
# ------------------------------------------------------------------------------
def fulltime_table(df, fulltime_series):
    """
    Append all-null rows for every label of a full time axis to a DataFrame and sort by index.

    Parameters:
    - df (pandas.DataFrame): Table whose first index label is compared against the time axis.
    - fulltime_series: Indexable sequence of labels used as the index of the null rows.

    Returns:
    - pandas.DataFrame: The rows of df plus one null row per label in fulltime_series,
      sorted by index. Labels present in both inputs therefore appear more than once.

    Notes:
    - If the first index label of df and the first element of fulltime_series are not of
      the same type, the process is terminated through sys.exit with an explanatory message.
    """
    first_df_label = df.index[0]
    first_full_label = fulltime_series[0]

    # Guard clause: refuse to mix index label types.
    if type(first_df_label) != type(first_full_label):
        sys.exit("Data types of DataFrame indexes and input series do not match")

    # Empty frame with the same columns, one row per label of the full time axis.
    placeholder = pd.DataFrame(index=fulltime_series, columns=df.columns, data=None)
    return pd.concat([df, placeholder]).sort_index()
# ------------------------------------------------------------------------------
def convert_to_datetime(colname):
    """
    Convert a string to a datetime object based on specific validation rules.

    Accepted inputs are either exactly eight digits ('YYYYMMDD') or any text whose
    last ASCII letter is immediately followed by exactly eight digits, e.g.
    'N20200131' or 'AB20200131'.

    Parameters:
    - colname (str): The input string to be converted.

    Returns:
    - pandas.Timestamp: The date parsed from the eight-digit part.

    Raises:
    - ValueError: If the input string does not meet the required format or the
      digits do not form a valid date.
    """
    # Greedy '.*' makes the match end right after the LAST letter in the string,
    # so prefixes made of several letters are handled correctly.
    prefix = re.match(r'.*[A-Za-z]', colname, re.DOTALL)
    if prefix:
        # Everything after the last letter must be the date.
        numeric_part = colname[prefix.end():]
        if len(numeric_part) == 8 and numeric_part.isdigit():
            return pd.to_datetime(numeric_part, format='%Y%m%d')
        raise ValueError("Input must end with 'YYYYMMDD' after letters.")

    # No letters at all: the whole string must be the date.
    if len(colname) == 8 and colname.isdigit():
        return pd.to_datetime(colname, format='%Y%m%d')
    raise ValueError("Input must be 'YYYYMMDD' or contain letters followed by 'YYYYMMDD'.")
# ------------------------------------------------------------------------------
def datetime_to_string(date, initial_char='N'):
    """
    Convert a datetime object to a formatted string with an alphabetical prefix.

    Parameters:
    - date (datetime.date, datetime.datetime, pandas.Timestamp or pandas.DatetimeIndex):
      The date(s) to convert.
    - initial_char (str): The characters to prefix the date string. Default is 'N'.

    Returns:
    - str: initial_char + 'YYYYMMDD' when a single date is given.
    - pandas.Index: One prefixed string per entry when a DatetimeIndex is given
      (missing entries stay missing).

    Raises:
    - ValueError: If the input initial_char contains non-alphabetical characters.
    - TypeError: If the date is not a datetime object (NaT included).
    """
    # Function-scope import so this block does not depend on the module's import list.
    import datetime as _datetime

    # Validate input
    if not isinstance(initial_char, str) or not initial_char.isalpha():
        raise ValueError("Initial character(s) must only contain alphabetical characters.")

    # pandas.Timestamp subclasses datetime.datetime, so one check covers both.
    # NaT also passes that isinstance check but cannot be formatted; reject it explicitly.
    is_single_date = isinstance(date, _datetime.date) and date is not pd.NaT
    if not is_single_date and not isinstance(date, pd.DatetimeIndex):
        raise TypeError("The date must be a datetime object.")

    # Convert datetime to string in 'YYYYMMDD' format
    date_str = date.strftime('%Y%m%d')

    if is_single_date:
        return f"{initial_char}{date_str}"

    # DatetimeIndex: prefix each element instead of formatting the repr of the Index.
    return date_str.map(lambda text: f"{initial_char}{text}", na_action='ignore')
# ------------------------------------------------------------------------------
def intersect_time_index(df1_index, df2_index):
    """
    Return the sorted elements shared by two time indices.

    Args:
        df1_index (iterable): Time labels of the first dataset (list, set, pandas Index, ...).
        df2_index (iterable): Time labels of the second dataset (list, set, pandas Index, ...).

    Returns:
        list: Elements present in both inputs, in ascending order.

    Raises:
        TypeError: If either input cannot be turned into a set.
        ValueError: If either input is empty.
    """
    # Both conversions share one try so any failure maps to the same TypeError.
    try:
        first_labels, second_labels = set(df1_index), set(df2_index)
    except TypeError as e:
        raise TypeError("Both inputs must be iterables (e.g., list, set, pandas Index).") from e

    # Reject empty inputs, first argument checked first.
    if len(first_labels) == 0:
        raise ValueError("The first input time index is empty.")
    if len(second_labels) == 0:
        raise ValueError("The second input time index is empty.")

    common = first_labels & second_labels
    return sorted(common)
# ------------------------------------------------------------------------------
def extract_datetime_from_mfile(mfile):
    """
    Extract unique datetime components from filenames in a specified mfile.

    Each non-blank line is treated as a file path. Its basename (extension removed)
    is split on '_', and the last two fields, each with its first three characters
    dropped, are collected with an 'N' prefix.

    Parameters:
    - mfile (str): Path to the mfile containing list of filenames.

    Returns:
    - list: Sorted list of unique datetime components extracted from filenames.

    Raises:
    - IndexError: If a listed filename has fewer than two '_'-separated fields.
    """
    # Read the list file, dropping surrounding whitespace and skipping blank lines
    # (a blank line has no '_' fields and would otherwise raise IndexError).
    with open(mfile, "r") as input_file:
        lines = [stripped for stripped in (line.strip() for line in input_file) if stripped]

    datetimes = set()
    for line in lines:
        # Basename without extension, split into its '_'-separated fields.
        fields = os.path.basename(os.path.splitext(line)[0]).split("_")
        # Keep the last two fields, minus their three leading characters.
        datetimes.add("N" + fields[-2][3:])
        datetimes.add("N" + fields[-1][3:])

    # Return a sorted list of unique datetime components
    return sorted(datetimes)
# ------------------------------------------------------------------------------
def numeric_time_index(time_series):
    """
    Return the integer positions of the non-null entries of a time series.

    Parameters:
        time_series (pandas.Series): Values that may contain nulls; must provide notna() and len().

    Returns:
        numpy.ndarray: Positions (0-based, ascending) of the entries that are not null.
    """
    # One position per entry: 0, 1, ..., len - 1.
    positions = np.arange(len(time_series))
    # Boolean mask that is True wherever a value is present.
    keep_mask = np.asarray(time_series.notna())
    return positions[keep_mask]
# ------------------------------------------------------------------------------