-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path test_word_count.py
42 lines (33 loc) · 1.11 KB
/
test_word_count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# findspark.init() is what makes pyspark importable when Spark is installed
# outside site-packages (e.g. via SPARK_HOME), so it has to run BEFORE any
# `pyspark` import below; importing pyspark first either fails outright or
# silently bypasses the Spark installation findspark would have located.
import findspark

findspark.init()

# The imports below intentionally follow findspark.init(); the noqa markers stop
# linters/isort from "fixing" E402 by hoisting them back above the call.
from pyspark.sql import SparkSession  # noqa: E402
from pyspark.sql.types import (  # noqa: E402
    LongType,
    StringType,
    StructField,
    StructType,
)

from exercises.a_spark_streaming_socket_source.word_count import transform  # noqa: E402

from .comparers import assert_frames_functionally_equivalent  # noqa: E402

# Module-level session shared by the tests in this file; getOrCreate() returns
# an already-active session if there is one rather than starting another.
spark = SparkSession.builder.getOrCreate()
def test_word_count():
    """Verify ``transform`` produces per-token counts for a one-line input frame.

    The input is a single-row DataFrame with one string column, ``value``.
    The expected result pairs each space-separated token with the number of
    times it occurs. Punctuation is kept as-is, so ``","``, ``"!"`` and
    ``"Streaming."`` appear as tokens of their own.
    """
    sentence = "I love Spark , Spark Structure Streaming. I am learning Spark !"
    value_schema = StructType([StructField("value", StringType(), True)])
    source_frame = spark.createDataFrame([[sentence]], schema=value_schema)

    actual_frame = transform(input_df=source_frame)

    # Hand-computed (word, count) pairs for `sentence`.
    counted_words = [
        ("!", 1),
        ("love", 1),
        ("learning", 1),
        ("I", 2),
        (",", 1),
        ("Spark", 3),
        ("am", 1),
        ("Structure", 1),
        ("Streaming.", 1),
    ]
    # "word" is nullable and "count" is not — presumably mirroring what a
    # groupBy(...).count() in `transform` yields; confirm against transform.
    counts_schema = StructType(
        [
            StructField("word", StringType(), True),
            StructField("count", LongType(), False),
        ]
    )
    expected_frame = spark.createDataFrame(counted_words, schema=counts_schema)

    assert_frames_functionally_equivalent(actual_frame, expected_frame)