Saturday, February 1, 2020

ETL Scripts To Migrate Data from S3 to Elastic Search


In [94]:
try:
    import pandas as pd
    import os
    import requests
    import json
    import datetime
    import boto3

    import elasticsearch
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    print("All Modules loaded ..... | Youtube Video :https://www.youtube.com/watch?v=P5vmmrQ3tPg   ")
except Exception as e:
    print("Some Modules are missing {}". format(e))

AWS Reader Class

In [95]:
class DataStructure(object):
    def __init__(self):
        self.data = {
            "Csv":[],
            "Pdf":[],
            "Gz":[],
            "Jpg":[],
            "Png":[],
            "Json":[],
            "other":[]
        }


class AWSS3(object):

    def __init__(self, BucketName = 'BUCKETNAME'):
        self.BucketName = BucketName
        self.client = boto3.client("s3")
        self.datastructure = DataStructure()

    def getFileData(self, Key = ''):

        """
        Reads the File and gets the Binary Data

        """

        response_new = self.client.get_object(Bucket=self.BucketName, Key=Key)
        MyData = response_new["Body"].read()
        return MyData


    def getFiles(self):

        """
        Lists every object in the bucket and groups the keys by extension
        """

        response = self.client.list_objects(Bucket=self.BucketName)

        # elif chain so that each key lands in exactly one category
        for X in response.get("Contents", []):

            x = X['Key']

            if x.endswith('.gz'):
                self.datastructure.data["Gz"].append(x)
            elif x.endswith('.jpg'):
                self.datastructure.data["Jpg"].append(x)
            elif x.endswith('.png'):
                self.datastructure.data["Png"].append(x)
            elif x.endswith('.csv'):
                self.datastructure.data["Csv"].append(x)
            elif x.endswith('.json'):
                self.datastructure.data["Json"].append(x)
            elif x.endswith('.pdf'):
                self.datastructure.data["Pdf"].append(x)
            else:
                self.datastructure.data["other"].append(x)

        return self.datastructure.data
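
One caveat with list_objects: it returns at most 1,000 keys per call, so getFiles would silently miss objects in large buckets. A minimal sketch using a boto3 paginator (the bucket name is the same placeholder as above):

In [ ]:
# Sketch: paginate through buckets holding more than 1,000 objects.
client = boto3.client("s3")
paginator = client.get_paginator("list_objects_v2")

all_keys = []
for page in paginator.paginate(Bucket="BUCKETNAME"):
    for obj in page.get("Contents", []):
        all_keys.append(obj["Key"])

print("Total objects: {}".format(len(all_keys)))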

Elastic Search Class

In [96]:
class ElasticConfig(object):
    def __init__(self):
        pass

    @staticmethod
    def connect_elasticsearch():
        es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
        if es.ping():
            print('Yupiee  Connected ')
        else:
            print('Awww it could not connect!')
        return es
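
A compatibility note: the host/port dictionary form above is for elasticsearch-py 7.x and earlier; the 8.x client expects a full URL instead. A minimal sketch, assuming a local unsecured node:

In [ ]:
# Sketch for elasticsearch-py 8.x, which takes a URL instead of a host dict.
# Assumes a local node without TLS or authentication.
es = Elasticsearch("http://localhost:9200")
print(es.ping())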

File Reader Class

In [97]:
class Reader(object):

    def __init__(self, FileName='', FileType = 'csv'):
        self.Filename = FileName
        self.FileType = FileType

    def getDF(self):

        """
        Reads the CSV or JSON file into pandas
        :return: list of record dicts
        """
        print("Reading File {} ".format(self.Filename))

        if self.FileType == "csv":
            df = pd.read_csv(self.Filename)
        elif self.FileType == "json":
            df = pd.read_json(self.Filename, lines=True)
        else:
            raise ValueError("Unsupported FileType: {}".format(self.FileType))

        print("Converting File to Records Format ..........")
        df2 = df.to_dict('records')
        return df2
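
The same approach extends to the Gz list returned by getFiles, since pandas can decompress while reading. A minimal sketch, assuming the .gz objects are gzipped CSV files saved locally:

In [ ]:
# Sketch: read a gzipped CSV directly; assumes the .gz objects from
# Files['Gz'] are compressed CSVs written to tem.csv.gz.
df = pd.read_csv("tem.csv.gz", compression="gzip")
records = df.to_dict("records")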

Generator Objects

In [98]:
class Generator(object):

    def __init__(self):
        pass

    @staticmethod
    def generator(df2, IndexName, Dtype):

        for c, line in enumerate(df2):
            yield {
                '_index': IndexName,
                '_type': Dtype,
                '_id': c,
                '_source': {
                    'avg_area_income': line.get('Avg. Area Income', "No Data"),
                }
            }
        # a generator simply ends when it returns; raising StopIteration
        # inside one is a RuntimeError under PEP 479
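
The generator above indexes only one hard-coded column. If every column should end up in the document, the whole record dict can be passed as _source; a sketch (generator_all_fields is a hypothetical variant, and _type is dropped because Elasticsearch 7+ deprecates mapping types):

In [ ]:
def generator_all_fields(df2, IndexName):
    """Sketch: yield one bulk action per record, indexing every column."""
    for c, line in enumerate(df2):
        yield {
            '_index': IndexName,
            '_id': c,            # row position used as the document id
            '_source': line,     # full record dict from df.to_dict('records')
        }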

Main File

Step 1: Read all items from S3 and store them in a hash map

Step 2: Pop items and get the binary data

Step 3: Save the file as temp.EXT, where EXT is the file extension

Step 4: Convert into a pandas DataFrame

Step 5: Convert the pandas DataFrame into generator objects

Step 6: Start the ETL script

Step 7: We should also add a logger class to keep track of our log files (a minimal sketch follows below)
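
For Step 7, a minimal logging sketch using the standard logging module (the file name, logger name, and message format are arbitrary choices):

In [ ]:
import logging

# Sketch for Step 7: send pipeline events to a log file.
logging.basicConfig(
    filename="etl.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("s3-to-es")
logger.info("ETL run started")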

In [99]:
start = datetime.datetime.now()

aws = AWSS3(BucketName="BUCKETNAME")
Files = aws.getFiles()
FilesCSV = Files['Csv']

# connect once, outside the loop, instead of once per file
es = ElasticConfig.connect_elasticsearch()

for FileName in FilesCSV:
    FileCsvData = aws.getFileData(Key=FileName)

    # close the file before reading it back, so the bytes are flushed to disk
    with open("tem.csv", "wb") as f:
        f.write(FileCsvData)

    reader = Reader(FileName="tem.csv", FileType="csv")
    df = reader.getDF()

    try:
        print("Uploading Data to Elastic Search ............")
        res = helpers.bulk(es, Generator.generator(df, 'mydb', 'mytabledb'))
    except Exception as e:
        print("Upload failed: {}".format(e))

print("Done")
end = datetime.datetime.now()
print("Total Execution Time: {} ".format(end - start))
            
Yupiee  Connected 
Reading File tem.csv 
Converting File to Records Format ..........
Uploading Data to Elastic Search ............
Done
Total Execution Time: 0:00:03.186330 
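
A design note on Steps 2-4: the temp-file round trip can be skipped entirely, because pandas accepts file-like objects. A sketch that parses the S3 bytes in memory with io.BytesIO:

In [ ]:
import io

# Sketch: parse the downloaded bytes in memory instead of writing tem.csv.
FileCsvData = aws.getFileData(Key=FileName)
records = pd.read_csv(io.BytesIO(FileCsvData)).to_dict("records")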
In [92]:
for x in os.listdir():
    print(x)
   
.ipynb_checkpoints
Untitled.ipynb
In [91]:
os.remove("tem.csv")

1 comment:

  1. Can we use the simple requests lib of Python to make a POST request to upload data into Elasticsearch indexes? Please tell how to do so.

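    Reply: Yes. Elasticsearch exposes a plain REST API, so requests can index documents directly. A minimal sketch, assuming a local unsecured node and an index named mydb (the document body is just an example):

    import json
    import requests

    # Sketch: index one document through the Elasticsearch REST API.
    # Assumes a local, unsecured node and an index named "mydb".
    doc = {"avg_area_income": "No Data"}
    resp = requests.post(
        "http://localhost:9200/mydb/_doc",
        headers={"Content-Type": "application/json"},
        data=json.dumps(doc),
    )
    print(resp.json())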
