Saturday, January 30, 2021

Learn about Python Dask in a Easy Way

Youtube

Learning Dask With Python Distributed Computing

Authors

  • Soumil Nitin Shah

Soumil Nitin Shah

Bachelor in Electronic Engineering | Masters in Electrical Engineering | Master in Computer Engineering |

Hello! I’m Soumil Nitin Shah, a Software and Hardware Developer based in New York City. I have completed by Bachelor in Electronic Engineering and my Double master’s in Computer and Electrical Engineering. I Develop Python Based Cross Platform Desktop Application , Webpages , Software, REST API, Database and much more I have more than 2 Years of Experience in Python

Step 1: Installation of Dask
  • !pip install dask
  • !pip install cloudpickle
  • !pip install "dask[dataframe]"
  • !pip install "dask[complete]"
In [2]:
!pip show dask
Name: dask
Version: 2020.12.0
Summary: Parallel PyData with Task Scheduling
Home-page: https://github.com/dask/dask/
Author: None
Author-email: None
License: BSD
Location: c:\python38\lib\site-packages
Requires: pyyaml
Required-by: swifter, distributed
In [3]:
# Define the Imports 
try:
    import os
    import json
    import math
    import dask
    from dask.distributed import Client
    import dask.dataframe as dd
    import numpy as np
    import dask.multiprocessing
except Exception as e:
    print("Some Modules are Missing : {} ".format(e))
In [4]:
os.listdir()
Out[4]:
['.ipynb_checkpoints',
 'dask-worker-space',
 'netflix_titles.csv',
 'Stackoverflow-issue-',
 'Untitled.ipynb',
 'Youtube.ipynb']
In [5]:
size = os.path.getsize("netflix_titles.csv") / math.pow(1024,3)
print("Size in GB : {} ".format(size))
Size in GB : 0.0022451020777225494 
In [6]:
client = Client(n_workers=3, threads_per_worker=1, processes=False, memory_limit='2GB')
In [7]:
client
Out[7]:

Client

Cluster

  • Workers: 3
  • Cores: 3
  • Memory: 6.00 GB
Read a csv Files
In [8]:
dd = dd.read_csv("netflix_titles.csv")
In [9]:
dd
Out[9]:
Dask DataFrame Structure:
show_id type title director cast country date_added release_year rating duration listed_in description
npartitions=1
int64 object object object object object object int64 object object object object
... ... ... ... ... ... ... ... ... ... ... ...
Dask Name: read-csv, 1 tasks
In [11]:
dd.head(1)
Out[11]:
show_id type title director cast country date_added release_year rating duration listed_in description
0 81145628 Movie Norm of the North: King Sized Adventure Richard Finn, Tim Maltby Alan Marriott, Andrew Toth, Brian Dobson, Cole... United States, India, South Korea, China September 9, 2019 2019 TV-PG 90 min Children & Family Movies, Comedies Before planning an awesome wedding for his gra...
In [12]:
dd.tail(1)
Out[12]:
show_id type title director cast country date_added release_year rating duration listed_in description
6233 70153404 TV Show Friends NaN Jennifer Aniston, Courteney Cox, Lisa Kudrow, ... United States NaN 2003 TV-14 10 Seasons Classic & Cult TV, TV Comedies This hit sitcom follows the merry misadventure...
In [13]:
dd.shape
Out[13]:
(Delayed('int-f166895e-231a-4261-b2de-5fd6325e3522'), 12)
Selecting colums in Dask
In [14]:
dd.columns
Out[14]:
Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')
In [15]:
dd["show_id"].head(1)
Out[15]:
0    81145628
Name: show_id, dtype: int64
In [16]:
dd.show_id.head(1)
Out[16]:
0    81145628
Name: show_id, dtype: int64
In [17]:
dd[["show_id", "title"]].head(3)
Out[17]:
show_id title
0 81145628 Norm of the North: King Sized Adventure
1 80117401 Jandino: Whatever it Takes
2 70234439 Transformers Prime
In [20]:
dd['type'][dd["show_id"] == 70234439 ].head(1)
Out[20]:
2    TV Show
Name: type, dtype: object
Apply Function in dask
In [21]:
def toupper(x):
    return x.upper()
In [22]:
dd.title = dd["title"].map(toupper)
In [23]:
dd.title.head(2)
Out[23]:
0    NORM OF THE NORTH: KING SIZED ADVENTURE
1                 JANDINO: WHATEVER IT TAKES
Name: title, dtype: object
Apply across cluster
In [28]:
tem = [ result.result()  for result in client.map(toupper , dd["title"]) ]
In [31]:
import dask.array as da
permutations = da.from_array(np.asarray(tem))
In [33]:
dd['random'] = permutations
In [34]:
dd.head(1)
Out[34]:
show_id type title director cast country date_added release_year rating duration listed_in description random
0 81145628 Movie NORM OF THE NORTH: KING SIZED ADVENTURE Richard Finn, Tim Maltby Alan Marriott, Andrew Toth, Brian Dobson, Cole... United States, India, South Korea, China September 9, 2019 2019 TV-PG 90 min Children & Family Movies, Comedies Before planning an awesome wedding for his gra... NORM OF THE NORTH: KING SIZED ADVENTURE

Basics of Dask

In [35]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
In [40]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)
Wall time: 3 s
Parallelize with the dask.delayed decorator
In [41]:
# This runs immediately, all it does is build a graph
from dask import delayed

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
In [42]:
%%time
# This actually runs our computation using a local thread pool

z.compute()
Wall time: 2.02 s
Exercise: Parallelize a for loop¶
In [44]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)

data = [1, 2, 3, 4, 5, 6, 7, 8]
In [45]:
%%time

# Sequential code
results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)
Wall time: 8.01 s
In [46]:
%%time

results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed
Before computing: Delayed('sum-444b98df-760d-4339-bf76-10160ed74812')
After computing : 44
Wall time: 3.25 s
if you want to use decorators
In [47]:
from time import sleep

@delayed
def inc(x):
    sleep(1)
    return x + 1

@delayed
def add(x, y):
    sleep(1)
In [48]:
%%time

# this looks like ordinary code

x = inc(15)
y = inc(30)

total = add(x, y)
total.compute()
Wall time: 2.11 s
In [ ]:
 

Learn How to Connect to the Glue Data Catalog using AWS Glue Iceberg REST endpoint

gluecat Learn How to Connect to the Glue Data Catalog using AWS Glue Iceberg REST e...