Step 1:¶
- Creating Spark Session
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType, FloatType, StructType, StructField
    from IPython.core.display import Image, display
    print("all modules are ok ...... ")
except Exception as e:
    print("some modules are missing : {} ".format(e))
spark = SparkSession.builder\
    .appName("MyProcess")\
    .master("local[*]")\
    .getOrCreate()
spark
all modules are ok ......
SparkSession - in-memory
- Sample Dataset for learning purposes
data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris,Food,19.00", "Marseille,Clothing,12.00",
                "Paris,Food,8.00", "Paris,Clothing,15.00",
                "Marseille,Food,20.00", "Lyon,Book,10.00"]))
schema = StructType([
    StructField("city", StringType(), nullable=True),
    StructField("type", StringType(), nullable=True),
    StructField("price", FloatType(), nullable=True)
])
df = spark.createDataFrame(data, schema=schema)
df.show()
+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+
"""Collect() – Retrieve data from Spark RDD/DataFrame"""
df.collect()
[Row(city='Paris', type='Food', price=19.0), Row(city='Marseille', type='Clothing', price=12.0), Row(city='Paris', type='Food', price=8.0), Row(city='Paris', type='Clothing', price=15.0), Row(city='Marseille', type='Food', price=20.0), Row(city='Lyon', type='Book', price=10.0)]
"""Returns the first n rows in the DataFrame."""
df.take(4)
[Row(city='Paris', type='Food', price=19.0), Row(city='Marseille', type='Clothing', price=12.0), Row(city='Paris', type='Food', price=8.0), Row(city='Paris', type='Clothing', price=15.0)]
"""gets the first two values """
df.head(2)
[Row(city='Paris', type='Food', price=19.0), Row(city='Marseille', type='Clothing', price=12.0)]
"""tails gets the Last values """
df.tail(2)
[Row(city='Marseille', type='Food', price=20.0), Row(city='Lyon', type='Book', price=10.0)]
"""selecting single column"""
df.select("City").show(2)
+---------+
|     City|
+---------+
|    Paris|
|Marseille|
+---------+
only showing top 2 rows
"""selecting multiplke column"""
df.select(["city", "type"]).show(2)
+---------+--------+
|     city|    type|
+---------+--------+
|    Paris|    Food|
|Marseille|Clothing|
+---------+--------+
only showing top 2 rows
- Filter
df.filter(df.city == "Paris").show()
+-----+--------+-----+
| city|    type|price|
+-----+--------+-----+
|Paris|    Food| 19.0|
|Paris|    Food|  8.0|
|Paris|Clothing| 15.0|
+-----+--------+-----+
df.filter(df.city == "Paris").filter(df.type == "Food").show()
+-----+----+-----+
| city|type|price|
+-----+----+-----+
|Paris|Food| 19.0|
|Paris|Food|  8.0|
+-----+----+-----+
df.filter(
(df.city == "Paris") & (df.type == "Food") ).show()
+-----+----+-----+
| city|type|price|
+-----+----+-----+
|Paris|Food| 19.0|
|Paris|Food|  8.0|
+-----+----+-----+
df.filter(
(df.city == "Paris") & (df.price > 18.0) ).select('city').show()
+-----+
| city|
+-----+
|Paris|
+-----+
df.filter(df.price < 20 ).orderBy(df.price.asc()).select("price").show()
+-----+
|price|
+-----+
|  8.0|
| 10.0|
| 12.0|
| 15.0|
| 19.0|
+-----+
df.filter(df.price < 20 ).sort(df.price.asc()).select("price").show()
+-----+
|price|
+-----+
|  8.0|
| 10.0|
| 12.0|
| 15.0|
| 19.0|
+-----+
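Sorting in the opposite direction works the same way via desc(); a minimal sketch:
df.orderBy(df.price.desc()).select("price").show()   # highest price first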
- Manipulating Columns
df.show(2)
+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
+---------+--------+-----+
only showing top 2 rows
df.withColumn("50percent", df.price / 2).show()
+---------+--------+-----+---------+
|     city|    type|price|50percent|
+---------+--------+-----+---------+
|    Paris|    Food| 19.0|      9.5|
|Marseille|Clothing| 12.0|      6.0|
|    Paris|    Food|  8.0|      4.0|
|    Paris|Clothing| 15.0|      7.5|
|Marseille|    Food| 20.0|     10.0|
|     Lyon|    Book| 10.0|      5.0|
+---------+--------+-----+---------+
df.drop(df.type).show()
+---------+-----+
|     city|price|
+---------+-----+
|    Paris| 19.0|
|Marseille| 12.0|
|    Paris|  8.0|
|    Paris| 15.0|
|Marseille| 20.0|
|     Lyon| 10.0|
+---------+-----+
df.drop(*["city", "price"]).filter(df.type == "Food").show()
+----+
|type|
+----+
|Food|
|Food|
|Food|
+----+
- Rename Columns
df.withColumnRenamed("city", "CITY").show()
+---------+--------+-----+
|     CITY|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+
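To rename several columns, the call can simply be chained; a minimal sketch:
df.withColumnRenamed("city", "CITY").withColumnRenamed("type", "TYPE").show(2)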
Applying functions¶
@udf("float")
def to_float(price):
return float(price)
df.select(to_float(df.price).alias('PRICE')).show()
+-----+
|PRICE|
+-----+
| 19.0|
| 12.0|
|  8.0|
| 15.0|
| 20.0|
| 10.0|
+-----+
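For a plain type conversion like this one, the built-in cast avoids the overhead of a Python UDF; a minimal alternative sketch:
df.select(df.price.cast("float").alias("PRICE")).show()   # same result, no Python UDF involved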
df.dtypes
[('city', 'string'), ('type', 'string'), ('price', 'float')]
Say I want to return JSON¶
df.show()
+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+
df.dtypes
[('city', 'string'), ('type', 'string'), ('price', 'float')]
def to_some_transformation(price):
    if price < 8.0:
        return {"name": "value1"}
    else:
        return {"name": "value2"}

udfValueToCategoryGeo = udf(to_some_transformation, MapType(
    StringType(), StringType()
))
df.withColumn("NEW", udfValueToCategoryGeo("price")).show()
+---------+--------+-----+----------------+
|     city|    type|price|             NEW|
+---------+--------+-----+----------------+
|    Paris|    Food| 19.0|{name -> value2}|
|Marseille|Clothing| 12.0|{name -> value2}|
|    Paris|    Food|  8.0|{name -> value2}|
|    Paris|Clothing| 15.0|{name -> value2}|
|Marseille|    Food| 20.0|{name -> value2}|
|     Lyon|    Book| 10.0|{name -> value2}|
+---------+--------+-----+----------------+
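The same map column can also be built without a Python UDF, using the built-in when and create_map expressions; a sketch of the equivalent logic:
from pyspark.sql import functions as F
df.withColumn(
    "NEW",
    F.create_map(
        F.lit("name"),
        F.when(df.price < 8.0, F.lit("value1")).otherwise(F.lit("value2"))
    )
).show()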
Removing Null and Empty items¶
df = spark.createDataFrame([
    [1, 'Navee', 'Srikanth'],
    [2, '', 'Srikanth'],
    [3, 'Naveen', '']],
    ['ID', 'FirstName', 'LastName']
)
df.show()
+---+---------+--------+
| ID|FirstName|LastName|
+---+---------+--------+
|  1|    Navee|Srikanth|
|  2|         |Srikanth|
|  3|   Naveen|        |
+---+---------+--------+
df.na.drop().show()
+---+---------+--------+
| ID|FirstName|LastName|
+---+---------+--------+
|  1|    Navee|Srikanth|
|  2|         |Srikanth|
|  3|   Naveen|        |
+---+---------+--------+
Note that na.drop() only removes null values; the empty strings above are not nulls, so every row is kept. That is why a UDF is used below to flag the empty values.
def remove(x):
    if x == '':
        return 1
    else:
        return 0
udfValueToCategory = udf(remove, IntegerType())
df1 = df.withColumn("category", udfValueToCategory("FirstName"))
df1.filter(df1.category == 0).show()
+---+---------+--------+--------+
| ID|FirstName|LastName|category|
+---+---------+--------+--------+
|  1|    Navee|Srikanth|       0|
|  3|   Naveen|        |       0|
+---+---------+--------+--------+
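The same result can be obtained with a plain column expression instead of a UDF; a minimal sketch that keeps rows whose FirstName is non-empty:
df.filter(df.FirstName != '').show()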
Additional Resources¶
schema = StructType([
    StructField("city", StringType(), nullable=True),
    StructField("type", StringType(), nullable=True),
    StructField("price", FloatType(), nullable=True)
])
data = map(lambda r: (r[0], r[1], float(r[2])),
           map(lambda x: x.split(","),
               ["Paris,Food,19.00", "Marseille,Clothing,12.00",
                "Paris,Food,8.00", "Paris,Clothing,15.00",
                "Marseille,Food,20.00", "Lyon,Book,10.00"]))
df = spark.createDataFrame(data, schema=schema)
transactions = df
print('Number of partitions: {}'.format(transactions.rdd.getNumPartitions()))
print('Partitioner: {}'.format(transactions.rdd.partitioner))
print('Partitions structure: {}'.format(transactions.rdd.glom().collect()))
Number of partitions: 8
Partitioner: None
Partitions structure: [[], [Row(city='Paris', type='Food', price=19.0)], [Row(city='Marseille', type='Clothing', price=12.0)], [Row(city='Paris', type='Food', price=8.0)], [], [Row(city='Paris', type='Clothing', price=15.0)], [Row(city='Marseille', type='Food', price=20.0)], [Row(city='Lyon', type='Book', price=10.0)]]
display(Image(url='https://iamluminousmen-media.s3.amazonaws.com/media/spark-partitions/spark-partitions-2.jpg', width=500, unconfined=True))
Spark splits data in order to process it in parallel in memory. Once the user has submitted a job to the cluster, each partition is sent to a specific executor for further processing. An executor processes only one partition at a time, so the size and number of partitions transferred to an executor are directly proportional to the time it takes to complete them. Thus, the more partitions there are, the more work is distributed across executors; with a smaller number of partitions the work is done in larger pieces (and often faster).[2]
The most important reason is performance. By having all the data needed to calculate on a single node, we reduce the overhead on the shuffle (the need for serialization and network traffic).[2]
The second reason is cost reduction: better utilization of the cluster helps to reduce idle resources.[2]
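One practical way to exploit this is to repartition by the key you will later group or join on, so that rows with the same key land in the same partition. A minimal sketch using the transactions DataFrame above (the name by_city and the partition count 3 are arbitrary choices for illustration):
by_city = transactions.repartition(3, "city")   # hash-partition rows by the city column
print(by_city.rdd.glom().map(len).collect())    # rows per partition after repartitioning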
What is the difference between repartition and coalesce?¶
- repartition() sets a new number of partitions, up or down, and performs a full shuffle
- coalesce() merges existing partitions, can only decrease their number, and avoids a full shuffle
df.rdd.saveAsTextFile(path='./learn/test1')
df.rdd.repartition(2).saveAsTextFile(path='./learn/test2')
df.rdd.coalesce(1).saveAsTextFile(path='./learn/test3')
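The effect can also be checked without writing files, by comparing partition counts directly; a minimal sketch:
print(df.rdd.getNumPartitions())                 # original partition count (8 on this local[*] run)
print(df.rdd.repartition(2).getNumPartitions())  # full shuffle into 2 partitions
print(df.rdd.coalesce(1).getNumPartitions())     # merge existing partitions into 1, no full shuffle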
display(Image(url='./1.jpg', width=200, unconfined=True))
df.show()
+---------+--------+-----+
|     city|    type|price|
+---------+--------+-----+
|    Paris|    Food| 19.0|
|Marseille|Clothing| 12.0|
|    Paris|    Food|  8.0|
|    Paris|Clothing| 15.0|
|Marseille|    Food| 20.0|
|     Lyon|    Book| 10.0|
+---------+--------+-----+
iterating over all rows¶
for x in df.rdd.collect():
    print(x.city, end="\n")
Paris
Marseille
Paris
Paris
Marseille
Lyon
GroupBy¶
df.groupBy("city").mean("price").show()
+---------+----------+
|     city|avg(price)|
+---------+----------+
|Marseille|      16.0|
|    Paris|      14.0|
|     Lyon|      10.0|
+---------+----------+
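Several aggregations can be computed in one pass with agg() and the built-in functions module; a minimal sketch (the aliases n, avg_price and max_price are just illustrative names):
from pyspark.sql import functions as F
df.groupBy("city").agg(
    F.count("*").alias("n"),
    F.avg("price").alias("avg_price"),
    F.max("price").alias("max_price")
).show()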
getting the list of unique values in a column¶
df.select("city").distinct().show()
+---------+
|     city|
+---------+
|Marseille|
|    Paris|
|     Lyon|
+---------+
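dropDuplicates() gives the same kind of de-duplication at the row level and can be restricted to a subset of columns; a minimal sketch:
df.dropDuplicates(["city"]).show()   # keeps one (arbitrary) row per city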
"""This is dataframe object """
df
DataFrame[city: string, type: string, price: float]
"""This is RDD object """
df.rdd
MapPartitionsRDD[483] at javaToPython at NativeMethodAccessorImpl.java:0
Question¶
What is the difference between RDD and DataFrame? [4]¶
A DataFrame carries additional metadata because of its tabular format, which allows Spark to run certain optimizations on the finalized query.
An RDD, on the other hand, is merely a Resilient Distributed Dataset, more of a black box of data that cannot be optimized as well, because the operations that can be performed against it are not as constrained.
However, you can go from a DataFrame to an RDD via its rdd method, and from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method.
In general it is recommended to use a DataFrame where possible due to the built-in query optimization.
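A quick round trip between the two, assuming the same df as above (rdd and df2 are just illustrative names):
rdd = df.rdd          # DataFrame -> RDD of Row objects
df2 = rdd.toDF()      # RDD of Rows -> DataFrame again
df2.printSchema()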
aggregation¶
df.groupBy(df.city).agg({"*": "count"}).orderBy("city",ascending=False ).show()
+---------+--------+
|     city|count(1)|
+---------+--------+
|    Paris|       3|
|Marseille|       2|
|     Lyon|       1|
+---------+--------+