-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpi-hole-process.pyspark
67 lines (56 loc) · 2.32 KB
/
pi-hole-process.pyspark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import explode
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import when
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## Logging the logs!
logger = glueContext.get_logger()
sc.setLogLevel('INFO')
## @type: DataSource
## @args: [database = "pihole-raw", table_name = "pihole_raw_uploads", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
pi_raw_dyf = glueContext.create_dynamic_frame.from_catalog(
database="pihole-raw",
table_name="pihole_raw_uploads",
transformation_ctx = "datasource0")
df = pi_raw_dyf.toDF()
if (df.count() == 0):
sys.exit()
# Get rid of that annoying nested array
exploded = df.select(explode(df.data))
# Assign names and data types to the columns
processed = exploded.select(
exploded.col[0].alias("epoch_timestamp").cast("long"),
exploded.col[1].alias("query_type"),
exploded.col[2].alias("domain"),
exploded.col[3].alias("client"),
exploded.col[4].alias("query_status").cast("int"),
exploded.col[5].alias("dnssec").cast("int"),
exploded.col[6].alias("reply").cast("int"),
exploded.col[7].alias("delay").cast("float"),
exploded.col[8].alias("CNAME_domain"),
exploded.col[9].alias("regex_idx").cast("int"),
exploded.col[10].alias("upstream_name"),
).dropDuplicates()
# Map my DNS names to catch those times when DNS wasn't set up yet.
mapped = processed.withColumn("client", \
when(processed.client == "172.22.22.22", "sk-mac.sk.lan") \
.when(processed.client == "172.22.22.34", "bigmac.sk.lan") \
.otherwise(processed.client))
timestamp = mapped.withColumn("timestamp", from_unixtime(mapped.epoch_timestamp).cast("timestamp"))
pi_dyf = DynamicFrame.fromDF(timestamp, glueContext, "Pihole Data")
glueContext.write_dynamic_frame.from_options(pi_dyf, connection_type = "s3", connection_options = {"path": "s3://pihole-processed/"}, format = "parquet")
job.commit()