Wednesday, January 13, 2021

Different Ways to Parallelize the Pandas apply Function | Speed Up

Learn Pandas

Parallelizing the Pandas apply function

Soumil Nitin Shah

Bachelor's in Electronic Engineering | Master's in Electrical Engineering | Master's in Computer Engineering

We have a huge pandas DataFrame, and we want to apply an expensive function to it that takes a lot of time.
In [1]:
import os
import json
import pandas as pd
import tqdm
import datetime
import time
In [4]:
df  = pd.read_csv("netflix_titles.csv",chunksize=20)
df = df.dropna()
In [5]:
df.shape
Out[5]:
(3771, 12)
In [6]:
df.head(2)
Out[6]:
  | show_id | type | title | director | cast | country | date_added | release_year | rating | duration | listed_in | description
0 | 81145628 | Movie | Norm of the North: King Sized Adventure | Richard Finn, Tim Maltby | Alan Marriott, Andrew Toth, Brian Dobson, Cole... | United States, India, South Korea, China | 9-Sep-19 | 2019.0 | TV-PG | 90 min | Children & Family Movies, Comedies | Before planning an awesome wedding for his gra...
4 | 80125979 | Movie | #realityhigh | Fernando Lebrija | Nesta Cooper, Kate Walsh, John Michael Higgins... | United States | 8-Sep-17 | 2017.0 | TV-14 | 99 min | Comedies | When nerdy high schooler Dani finally attracts...
Say I need to apply several transformations to my data. Let's take a simple example.
In [11]:
def some_expensive_computation(x):
    # Simulate expensive per-row work: ~10 ms per call
    time.sleep(0.01)
    return x

def apply_transform(df):
    # Lowercase three text columns, then run the slow function over 'description'
    df['title'] = df['title'].apply(lambda x: x.lower())
    df['director'] = df['director'].apply(lambda x: x.lower())
    df['cast'] = df['cast'].apply(lambda x: x.lower())
    df['description'] = df['description'].apply(some_expensive_computation)
    return df
In [12]:
start = datetime.datetime.now()
df1 = apply_transform(df)
end = datetime.datetime.now()

print("-"*44)
print("Execution Time: {} ".format(end-start))
print("-"*44)
--------------------------------------------
Execution Time: 0:00:39.716516 
--------------------------------------------

That ~40 seconds is no mystery: 3,771 rows × 0.01 s of sleep per row ≈ 38 s. The first issue, though, is that I have no idea which column is being processed, since there is no visual output. Why not use tqdm?

Just changing apply to progress_apply gives you a nice progress bar, so at least you know which column is being processed.
In [13]:
from tqdm import tqdm
tqdm.pandas()
In [14]:
def some_expensive_computation(x):
    time.sleep(0.01)
    return x

def apply_transform(df):
    df['title'] = df['title'].progress_apply(lambda x: x.lower())
    df['director'] = df['director'].progress_apply(lambda x: x.lower())
    df['cast'] = df['cast'].progress_apply(lambda x: x.lower())
    df['description'] = df['description'].progress_apply(some_expensive_computation)
    return df
In [15]:
start = datetime.datetime.now()
df1 = apply_transform(df)
end = datetime.datetime.now()

print("-"*44)
print("Execution Time: {} ".format(end-start))
print("-"*44)
100%|███████████████████████████████████████████████████████████████████████| 3771/3771 [00:00<00:00, 967857.08it/s]
100%|██████████████████████████████████████████████████████████████████████| 3771/3771 [00:00<00:00, 1257290.97it/s]
100%|███████████████████████████████████████████████████████████████████████| 3771/3771 [00:00<00:00, 763502.63it/s]
100%|███████████████████████████████████████████████████████████████████████████| 3771/3771 [00:40<00:00, 93.50it/s]
--------------------------------------------
Execution Time: 0:00:40.360840 
--------------------------------------------

OK, now I at least see a progress bar. But how can I speed things up, and what are the different options?

Method 1: swifter

swifter tries to pick the fastest available execution path for a function: a vectorized call, a plain pandas apply, or parallelization via Dask.

In [1]:
import pandas as pd
import datetime
import time
import swifter



df  = pd.read_csv("netflix_titles.csv")
df = df.dropna()
In [2]:
def some_expensive_computation(x):
    time.sleep(0.01)
    return x

def apply_transform(df):
    df['title'] = df['title'].swifter.apply(lambda x: x.lower())
    df['director'] = df['director'].swifter.apply(lambda x: x.lower())
    df['cast'] = df['cast'].swifter.apply(lambda x: x.lower())
    df['description'] = df['description'].swifter.apply(some_expensive_computation)
    return df
In [3]:
start = datetime.datetime.now()
df1 = apply_transform(df)
end = datetime.datetime.now()

print("-"*44)
print("Execution Time: {} ".format(end-start))
print("-"*44)


--------------------------------------------
Execution Time: 0:00:00.437838 
--------------------------------------------
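
A note on that suspiciously fast time (my reading of swifter's behavior, not something stated in the original post): before parallelizing, swifter first tries the function in vectorized form on the whole Series, and only falls back to row-wise apply or Dask if that fails. Because some_expensive_computation(series) sleeps once and returns the Series unchanged, the vectorized attempt succeeds, so swifter sleeps once instead of 3,771 times:
In [ ]:
import time
import pandas as pd

def some_expensive_computation(x):
    time.sleep(0.01)
    return x

s = pd.Series(["a", "b", "c"])
# Called on the whole Series at once, this sleeps once (~0.01 s total),
# not once per row -- which is why swifter finishes in under half a second.
out = some_expensive_computation(s)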

Method 2: mapply

  • Try changing the number of workers; results will vary. The run below initializes mapply with n_workers=1, so nothing actually runs in parallel (see the snippet after this method's output for using all cores).
In [1]:
import pandas as pd
import datetime
import time
import mapply


df  = pd.read_csv("netflix_titles.csv")
df = df.dropna()
mapply.init(n_workers=1)
In [2]:
def some_expensive_computation(x):
    time.sleep(0.01)
    return x

def apply_transform(df):
    df['title'] = df['title'].mapply(lambda x: x.lower())
    df['director'] = df['director'].mapply(lambda x: x.lower())
    df['cast'] = df['cast'].mapply(lambda x: x.lower())
    df['description'] = df['description'].mapply(some_expensive_computation)
    return df
In [3]:
start = datetime.datetime.now()
df1 = apply_transform(df)
end = datetime.datetime.now()

print("-"*44)
print("Execution Time: {} ".format(end-start))
print("-"*44)



--------------------------------------------
Execution Time: 0:00:39.700108 
--------------------------------------------
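
With n_workers=1 the time matches plain apply, as expected. A minimal sketch using all cores (per mapply's documentation, n_workers=-1 means every available CPU; the actual speedup is machine-dependent):
In [ ]:
import mapply

# -1 = use all available CPUs; mapply also shows a progress bar by default
mapply.init(n_workers=-1)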

Method 3: Dask

Use the Dask plugin: convert the DataFrame to a Dask DataFrame and run the same transform one partition at a time with map_partitions.
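
Here is a minimal sketch of what that could look like, assuming dask is installed (pip install "dask[dataframe]"); the partition count and scheduler choice are my own illustrative picks, not from the original post:
In [ ]:
import time
import pandas as pd
import dask.dataframe as dd

def some_expensive_computation(x):
    time.sleep(0.01)
    return x

def apply_transform(df):
    # Same transform as before, applied to one partition at a time
    df['title'] = df['title'].apply(lambda x: x.lower())
    df['director'] = df['director'].apply(lambda x: x.lower())
    df['cast'] = df['cast'].apply(lambda x: x.lower())
    df['description'] = df['description'].apply(some_expensive_computation)
    return df

df = pd.read_csv("netflix_titles.csv").dropna()
ddf = dd.from_pandas(df, npartitions=4)   # split into 4 partitions
# map_partitions runs apply_transform on each partition; the "processes"
# scheduler executes the partitions in separate worker processes
df1 = ddf.map_partitions(apply_transform).compute(scheduler="processes")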

Method 4: multiprocessing Pool
In [1]:
from multiprocessing import Pool
import numpy as np
import pandas as pd

df  = pd.read_csv("netflix_titles.csv")
df = df.dropna()
In [2]:
def apply_transform(df):
    # Plain pandas apply: each Pool worker processes its own chunk,
    # so no extra parallelization library is needed here
    df['title'] = df['title'].apply(lambda x: x.lower())
    df['director'] = df['director'].apply(lambda x: x.lower())
    df['cast'] = df['cast'].apply(lambda x: x.lower())
    return df
In [3]:
def parallelize_dataframe(df, func, n_cores=4):
    df_split = np.array_split(df, n_cores)    # split the DataFrame into n_cores chunks
    pool = Pool(n_cores)                      # one worker process per chunk
    df = pd.concat(pool.map(func, df_split))  # apply func to each chunk in parallel, then stitch back together
    pool.close()
    pool.join()
    return df
In [ ]:
df = parallelize_dataframe(df, apply_transform)
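
One caveat worth adding (standard multiprocessing behavior, not specific to this post): on platforms that spawn worker processes, such as Windows, Pool code in a plain .py script must be guarded by an entry-point check, or each worker will re-execute the module on import:
In [ ]:
if __name__ == "__main__":
    df = parallelize_dataframe(df, apply_transform)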

Conclusion

Based on my references, Dask's map_partitions and swifter take almost the same time to apply this kind of transform and compute the result for all rows.
