-
Notifications
You must be signed in to change notification settings - Fork 472
Expand file tree
/
Copy pathpyspark-session-2021-04-14.txt
More file actions
executable file
·158 lines (151 loc) · 4.91 KB
/
pyspark-session-2021-04-14.txt
File metadata and controls
executable file
·158 lines (151 loc) · 4.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
$ cat /tmp/foxdata.txt
a red fox jumped of high
fox jumped over a high fence
red of fox jumped
Python 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018, 02:44:43)
...
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
Using Python version 3.7.2 (v3.7.2:9a3ffc0492, Dec 24 2018 02:44:43)
Spark context Web UI available at http://10.0.0.93:4040
Spark context available as 'sc' (master = local[*], app id = local-1618456720582).
SparkSession available as 'spark'.
>>>
>>>
>>>
>>> spark
<pyspark.sql.session.SparkSession object at 0x7fc8d618d438>
>>> input_path = "/tmp/foxdata.txt"
>>> input_path
'/tmp/foxdata.txt'
>>> # Read input path and create an RDD[String]
...
>>> records = spark.sparkContext.textFile(input_path)
>>> records
/tmp/foxdata.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
>>>
>>> records.collect()
[
'a red fox jumped of high',
'fox jumped over a high fence',
'red of fox jumped'
]
>>> records.count()
3
>>> # tokenize records and create RDD[ [String] ]
...
>>> tokenizd = records.map(lambda record: record.split(" "))
>>> tokenizd.collect()
[
['a', 'red', 'fox', 'jumped', 'of', 'high'],
['fox', 'jumped', 'over', 'a', 'high', 'fence'],
['red', 'of', 'fox', 'jumped']
]
>>> tokenizd.count()
3
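>>> # NOTE: each element of tokenizd is a list of words, so map() pairs the whole list (not each word) with 1; flatMap() is used further down to get one word per element
>>> pairs = tokenizd.map(lambda word : (word, 1))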
>>> pairs.collect()
[
(['a', 'red', 'fox', 'jumped', 'of', 'high'], 1),
(['fox', 'jumped', 'over', 'a', 'high', 'fence'], 1),
(['red', 'of', 'fox', 'jumped'], 1)
]
>>>
>>> words = tokenizd.flatMap(lambda arr: arr)
>>> words.collect()
['a', 'red', 'fox', 'jumped', 'of', 'high', 'fox', 'jumped', 'over', 'a', 'high', 'fence', 'red', 'of', 'fox', 'jumped']
>>> words.count()
16
>>> # words: RDD[String]
...
>>> key_value_pairs = words.map(lambda word: (word, 1))
>>> key_value_pairs.collect()
[('a', 1), ('red', 1), ('fox', 1), ('jumped', 1), ('of', 1), ('high', 1), ('fox', 1), ('jumped', 1), ('over', 1), ('a', 1), ('high', 1), ('fence', 1), ('red', 1), ('of', 1), ('fox', 1), ('jumped', 1)]
>>>
>>> # key_value_pairs: RDD[(String, Integer)]
...
>>>
>>> grouped = key_value_pairs.groupByKey()
>>> grouped.collect()
[
('of', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f9390>),
('high', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f9400>),
('fence', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f94e0>),
('a', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f9470>),
('red', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f9438>),
('fox', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f95f8>),
('jumped', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f9550>),
('over', <pyspark.resultiterable.ResultIterable object at 0x7fc8d61f96d8>)
]
>>>
>>> debugged = grouped.mapValues(lambda values: list(values))
>>> debugged.collect()
[
('of', [1, 1]),
('high', [1, 1]),
('fence', [1]),
('a', [1, 1]),
('red', [1, 1]),
('fox', [1, 1, 1]),
('jumped', [1, 1, 1]),
('over', [1])
]
>>>
>>>
>>> frequency = grouped.mapValues(lambda values: sum(values))
>>> frequency.collect()
[('of', 2), ('high', 2), ('fence', 1), ('a', 2), ('red', 2), ('fox', 3), ('jumped', 3), ('over', 1)]
>>>
>>>
>>>
>>> key_value_pairs.collect()
[('a', 1), ('red', 1), ('fox', 1), ('jumped', 1), ('of', 1), ('high', 1), ('fox', 1), ('jumped', 1), ('over', 1), ('a', 1), ('high', 1), ('fence', 1), ('red', 1), ('of', 1), ('fox', 1), ('jumped', 1)]
>>>
>>>
>>>
>>> reduced = key_value_pairs.reduceByKey(lambda x, y: x+y)
>>> reduced.collect()
[('of', 2), ('high', 2), ('fence', 1), ('a', 2), ('red', 2), ('fox', 3), ('jumped', 3), ('over', 1)]
>>>
>>> rdd7 = reduced.mapValues(lambda x: x+100)
>>> rdd7.collect()
[('of', 102), ('high', 102), ('fence', 101), ('a', 102), ('red', 102), ('fox', 103), ('jumped', 103), ('over', 101)]
>>> rdd77 = reduced.map(lambda x: x[1]+100)
>>> rdd77.collect()
[102, 102, 101, 102, 102, 103, 103, 101]
>>> rdd77 = reduced.map(lambda x: (x[0], x[1]+100))
>>> rdd77.collect()
[('of', 102), ('high', 102), ('fence', 101), ('a', 102), ('red', 102), ('fox', 103), ('jumped', 103), ('over', 101)]
>>>
>>> # get number of partitions for rdd77
>>> rdd77.getNumPartitions()
2
>>>
>>>
>>> KV = [('x', 3), ('x', 5), ('x', 8), ('y', 50), ('y', 60), ('y', 70), ('z', 3)]
>>> KV
[('x', 3), ('x', 5), ('x', 8), ('y', 50), ('y', 60), ('y', 70), ('z', 3)]
>>> rdd = spark.sparkContext.parallelize(KV)
>>>
>>> rdd.collect()
[('x', 3), ('x', 5), ('x', 8), ('y', 50), ('y', 60), ('y', 70), ('z', 3)]
>>> rdd.count()
7
>>>
>>> filtered1 = rdd.filter(lambda x : x[1] > 10)
>>> filtered1.collect()
[('y', 50), ('y', 60), ('y', 70)]
>>> filtered2 = rdd.filter(lambda x : x[1] < 10)
>>> filtered2.collect()
[('x', 3), ('x', 5), ('x', 8), ('z', 3)]
>>>
>>>
>>> added = rdd.reduceByKey(lambda a, b: a+b)
>>> added.collect()
[('y', 180), ('z', 3), ('x', 16)]
>>>