Saturday, February 1, 2020

ETL Scripts To Migrate Data from S3 to Elastic Search


In [94]:
try:
    import pandas as pd
    import os
    import requests
    import json
    import datetime
    import boto3

    import elasticsearch
    from elasticsearch import Elasticsearch
    from elasticsearch import helpers
    print("All Modules loaded ..... | YouTube Video: https://www.youtube.com/watch?v=P5vmmrQ3tPg")
except Exception as e:
    print("Some Modules are missing {}".format(e))

AWS Reader Class

In [95]:
class DataStructure(object):
    """Groups S3 object keys by file extension"""
    def __init__(self):
        self.data = {
            "Csv": [],
            "Pdf": [],
            "Gz": [],
            "Jpg": [],
            "Png": [],
            "Json": [],
            "other": []
        }


class AWSS3(object):

    def __init__(self, BucketName = 'BUCKETNAME'):
        self.BucketName = BucketName
        self.client = boto3.client("s3")
        self.datastructure = DataStructure()

    def getFileData(self, Key = ''):

        """
        Reads the File and gets the Binary Data

        """

        response_new = self.client.get_object(Bucket=self.BucketName, Key=Key)
        MyData = response_new["Body"].read()
        return MyData


    def getFiles(self):

        """
        Lists the objects in the bucket and groups their keys by extension

        """

        response = self.client.list_objects(Bucket=self.BucketName)

        for X in response.get("Contents", []):

            x = X['Key']

            # elif chain so each key lands in exactly one group
            if x.endswith('.gz'):
                self.datastructure.data["Gz"].append(x)
            elif x.endswith('.jpg'):
                self.datastructure.data["Jpg"].append(x)
            elif x.endswith('.png'):
                self.datastructure.data["Png"].append(x)
            elif x.endswith('.csv'):
                self.datastructure.data["Csv"].append(x)
            elif x.endswith('.json'):
                self.datastructure.data["Json"].append(x)
            elif x.endswith('.pdf'):
                self.datastructure.data["Pdf"].append(x)
            else:
                self.datastructure.data["other"].append(x)

        return self.datastructure.data
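
Note that list_objects returns at most 1,000 keys per request. For larger buckets, boto3's paginator can walk the complete listing; a minimal sketch (the bucket name is a placeholder):

In [ ]:
# Sketch: list every key in the bucket, beyond the 1,000-key page of list_objects
paginator = boto3.client("s3").get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="BUCKETNAME"):
    for obj in page.get("Contents", []):
        print(obj["Key"])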

Elastic Search Class

In [96]:
class ElasticConfig(object):

    @staticmethod
    def connect_elasticsearch():
        # assumes a local, unauthenticated cluster on the default port
        es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
        if es.ping():
            print('Yupiee  Connected ')
        else:
            print('Awww it could not connect!')
        return es
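
The class above assumes a local, unauthenticated cluster. For a secured cluster, elasticsearch-py accepts basic-auth and TLS options; a sketch where the host and credentials are placeholders:

In [ ]:
# Sketch: connecting to a secured cluster (placeholder host and credentials)
es = Elasticsearch(
    [{'host': 'es.example.com', 'port': 9200}],
    http_auth=('elastic', 'changeme'),
    use_ssl=True,
)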

File Reader Class

In [97]:
class Reader(object):

    def __init__(self, FileName='', FileType='csv'):
        self.Filename = FileName
        self.FileType = FileType

    def getDF(self):

        """
        Reads the CSV or JSON file
        :return: list of record dicts
        """
        print("Reading File {} ".format(self.Filename))

        if self.FileType == "csv":
            df = pd.read_csv(self.Filename)
        elif self.FileType == "json":
            df = pd.read_json(self.Filename, lines=True)
        else:
            raise ValueError("Unsupported FileType: {}".format(self.FileType))

        print("Converting File to records Format ..........")
        df2 = df.to_dict('records')
        return df2
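
As a side note, pandas can also parse the downloaded bytes directly from memory, avoiding the temp-file round trip used in the main script below. A sketch, assuming an AWSS3 instance named aws and a hypothetical key:

In [ ]:
# Sketch: parse the S3 payload in memory instead of writing tem.csv
import io

raw = aws.getFileData(Key="some/file.csv")  # hypothetical key
records = pd.read_csv(io.BytesIO(raw)).to_dict('records')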

Generator Objects

In [98]:
class Generator(object):

    @staticmethod
    def generator(df2, IndexName, Dtype):

        """
        Yields bulk-index actions for the elasticsearch helpers API
        """

        for c, line in enumerate(df2):
            yield {
                '_index': IndexName,
                '_type': Dtype,
                '_id': c,
                '_source': {
                    'avg_area_income': line.get('Avg. Area Income', "No Data"),
                }
            }
        # the generator simply returns when the records are exhausted
        # (PEP 479: raising StopIteration here would become a RuntimeError)
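
The generator above indexes only a single column. To index every column of the record, the whole dict can be passed as _source; a minimal sketch (not part of the original notebook):

In [ ]:
# Sketch: index the full record rather than one field
def full_record_generator(df2, IndexName, Dtype):
    for c, line in enumerate(df2):
        yield {
            '_index': IndexName,
            '_type': Dtype,
            '_id': c,
            '_source': line,  # every column becomes a document field
        }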

Main File

Step 1: Read all items from S3 and store them in a hash map

Step 2: Pop items and get the binary data

Step 3: Save the file as tem.EXT, where EXT is the file extension

Step 4: Convert it into a pandas DataFrame

Step 5: Convert the pandas DataFrame into generator objects

Step 6: Start the ETL script

Step 7: We should also add a logger class to keep track of our log files (a sketch follows below)
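
A minimal sketch of such a logger class, using Python's standard logging module; the file name and format here are assumptions, not part of the original notebook:

In [ ]:
# Sketch: a small logger class for the ETL run (file name/format are assumptions)
import logging

class ETLLogger(object):

    @staticmethod
    def get_logger(name="etl", logfile="etl.log"):
        logger = logging.getLogger(name)
        if not logger.handlers:  # avoid duplicate handlers on notebook re-runs
            handler = logging.FileHandler(logfile)
            handler.setFormatter(logging.Formatter(
                "%(asctime)s %(levelname)s %(message)s"))
            logger.addHandler(handler)
            logger.setLevel(logging.INFO)
        return logger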

In [99]:
start = datetime.datetime.now()

aws = AWSS3(BucketName="BUCKETNAME")
Files = aws.getFiles()
FilesCSV = Files['Csv']

for FileName in FilesCSV:
    FileCsvData = aws.getFileData(Key=FileName)

    # write the binary payload to a temp file, closing it before pandas reads it
    with open("tem.csv", "wb") as f:
        f.write(FileCsvData)

    reader = Reader(FileName="tem.csv", FileType="csv")
    df = reader.getDF()

    try:
        print("Uploading Data to Elastic Search ............")
        res = helpers.bulk(ElasticConfig.connect_elasticsearch(),
                           Generator.generator(df, 'mydb', 'mytabledb'))
    except Exception as e:
        print("Upload failed for {}: {}".format(FileName, e))

print("Done")
end = datetime.datetime.now()
print("Total Execution Time: {} ".format(end - start))
            
Reading File tem.csv 
Converting File to records Format ..........
Uploading Data to Elastic Search ............
Yupiee  Connected 
Done
Total Execution Time: 0:00:03.186330 
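
One refinement: helpers.bulk returns a (success count, errors) tuple, so the result can be checked explicitly instead of relying on prints; a minimal sketch:

In [ ]:
# Sketch: inspect the bulk response instead of only printing "Done"
es = ElasticConfig.connect_elasticsearch()
success, errors = helpers.bulk(es, Generator.generator(df, 'mydb', 'mytabledb'))
print("Indexed {} documents, {} errors".format(success, len(errors)))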
In [92]:
for x in os.listdir():
    print(x)
   
.ipynb_checkpoints
Untitled.ipynb
In [91]:
os.remove("tem.csv")  # clean up the temp file
