ETL Scripts to Migrate Data from S3 to Elasticsearch¶
In [94]:
try:
    import pandas as pd
    import os
    import requests
    import json
    import datetime
    import boto3
    import elasticsearch
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    print("All Modules loaded ..... | Youtube Video: https://www.youtube.com/watch?v=P5vmmrQ3tPg")
except Exception as e:
    print("Some modules are missing: {}".format(e))
AWS Reader Class¶
In [95]:
class DataStructure(object):
    def __init__(self):
        self.data = {
            "Csv": [],
            "Pdf": [],
            "Gz": [],
            "Jpg": [],
            "Png": [],
            "Json": [],
            "other": []
        }
class AWSS3(object):
    def __init__(self, BucketName='BUCKETNAME'):
        self.BucketName = BucketName
        self.client = boto3.client("s3")
        self.datastructure = DataStructure()

    def getFileData(self, Key=''):
        """
        Reads the file and returns its binary data
        """
        response_new = self.client.get_object(Bucket=self.BucketName, Key=Key)
        MyData = response_new["Body"].read()
        return MyData

    def getFiles(self):
        """
        Lists the bucket and groups the keys by file extension
        """
        response = self.client.list_objects(Bucket=self.BucketName)
        for X in response.get("Contents", []):
            x = X['Key']
            # elif chain so each key lands in exactly one bucket
            if '.gz' in x:
                self.datastructure.data["Gz"].append(x)
            elif '.jpg' in x:
                self.datastructure.data["Jpg"].append(x)
            elif '.png' in x:
                self.datastructure.data["Png"].append(x)
            elif '.csv' in x:
                self.datastructure.data["Csv"].append(x)
            elif '.json' in x:
                self.datastructure.data["Json"].append(x)
            elif '.pdf' in x:
                self.datastructure.data["Pdf"].append(x)
            else:
                self.datastructure.data["other"].append(x)
        return self.datastructure.data
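Note that list_objects returns at most 1000 keys per call. For larger buckets, a boto3 paginator walks every page; a minimal sketch (an alternative listing, with my-bucket as a placeholder):
In [ ]:
# Paginate through every object in the bucket (sketch; "my-bucket" is a placeholder)
client = boto3.client("s3")
paginator = client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket"):
    for obj in page.get("Contents", []):
        print(obj["Key"])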
Elasticsearch Class¶
In [96]:
class ElasticConfig(object):
    def __init__(self):
        pass

    @staticmethod
    def connect_elasticsearch():
        es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
        if es.ping():
            print('Yippee, connected!')
        else:
            print('Awww, it could not connect!')
        return es
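The host-dict form above targets elasticsearch-py 7.x. If the 8.x client is installed, the node URL can be passed directly instead; a hedged sketch assuming a local node:
In [ ]:
# elasticsearch-py 8.x requires a scheme in the host dict; passing the
# node URL directly is the simplest form (assumes a local node)
es = Elasticsearch("http://localhost:9200")
print(es.ping())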
File Reader Class¶
In [97]:
class Reader(object):
    def __init__(self, FileName='', FileType='csv'):
        self.Filename = FileName
        self.FileType = FileType

    def getDF(self):
        """
        Reads the CSV or JSON file into a pandas DataFrame
        :return: list of dicts, one per row
        """
        print("Reading File {} ".format(self.Filename))
        if self.FileType == "csv":
            df = pd.read_csv(self.Filename)
        elif self.FileType == "json":
            df = pd.read_json(self.Filename, lines=True)
        else:
            raise ValueError("Unsupported FileType: {}".format(self.FileType))
        print("Converting file to records format ..........")
        df2 = df.to_dict('records')
        return df2
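The DataStructure class also collects .gz keys, but Reader only handles csv and json. Since pandas can read gzipped CSVs directly, gz support could be a one-line extension; a hypothetical sketch (the file name is a placeholder):
In [ ]:
# Hypothetical extension: read a gzipped CSV directly (file name is a placeholder)
df_gz = pd.read_csv("temp.csv.gz", compression="gzip")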
Generator Objects¶
In [98]:
class Generator(object):
    def __init__(self):
        pass

    @staticmethod
    def generator(df2, IndexName, Dtype):
        # Yields one bulk action per record; a generator simply returns when
        # exhausted (raising StopIteration inside a generator is a RuntimeError
        # since Python 3.7, PEP 479)
        for c, line in enumerate(df2):
            yield {
                '_index': IndexName,
                '_type': Dtype,  # note: mapping types are deprecated in Elasticsearch 7+
                '_id': c,
                '_source': {
                    'avg_area_income': line.get('Avg. Area Income', "No Data"),
                }
            }
Main File¶
Step 1: Read all items from S3 and store them in a hash map¶
Step 2: Pop items and get the binary data¶
Step 3: Save the file as temp.EXT, where EXT is the file extension¶
Step 4: Convert into a pandas DataFrame¶
Step 5: Convert the pandas DataFrame into generator objects¶
Step 6: Start the ETL script¶
Step 7: We should also add a logger class to keep track of our log files (see the sketch below)¶
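A minimal sketch of the logger mentioned in Step 7, using only Python's standard library (the file name etl.log is a placeholder):
In [ ]:
import logging

# Write timestamped messages to a log file so each ETL run can be audited
logging.basicConfig(
    filename="etl.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("s3-to-es")
logger.info("ETL run started")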
In [99]:
start = datetime.datetime.now()

aws = AWSS3(BucketName="BUCKETNAME")
Files = aws.getFiles()
FilesCSV = Files['Csv']

for FileName in FilesCSV:
    FileCsvData = aws.getFileData(Key=FileName)

    # Save the binary data as a temporary CSV file
    with open("tem.csv", "wb") as f:
        f.write(FileCsvData)

    reader = Reader(FileName="tem.csv", FileType="csv")
    df = reader.getDF()

    try:
        print("Uploading Data to Elastic Search ............")
        res = helpers.bulk(ElasticConfig.connect_elasticsearch(),
                           Generator.generator(df, 'mydb', 'mytabledb'))
        print("Done")
    except Exception as e:
        print("Upload failed: {}".format(e))

end = datetime.datetime.now()
print("Total Execution Time: {} ".format(end - start))
In [92]:
for x in os.listdir():
    print(x)
In [91]:
os.remove("tem.csv")
Can we use Python's plain requests library to make a POST request and upload data into Elasticsearch indexes? Please tell me how to do so.
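Reply: Yes. Elasticsearch exposes a plain HTTP REST API, so the requests library works without the official client. A minimal sketch, assuming a local node on port 9200 and the mydb index from the notebook (the sample value is a placeholder):
import json
import requests

# Index a single document via the REST API (placeholder value)
doc = {"avg_area_income": 79545.45}
r = requests.post("http://localhost:9200/mydb/_doc", json=doc)
print(r.json())

# Bulk-index several documents: newline-delimited JSON, one action line
# per document, terminated by a trailing newline
actions = ""
for i, d in enumerate([doc, doc]):
    actions += json.dumps({"index": {"_index": "mydb", "_id": i}}) + "\n"
    actions += json.dumps(d) + "\n"
r = requests.post("http://localhost:9200/_bulk",
                  data=actions,
                  headers={"Content-Type": "application/x-ndjson"})
print(r.json())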