Tuesday, June 30, 2020

4 Ways to do Pagination or scrolling in Elastic Search Tutorials


Method 1: Scroll API

Step 1:

In [1]:
try:
    import os
    import sys
    
    import elasticsearch
    from elasticsearch import Elasticsearch 
    import pandas as pd
    
    print("All Modules Loaded ! ")
except Exception as e:
    print("Some Modules are Missing {}".format(e))
    
All Modules Loaded ! 

Step 2:

In [2]:
def connect_elasticsearch():
    es = None
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    if es.ping():
        print('Yupiee  Connected ')
    else:
        print('Awww it could not connect!')
    return es
es = connect_elasticsearch()
Yupiee  Connected 

Step 3: Define Query

In [3]:
myquey = {
   "_source": [],
   "size": 10,
   "query": {
      "bool": {
         "must": [],
         "filter": [
            {
               "exists": {
                  "field": "director"
               }
            }
         ],
         "should": [
            {
               "match_phrase": {
                  "director": "Richard "
               }
            }
         ],
         "must_not": []
      }
   }
}

Step 4:

  • index -> name of the index to search
  • scroll -> how long Elasticsearch keeps the scroll context alive between requests, in this case 2m (two minutes)
  • size -> how many records to return in each cycle
  • body -> the Elasticsearch query
In [4]:
res = es.search(
  index = 'netflix',
  scroll = '2m',
  size = 10,
  body = myquey)
In [5]:
counter = 0
sid = res["_scroll_id"]
# The initial search already returned the first page of hits
hits = res['hits']['hits']

# Start scrolling: stop as soon as a page comes back empty
while len(hits) > 0:

    # Do something with the obtained page (hits)
    counter = counter + 1

    # Fetch the next page and keep the scroll context alive for another 2m
    page = es.scroll(scroll_id = sid, scroll = '2m')

    # Update the scroll ID
    sid = page['_scroll_id']

    # Hits returned by the last scroll
    hits = page['hits']['hits']

# Release the scroll context on the server once we are done
es.clear_scroll(scroll_id = sid)

print("Total Pages  : {}".format(counter))
    
Total Pages  : 427
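
The scroll loop above can also be wrapped in a generator, so the calling code simply iterates over pages. What follows is a minimal sketch rather than the author's code: `scroll_pages` and the `FakeClient` stand-in are hypothetical names introduced here so the example runs without a cluster. With a real cluster you would pass the `es` object from Step 2 instead of the fake one.

```python
def scroll_pages(client, index, body, size=10, keep_alive="2m"):
    """Yield every non-empty page of hits, then release the scroll context."""
    res = client.search(index=index, scroll=keep_alive, size=size, body=body)
    sid = res["_scroll_id"]
    hits = res["hits"]["hits"]
    try:
        while hits:
            yield hits
            res = client.scroll(scroll_id=sid, scroll=keep_alive)
            sid = res["_scroll_id"]
            hits = res["hits"]["hits"]
    finally:
        # Free server-side resources even if the caller stops early
        client.clear_scroll(scroll_id=sid)


class FakeClient:
    """Hypothetical in-memory stand-in that mimics the three calls used above."""

    def __init__(self, total_docs):
        self.docs = [{"_id": str(i)} for i in range(total_docs)]
        self.pos = 0
        self.size = 10
        self.cleared = False

    def _next(self):
        hits = self.docs[self.pos:self.pos + self.size]
        self.pos += len(hits)
        return {"_scroll_id": "fake-scroll-id", "hits": {"hits": hits}}

    def search(self, index, scroll, size, body):
        self.pos, self.size = 0, size
        return self._next()

    def scroll(self, scroll_id, scroll):
        return self._next()

    def clear_scroll(self, scroll_id):
        self.cleared = True


fake = FakeClient(total_docs=25)
page_sizes = [len(page) for page in scroll_pages(fake, "netflix", {}, size=10)]
print(page_sizes)
```

With 25 fake documents and a page size of 10, the generator yields pages of 10, 10 and 5 records and then clears the scroll context.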

Method 2: Query Once and Slice in Memory

  • The idea goes like this: we query once, split the results into parts, and map each page number to its part.
  • Say the query returned 500 records and the page size is 10.
  • Page 1 -> records 0-10
  • Page 2 -> records 10-20
  • The query and the page size stay the same the whole time; all that changes is the start and end position of the slice.
  • In other words, you query only once and create a hashmap whose keys are page numbers and whose values are the sliced records.
In [6]:
res = es.search(
  index = 'netflix',
  size = 100,
  body = myquey)
In [7]:
data = res["hits"]["hits"]
In [17]:
hashmap = {}
In [ ]:
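step = 2  # number of records per page
hashmap = {}
# One entry per page: the key is the page number, the value is that page's slice.
# Stepping through the start offsets avoids creating empty pages past the end of data.
for page, startIndex in enumerate(range(0, len(data), step)):
    hashmap[page] = data[startIndex:startIndex + step]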
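
If you do not need every page up front, the same slicing can be done on demand. This is a small sketch under assumptions: `get_page` and `page_count` are hypothetical helper names, page numbers are 0-based like the hashmap keys, and a plain list stands in for `res["hits"]["hits"]`.

```python
import math


def get_page(records, page_number, page_size):
    """Return the records for a 0-based page number, or [] if out of range."""
    if page_number < 0 or page_size <= 0:
        return []
    start = page_number * page_size
    return records[start:start + page_size]


def page_count(records, page_size):
    """Number of pages needed to hold all records."""
    return math.ceil(len(records) / page_size)


records = list(range(7))  # stand-in for res["hits"]["hits"]
print(page_count(records, 2))
print(get_page(records, 3, 2))
```

Keep in mind that this method only pages over what a single request returned, and by default Elasticsearch limits one request to 10,000 hits (the index.max_result_window setting).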

Method 3: from and size

  • The approach here is to let Elasticsearch do the slicing through the from parameter.
  • Page 1 corresponds to from 0
  • Page 2 corresponds to from 10
  • The idea is that for every new page you increment the from variable by the page size.

First Time Query Becomes

In [ ]:
myquey = {
   "_source": [],
   "size": 10,
    "from":0,
   "query": {
      "bool": {
         "must": [],
         "filter": [
            {
               "exists": {
                  "field": "director"
               }
            }
         ],
         "should": [
            {
               "match_phrase": {
                  "director": "Richard "
               }
            }
         ],
         "must_not": []
      }
   }
}

Second Time Query Becomes

In [20]:
myquey = {
   "_source": [],
   "size": 10,
    "from":10,
   "query": {
      "bool": {
         "must": [],
         "filter": [
            {
               "exists": {
                  "field": "director"
               }
            }
         ],
         "should": [
            {
               "match_phrase": {
                  "director": "Richard "
               }
            }
         ],
         "must_not": []
      }
   }
}
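
Rather than editing the query by hand for each page, the from value can be computed from the page number. A minimal sketch, assuming 1-based page numbers as in the bullets above; `page_query` is a hypothetical helper name and the shortened `base` query is only for illustration.

```python
import copy


def page_query(base_query, page_number, page_size=10):
    """Return a copy of base_query with from/size set for a 1-based page number."""
    if page_number < 1:
        raise ValueError("page_number starts at 1")
    query = copy.deepcopy(base_query)
    query["size"] = page_size
    query["from"] = (page_number - 1) * page_size
    return query


base = {"query": {"bool": {"filter": [{"exists": {"field": "director"}}]}}}
first = page_query(base, 1)
second = page_query(base, 2)
print(first["from"], second["from"])
```

Note that from + size cannot exceed the index.max_result_window setting, which defaults to 10,000, so this method is best suited to the first pages of a result set.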

Method 4: Search After Query

In [19]:
myquey = {
   "_source": [],
   "size": 10,
   "query": {
      "bool": {
         "must": [],
         "filter": [
            {
               "exists": {
                  "field": "director"
               }
            }
         ],
         "should": [
            {
               "match_phrase": {
                  "director": "Richard "
               }
            }
         ],
         "must_not": []
      }
   }
}
In [21]:
res = es.search(
  index = 'netflix',
  size = 100,
  body = myquey)
In [22]:
def create_scroll(res):
    """
    :param res: json
    :return: string
    """

    try:
        data = res.get("hits", None).get("hits", None)
        data = data[-1]
        score = data.get("_score", None)
        scroll_id_ = data.get("_id", None)
        unique_scroll_id = "{},{}".format(score, scroll_id_)
        return unique_scroll_id
    except Exception as e:
        return "Error,scroll error "
In [23]:
scroll = create_scroll(res)

This is our unique scroll

In [24]:
scroll
Out[24]:
'0.0,8URc93IB135PBBnB55dH'

Next time we will pass this scroll

In [25]:
score, scroll_id = scroll.split(",")
In [26]:
myquey["search_after"] = [score, scroll_id]
myquey["sort"] = [{"_score": "desc", "_id": "desc"}]

New Query for next page becomes

In [ ]:
new_query ={
   "_source":[

   ],
   "size":10,
   "query":{
      "bool":{
         "must":[

         ],
         "filter":[
            {
               "exists":{
                  "field":"director"
               }
            }
         ],
         "should":[
            {
               "match_phrase":{
                  "director":"Richard "
               }
            }
         ],
         "must_not":[

         ]
      }
   },
   "search_after":[
      "0.0",
      "8URc93IB135PBBnB55dH"
   ],
   "sort":[
      {
         "_score":"desc",
         "_id":"desc"
      }
   ]
}

Now perform the search on Elasticsearch and you will get the next page of results.

  • Make sure to create a new scroll from each response, pass it with the request for the next page, and keep repeating the process.
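
When a request includes a sort clause, Elasticsearch returns a sort array with every hit, and that array can be passed straight back as search_after. The sketch below takes that route instead of formatting and splitting a string. It is an illustration under assumptions, not the article's code: `next_page_query` is a hypothetical helper, the response is a hand-written stand-in, and the same sort is assumed to be sent with every request, including the first.

```python
import copy


def next_page_query(base_query, response):
    """Build the query for the page after `response`, or None when no hits are left."""
    hits = response.get("hits", {}).get("hits", [])
    if not hits:
        return None
    query = copy.deepcopy(base_query)
    # Each hit carries the values it was sorted on; the last hit's values act as the cursor
    query["search_after"] = hits[-1]["sort"]
    return query


base = {
    "size": 2,
    "query": {"match_all": {}},
    "sort": [{"_score": "desc"}, {"_id": "desc"}],
}

# Hand-written stand-in for an Elasticsearch response to `base`
response = {"hits": {"hits": [
    {"_id": "b", "_score": 0.0, "sort": [0.0, "b"]},
    {"_id": "a", "_score": 0.0, "sort": [0.0, "a"]},
]}}

nxt = next_page_query(base, response)
print(nxt["search_after"])
```

Reusing the returned sort values keeps the cursor in the same types and order as the sort clause, and a None result signals that the last page has been reached.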

Please don't forget to like and share the article if you found it useful.
