Friday, July 12, 2024

Optimizing Analytics: Storing Athena Query Metrics in Hudi for Advanced Analysis and Audit using AWS Glue

athenahudi

Optimizing Analytics: Storing Athena Query Metrics in Hudi for Advanced Analysis and Audit using AWS Glue

In [2]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}
Current session configs: {'conf': {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer', 'spark.sql.hive.convertMetastoreParquet': 'false', 'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog', 'spark.sql.legacy.pathOptionBehavior.enabled': 'true', 'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension'}, 'kind': 'pyspark'}
No active sessions.

Create Hudi Tables

In [17]:
%%sql
CREATE TABLE IF NOT EXISTS athena_queries_hudi (
    Catalog STRING,
    CompletionDateTime STRING,
    DataScannedInBytes STRING,
    Database STRING,
    EffectiveEngineVersion STRING,
    EngineExecutionTimeInMillis STRING,
    OutputLocation STRING,
    Query STRING,
    QueryExecutionId STRING,
    QueryPlanningTimeInMillis STRING,
    QueryQueueTimeInMillis STRING,
    ResultReuseInformation_ReusedPreviousResult STRING,
    SelectedEngineVersion STRING,
    ServicePreProcessingTimeInMillis STRING,
    ServiceProcessingTimeInMillis STRING,
    State STRING,
    StatementType STRING,
    SubmissionDateTime STRING,
    TotalExecutionTimeInMillis STRING,
    WorkGroup STRING
)
USING Hudi
OPTIONS (
    primaryKey 'QueryExecutionId',
    precombineField 'QueryExecutionId',
    path 's3://soumilshah-dev-1995/silver/athena_queries_hudi'
)
PARTITIONED BY (Database);

Install Library athena-usage-metrics-extractor

In [10]:
%%bash
pip install athena-usage-metrics-extractor
Defaulting to user installation because normal site-packages is not writeable
Requirement already satisfied: athena-usage-metrics-extractor in /home/glue_user/.local/lib/python3.10/site-packages (1.2.0)
Requirement already satisfied: dateparser in /home/glue_user/.local/lib/python3.10/site-packages (from athena-usage-metrics-extractor) (1.2.0)
Requirement already satisfied: boto3 in /home/glue_user/.local/lib/python3.10/site-packages (from athena-usage-metrics-extractor) (1.24.70)
Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /home/glue_user/.local/lib/python3.10/site-packages (from boto3->athena-usage-metrics-extractor) (0.10.0)
Requirement already satisfied: botocore<1.28.0,>=1.27.70 in /home/glue_user/.local/lib/python3.10/site-packages (from boto3->athena-usage-metrics-extractor) (1.27.96)
Requirement already satisfied: s3transfer<0.7.0,>=0.6.0 in /home/glue_user/.local/lib/python3.10/site-packages (from boto3->athena-usage-metrics-extractor) (0.6.0)
Requirement already satisfied: regex!=2019.02.19,!=2021.8.27 in /home/glue_user/.local/lib/python3.10/site-packages (from dateparser->athena-usage-metrics-extractor) (2022.10.31)
Requirement already satisfied: python-dateutil in /home/glue_user/.local/lib/python3.10/site-packages (from dateparser->athena-usage-metrics-extractor) (2.8.2)
Requirement already satisfied: pytz in /home/glue_user/.local/lib/python3.10/site-packages (from dateparser->athena-usage-metrics-extractor) (2021.1)
Requirement already satisfied: tzlocal in /home/glue_user/.local/lib/python3.10/site-packages (from dateparser->athena-usage-metrics-extractor) (5.2)
Requirement already satisfied: urllib3<1.27,>=1.25.4 in /home/glue_user/.local/lib/python3.10/site-packages (from botocore<1.28.0,>=1.27.70->boto3->athena-usage-metrics-extractor) (1.25.11)
Requirement already satisfied: six>=1.5 in /home/glue_user/.local/lib/python3.10/site-packages (from python-dateutil->dateparser->athena-usage-metrics-extractor) (1.16.0)
[notice] A new release of pip is available: 23.0.1 -> 24.1.2
[notice] To update, run: python3 -m pip install --upgrade pip

Read all Metrics for athena query for given workgroup

In [54]:
import sys
from AthenaUsageExtractor import AthenaUsageExtractor

helper = AthenaUsageExtractor(
    aws_region='us-east-1',
    aws_access_key='<ACCESS KEY GOES HERE>',
    aws_secret_key='<SECRET KEY>'
)


dataset = list(helper.get_usage_for_date(date='2024-06-20', 
                                         workgroup='soumil'))
spark.createDataFrame(dataset).createOrReplaceTempView("new_data")
spark.sql("select count(*) from new_data").show()
+--------+
|count(1)|
+--------+
|      12|
+--------+

Storing Athena Query Metrics in Hudi

In [55]:
%%sql
INSERT INTO athena_queries_hudi
SELECT
    Catalog,
    CompletionDateTime,
    DataScannedInBytes,
    Database,
    EffectiveEngineVersion,
    EngineExecutionTimeInMillis,
    OutputLocation,
    Query,
    QueryExecutionId,
    QueryPlanningTimeInMillis,
    QueryQueueTimeInMillis,
    COALESCE(null, NULL) AS ResultReuseInformation_ReusedPreviousResult,  -- Adjust based on actual logic
    SelectedEngineVersion,
    ServiceProcessingTimeInMillis AS ServicePreProcessingTimeInMillis,
    ServiceProcessingTimeInMillis,
    State,
    StatementType,
    SubmissionDateTime,
    TotalExecutionTimeInMillis,
    WorkGroup
FROM new_data;

Analysis

Calculating Total Data Scanned by WorkGroup in Athena Queries

In [56]:
%%sql
SELECT WorkGroup, SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY WorkGroup;

Count of Queries Executed by WorkGroup in Athena

In [57]:
%%sql
SELECT WorkGroup, COUNT(*) AS NumQueriesExecuted
FROM athena_queries_hudi
GROUP BY WorkGroup;

Average Query Execution Time by WorkGroup in Athena

In [58]:
%%sql 
SELECT WorkGroup, AVG(TotalExecutionTimeInMillis) AS AvgExecutionTimeMillis
FROM athena_queries_hudi
GROUP BY WorkGroup;

Athena Query Performance Analysis by WorkGroup

In [59]:
%%sql 
SELECT
    WorkGroup,
    COUNT(*) AS NumQueries,
    AVG(TotalExecutionTimeInMillis) AS AvgExecutionTimeMillis,
    SUM(DataScannedInBytes) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY WorkGroup;

Performance Analysis of Athena Queries by WorkGroup

In [60]:
%%sql
SELECT
    WorkGroup,
    COUNT(*) AS NumQueries,
    AVG(TotalExecutionTimeInMillis) AS AvgExecutionTimeMillis,
    SUM(DataScannedInBytes) AS TotalDataScannedBytes,
    MAX(TotalExecutionTimeInMillis) AS MaxExecutionTimeMillis,
    MIN(TotalExecutionTimeInMillis) AS MinExecutionTimeMillis
FROM athena_queries_hudi
GROUP BY WorkGroup;

Database and Table Usage Metrics in Athena Queries

In [61]:
%%sql
SELECT
    Database,
    COUNT(DISTINCT Query) AS UniqueQueries,
    COUNT(DISTINCT OutputLocation) AS UniqueOutputLocations,
    COUNT(DISTINCT QueryExecutionId) AS UniqueQueryExecutions,
    SUM(DataScannedInBytes) AS TotalDataScannedBytes,
    AVG(TotalExecutionTimeInMillis) AS AvgExecutionTimeMillis,
    SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END) AS SuccessfulQueries,
    SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END) AS FailedQueries,
    MIN(CompletionDateTime) AS EarliestCompletionTime,
    MAX(CompletionDateTime) AS LatestCompletionTime
