Saturday, January 30, 2021

Learn about Python Dask in a Easy Way

Youtube

Learning Dask With Python Distributed Computing

Authors

  • Soumil Nitin Shah

Soumil Nitin Shah

Bachelor in Electronic Engineering | Masters in Electrical Engineering | Master in Computer Engineering |

Hello! I’m Soumil Nitin Shah, a Software and Hardware Developer based in New York City. I have completed by Bachelor in Electronic Engineering and my Double master’s in Computer and Electrical Engineering. I Develop Python Based Cross Platform Desktop Application , Webpages , Software, REST API, Database and much more I have more than 2 Years of Experience in Python

Step 1: Installation of Dask
  • !pip install dask
  • !pip install cloudpickle
  • !pip install "dask[dataframe]"
  • !pip install "dask[complete]"
In [2]:
!pip show dask
Name: dask
Version: 2020.12.0
Summary: Parallel PyData with Task Scheduling
Home-page: https://github.com/dask/dask/
Author: None
Author-email: None
License: BSD
Location: c:\python38\lib\site-packages
Requires: pyyaml
Required-by: swifter, distributed
In [3]:
# Define the Imports 
try:
    import os
    import json
    import math
    import dask
    from dask.distributed import Client
    import dask.dataframe as dd
    import numpy as np
    import dask.multiprocessing
except Exception as e:
    print("Some Modules are Missing : {} ".format(e))
In [4]:
os.listdir()
Out[4]:
['.ipynb_checkpoints',
 'dask-worker-space',
 'netflix_titles.csv',
 'Stackoverflow-issue-',
 'Untitled.ipynb',
 'Youtube.ipynb']
In [5]:
size = os.path.getsize("netflix_titles.csv") / math.pow(1024,3)
print("Size in GB : {} ".format(size))
Size in GB : 0.0022451020777225494 
In [6]:
client = Client(n_workers=3, threads_per_worker=1, processes=False, memory_limit='2GB')
In [7]:
client
Out[7]:

Client

Cluster

  • Workers: 3
  • Cores: 3
  • Memory: 6.00 GB
Read a csv Files
In [8]:
dd = dd.read_csv("netflix_titles.csv")
In [9]:
dd
Out[9]:
Dask DataFrame Structure:
show_id type title director cast country date_added release_year rating duration listed_in description
npartitions=1
int64 object object object object object object int64 object object object object
... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: read-csv, 1 tasks
In [11]:
dd.head(1)
Out[11]:
show_id type title director cast country date_added release_year rating duration listed_in description
0 81145628 Movie Norm of the North: King Sized Adventure Richard Finn, Tim Maltby Alan Marriott, Andrew Toth, Brian Dobson, Cole... United States, India, South Korea, China September 9, 2019 2019 TV-PG 90 min Children & Family Movies, Comedies Before planning an awesome wedding for his gra...
In [12]:
dd.tail(1)
Out[12]:
show_id type title director cast country date_added release_year rating duration listed_in description
6233 70153404 TV Show Friends NaN Jennifer Aniston, Courteney Cox, Lisa Kudrow, ... United States NaN 2003 TV-14 10 Seasons Classic & Cult TV, TV Comedies This hit sitcom follows the merry misadventure...
In [13]:
dd.shape
Out[13]:
(Delayed('int-f166895e-231a-4261-b2de-5fd6325e3522'), 12)
Selecting colums in Dask
In [14]:
dd.columns
Out[14]:
Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')
In [15]:
dd["show_id"].head(1)
Out[15]:
0    81145628
Name: show_id, dtype: int64
In [16]:
dd.show_id.head(1)
Out[16]:
0    81145628
Name: show_id, dtype: int64
In [17]:
dd[["show_id", "title"]].head(3)
Out[17]:
show_id title
0 81145628 Norm of the North: King Sized Adventure
1 80117401 Jandino: Whatever it Takes
2 70234439 Transformers Prime
In [20]:
dd['type'][dd["show_id"] == 70234439 ].head(1)
Out[20]:
2    TV Show
Name: type, dtype: object
Apply Function in dask
In [21]:
def toupper(x):
    return x.upper()
In [22]:
dd.title = dd["title"].map(toupper)
In [23]:
dd.title.head(2)
Out[23]:
0    NORM OF THE NORTH: KING SIZED ADVENTURE
1                 JANDINO: WHATEVER IT TAKES
Name: title, dtype: object
Apply across cluster
In [28]:
tem = [ result.result()  for result in client.map(toupper , dd["title"]) ]
In [31]:
import dask.array as da
permutations = da.from_array(np.asarray(tem))
In [33]:
dd['random'] = permutations
In [34]:
dd.head(1)
Out[34]:
show_id type title director cast country date_added release_year rating duration listed_in description random
0 81145628 Movie NORM OF THE NORTH: KING SIZED ADVENTURE Richard Finn, Tim Maltby Alan Marriott, Andrew Toth, Brian Dobson, Cole... United States, India, South Korea, China September 9, 2019 2019 TV-PG 90 min Children & Family Movies, Comedies Before planning an awesome wedding for his gra... NORM OF THE NORTH: KING SIZED ADVENTURE

Basics of Dask

In [35]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
In [40]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)
Wall time: 3 s
Parallelize with the dask.delayed decorator
In [41]:
# This runs immediately, all it does is build a graph
from dask import delayed

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
In [42]:
%%time
# This actually runs our computation using a local thread pool

z.compute()
Wall time: 2.02 s
Exercise: Parallelize a for loop¶
In [44]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)

data = [1, 2, 3, 4, 5, 6, 7, 8]
In [45]:
%%time

# Sequential code
results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)
Wall time: 8.01 s
In [46]:
%%time

results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed
Before computing: Delayed('sum-444b98df-760d-4339-bf76-10160ed74812')
After computing : 44
Wall time: 3.25 s
if you want to use decorators
In [47]:
from time import sleep

@delayed
def inc(x):
    sleep(1)
    return x + 1

@delayed
def add(x, y):
    sleep(1)
In [48]:
%%time

# this looks like ordinary code

x = inc(15)
y = inc(30)

total = add(x, y)
total.compute()
Wall time: 2.11 s
In [ ]:
 

Learn How to configure your Spark Session to Join Managed (S3 Table Buckets) and Unmanaged Iceberg Tables | Hands on Labs

test-tble-bucket-joins Learn How to configure your Spark Session to Join Managed (S...