-
Notifications
You must be signed in to change notification settings - Fork 32
/
Copy pathalgorithm.py
70 lines (54 loc) · 1.9 KB
/
algorithm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import numpy as np
import pandas as pd
import json
import sys
from dataclasses import dataclass
import argparse
import pickle
import tarzan.TARZAN as TARZAN
@dataclass
class CustomParameters:
anomaly_window_size: int = 20
alphabet_size: int = 4
random_state: int = 42
class AlgorithmArgs(argparse.Namespace):
@property
def ts(self) -> np.ndarray:
return self.df.values[:, 1]
@property
def df(self) -> pd.DataFrame:
return pd.read_csv(self.dataInput)
@staticmethod
def from_sys_args() -> 'AlgorithmArgs':
args: dict = json.loads(sys.argv[1])
custom_parameter_keys = dir(CustomParameters())
filtered_parameters = dict(
filter(lambda x: x[0] in custom_parameter_keys, args.get("customParameters", {}).items()))
args["customParameters"] = CustomParameters(**filtered_parameters)
return AlgorithmArgs(**args)
def set_random_state(config: AlgorithmArgs) -> None:
seed = config.customParameters.random_state
import random
random.seed(seed)
np.random.seed(seed)
# the training and execution happens in one
def train(args: AlgorithmArgs):
data = args.ts
with open(args.modelOutput, "wb") as f:
pickle.dump(data, f)
def execute(args: AlgorithmArgs):
data = args.ts
with open(args.modelInput, "rb") as f:
train_data = pickle.load(f)
scores = TARZAN.TARZAN(train_data, data, args.customParameters.anomaly_window_size, args.customParameters.alphabet_size)
np.array(scores).tofile(args.dataOutput, sep="\n")
if __name__ == "__main__":
args = AlgorithmArgs.from_sys_args()
print(f"Configuration: {args}")
set_random_state(args)
if args.executionType == "train":
train(args)
elif args.executionType == "execute":
execute(args)
else:
raise ValueError(f"No executionType '{args.executionType}' available! Choose either 'train' or 'execute'.")