FROM athena_queries_hudi
GROUP BY Database;

Detailed Analysis of Athena Query Performance Metrics

In [62]:
%%sql

SELECT
    Database,
    COUNT(DISTINCT Query) AS UniqueQueries,
    COUNT(DISTINCT OutputLocation) AS UniqueOutputLocations,
    COUNT(DISTINCT QueryExecutionId) AS UniqueQueryExecutions,
    SUM(DataScannedInBytes) AS TotalDataScannedBytes,
    AVG(TotalExecutionTimeInMillis) AS AvgExecutionTimeMillis,
    SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END) AS SuccessfulQueries,
    SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END) AS FailedQueries,
    MIN(CompletionDateTime) AS EarliestCompletionTime,
    MAX(CompletionDateTime) AS LatestCompletionTime,
    COUNT(DISTINCT StatementType) AS UniqueStatementTypes,
    SUM(EngineExecutionTimeInMillis) AS TotalEngineExecutionTime,
    AVG(ServiceProcessingTimeInMillis) AS AvgServiceProcessingTime
FROM athena_queries_hudi
GROUP BY Database;

Athena query by time

In [63]:
%%sql 
SELECT
    DATE_FORMAT(CompletionDateTime, 'yyyy-MM-dd') AS QueryDate,
    COUNT(*) AS TotalQueries,
    SUM(DataScannedInBytes) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY DATE_FORMAT(CompletionDateTime, 'yyyy-MM-dd')
ORDER BY QueryDate;

Feel free to enhance your insights with additional queries and develop dashboards as required

No comments:

Post a Comment

How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide

publish How to Use Publish-Audit-Merge Workflow in Apache Iceberg: A Beginner’s Guide ¶ In [24]: from ...