Saturday, May 16, 2020

Parallelizing word2vec in Python with Threadding

Threadding

Parallelizing word2vec in Python with Threadding

In [1]:
try:
    import elasticsearch
    from elasticsearch import Elasticsearch
    
    import pandas as pd
    import json
    from ast import literal_eval
    from tqdm import tqdm
    import datetime
    import os
    import sys
    import numpy as np
    
    from elasticsearch import helpers
    print("Loaded  .. . . . . . . .")
except Exception as E:
    print("Some Modules are Missing {} ".format(e))
Loaded  .. . . . . . . .
In [2]:
df= pd.read_csv("netflix_titles.csv")
In [3]:
titles = df["title"].to_list()
In [40]:
len(titles)
Out[40]:
6234
In [14]:
import tensorflow as tf
import tensorflow_hub as hub
from datetime import datetime

start = datetime.now()
module_url = "https://tfhub.dev/google/nnlm-en-dim128/2"
embed = hub.KerasLayer(module_url)

vector = []

for c, title in enumerate(titles):
    x = tf.constant([title])
    embeddings = embed(x)
    x = np.asarray(embeddings)
    x = x[0].tolist()
    vector.append(x)
end = datetime.now()
print("Execution time : {} ".format(end-start))
Execution time : 0:00:07.202030 

Processing with Threadding

In [56]:
from queue import PriorityQueue
In [57]:
pqueue = PriorityQueue()
In [58]:
def threadding_work(c, title):
    x = tf.constant([title])
    embeddings = embed(x)
    x = np.asarray(embeddings)
    x = x[0].tolist()
    pqueue.put((c, x))
In [62]:
import threading

start = datetime.now()

threadsPool = []
data = []

for c, title in enumerate(titles):
    t = threading.Thread(target=threadding_work, args=(c, title))
    threadsPool.append(t)

print(len(threadsPool))
for thread in threadsPool:
    thread.start()

for thread in threadsPool:
    thread.join()

while not pqueue.empty():
    data.append(pqueue.get())

end = datetime.now()
print("Execution time : {} ".format(end-start))
print(len(data))
6234
Execution time : 0:00:09.526026 
6234

processing in Parallel

In [25]:
from gensim.test.utils import common_texts, get_tmpfile
from gensim.models import Word2Vec

path = get_tmpfile("word2vec.model")
model = Word2Vec(common_texts, size=100, window=5, min_count=1, workers=4)
model.save("word2vec.model")

model = Word2Vec.load("word2vec.model")
pqueue1 = PriorityQueue()


def threadding_gensim_work(c, title):
    model.train([[title]], total_examples=1, epochs=1)
    vector = model.wv['computer']  # numpy vector of a word
    x = np.asarray(vector)
    x=  x[0].tolist()
    pqueue1.put((c, x))
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
In [65]:
import threading
start = datetime.now()

threadspool = []

for c, title in enumerate(titles):
    t = threading.Thread(target=threadding_gensim_work, args=(c, title))
    threadspool.append(t)
  
for thread in threadspool:
    thread.start()
    
for thread in threadspool:
    thread.join()

data1 = []

while not pqueue1.empty():
    data1.append(pqueue1.get())
    
end = datetime.now()
print("Execution time : {} ".format(end-start))
print(len(data1))
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
h_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay

WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay

WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay


WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay

WARNING:gensim.models.base_any2vec:under 10 jobs per worker: consider setting a smaller `batch_words' for smoother alpha decay
Execution time : 0:01:31.807310 
6234
In [66]:
len(data1)
Out[66]:
6234

No comments:

Post a Comment

Developer Guide: Getting Started with Flink (PyFlink) and Hudi - Setting Up Your Local Environment and Performing CRUD Operations via flink

flink-hudi-final Install Flink and Python ¶ conda info --envs # Create ENV conda ...