Friday, July 12, 2024

Optimizing Analytics: Storing Athena Query Metrics in Hudi for Advanced Analysis and Audit using AWS Glue

In [2]:
%%configure -f
{
"conf": {
"spark.serializer": "org.apache.spark.serializer.KryoSerializer",
"spark.sql.hive.convertMetastoreParquet": "false",
"spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
"spark.sql.legacy.pathOptionBehavior.enabled": "true",
"spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
}
}

Create the Hudi Table

In [17]:
%%sql
CREATE TABLE IF NOT EXISTS athena_queries_hudi (
    Catalog STRING,
    CompletionDateTime STRING,
    DataScannedInBytes STRING,
    EffectiveEngineVersion STRING,
    EngineExecutionTimeInMillis STRING,
    OutputLocation STRING,
    Query STRING,
    QueryExecutionId STRING,
    QueryPlanningTimeInMillis STRING,
    QueryQueueTimeInMillis STRING,
    ResultReuseInformation_ReusedPreviousResult STRING,
    SelectedEngineVersion STRING,
    ServicePreProcessingTimeInMillis STRING,
    ServiceProcessingTimeInMillis STRING,
    State STRING,
    StatementType STRING,
    SubmissionDateTime STRING,
    TotalExecutionTimeInMillis STRING,
    WorkGroup STRING,
    Database STRING  -- partition column, declared last
)
USING Hudi
OPTIONS (
    primaryKey 'QueryExecutionId',
    precombineField 'QueryExecutionId',
    path 's3://soumilshah-dev-1995/silver/athena_queries_hudi'
)
PARTITIONED BY (Database);
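The primaryKey and precombineField options decide what happens when the same QueryExecutionId is written more than once, for example when the same day is loaded twice. The sketch below is a simplified plain-Python model of that rule, not Hudi's implementation: among incoming records that share a key, the one with the largest precombine value wins, and in this model a tie goes to the record that arrived last. Because this table uses QueryExecutionId for both options, every duplicate ties, so the ordering field carries no information. A timestamp-like column such as CompletionDateTime is a common alternative, assuming it is always populated and consistently formatted. The function name and sample records are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def precombine(records: Iterable[Dict[str, Any]],
               record_key: str,
               precombine_field: str) -> List[Dict[str, Any]]:
    """Simplified model: keep one record per key, preferring the largest
    precombine value; on a tie, the later record replaces the earlier one."""
    winners: Dict[Any, Dict[str, Any]] = {}
    for record in records:
        key = record[record_key]
        current = winners.get(key)
        # Take the incoming record if it is the first for this key or orders >= the current winner
        if current is None or record[precombine_field] >= current[precombine_field]:
            winners[key] = record
    return list(winners.values())


# Hypothetical sample: q1 arrives twice, with the newer snapshot first
rows = [
    {"QueryExecutionId": "q1", "CompletionDateTime": "2024-06-20 10:05:00", "State": "SUCCEEDED"},
    {"QueryExecutionId": "q1", "CompletionDateTime": "2024-06-20 10:00:00", "State": "RUNNING"},
    {"QueryExecutionId": "q2", "CompletionDateTime": "2024-06-20 11:00:00", "State": "FAILED"},
]

by_time = precombine(rows, "QueryExecutionId", "CompletionDateTime")
by_key = precombine(rows, "QueryExecutionId", "QueryExecutionId")
print([r["State"] for r in by_time])  # ['SUCCEEDED', 'FAILED']
print([r["State"] for r in by_key])   # ['RUNNING', 'FAILED']
```

With the key doubling as the ordering field, the stale RUNNING snapshot wins simply because it arrived last.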

Install the athena-usage-metrics-extractor Library

In [10]:
%%bash
pip install athena-usage-metrics-extractor

Read the Metrics of All Athena Queries for a Given Workgroup

In [54]:
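import os

from AthenaUsageExtractor import AthenaUsageExtractor

# Read credentials from the environment instead of hardcoding them in the notebook
helper = AthenaUsageExtractor(
    aws_region='us-east-1',
    aws_access_key=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_key=os.environ['AWS_SECRET_ACCESS_KEY']
)

# Fetch the query metrics for one day in the 'soumil' workgroup
dataset = list(helper.get_usage_for_date(date='2024-06-20',
                                         workgroup='soumil'))

# Register the records as a temporary view so they can be queried with SQL
spark.createDataFrame(dataset).createOrReplaceTempView("new_data")
spark.sql("select count(*) from new_data").show()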
+--------+
|count(1)|
+--------+
|      12|
+--------+
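The extractor hides how the records are fetched. If you prefer to call Athena directly, a rough equivalent can be sketched with the boto3 Athena client's list_query_executions paginator and batch_get_query_execution, which accepts at most 50 IDs per call. This is not how athena-usage-metrics-extractor is implemented (its internals are not shown here); the helper names are hypothetical, and filtering on the date of Status.SubmissionDateTime is an assumption about what "usage for a date" means. The client is passed in as a parameter, so the function can be exercised with a stand-in object.

```python
from datetime import date
from typing import Any, Dict, Iterator, List

BATCH_LIMIT = 50  # BatchGetQueryExecution accepts up to 50 query execution IDs per request


def chunked(items: List[str], size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def query_executions_for_date(athena_client: Any, workgroup: str,
                              day: date) -> Iterator[Dict[str, Any]]:
    """Yield QueryExecution dicts from `workgroup` that were submitted on `day`."""
    # Collect every query execution ID listed for the workgroup
    ids: List[str] = []
    paginator = athena_client.get_paginator("list_query_executions")
    for page in paginator.paginate(WorkGroup=workgroup):
        ids.extend(page.get("QueryExecutionIds", []))

    # Fetch details in batches and keep the executions submitted on the requested day
    for batch in chunked(ids, BATCH_LIMIT):
        response = athena_client.batch_get_query_execution(QueryExecutionIds=batch)
        for execution in response.get("QueryExecutions", []):
            submitted = execution.get("Status", {}).get("SubmissionDateTime")
            if submitted is not None and submitted.date() == day:
                yield execution
```

With a real client such as boto3.client("athena", region_name="us-east-1"), credentials come from boto3's standard credential chain (environment variables, a named profile, or an attached IAM role), so no keys need to appear in the notebook. The records are nested dictionaries, so they still have to be flattened into the column names of athena_queries_hudi before calling spark.createDataFrame, and SubmissionDateTime.date() uses whatever time zone the returned datetime carries.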

Storing Athena Query Metrics in Hudi

In [55]:
%%sql
INSERT INTO athena_queries_hudi
SELECT
    Catalog,
    CompletionDateTime,
    DataScannedInBytes,
    EffectiveEngineVersion,
    EngineExecutionTimeInMillis,
    OutputLocation,
    Query,
    QueryExecutionId,
    QueryPlanningTimeInMillis,
    QueryQueueTimeInMillis,
    CAST(NULL AS STRING) AS ResultReuseInformation_ReusedPreviousResult,  -- placeholder: populate if new_data provides this field
    SelectedEngineVersion,
    CAST(NULL AS STRING) AS ServicePreProcessingTimeInMillis,  -- placeholder: populate if new_data provides this field
    ServiceProcessingTimeInMillis,
    State,
    StatementType,
    SubmissionDateTime,
    TotalExecutionTimeInMillis,
    WorkGroup,
    Database  -- partition column last, matching the table schema
FROM new_data;
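The INSERT above fills ResultReuseInformation_ReusedPreviousResult and ServicePreProcessingTimeInMillis without reading a field of that name from new_data. In Athena's own QueryExecution structure (as returned by GetQueryExecution or BatchGetQueryExecution), both values exist under Statistics, separately from ServiceProcessingTimeInMillis. The sketch below shows one hypothetical way to map a raw QueryExecution dictionary onto the 20 columns of athena_queries_hudi as strings. The mapping and function names are illustrative assumptions, and the extractor library may flatten its output differently.

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical mapping: table column -> key path inside one Athena QueryExecution dict
COLUMN_PATHS: Dict[str, Tuple[str, ...]] = {
    "Catalog": ("QueryExecutionContext", "Catalog"),
    "CompletionDateTime": ("Status", "CompletionDateTime"),
    "DataScannedInBytes": ("Statistics", "DataScannedInBytes"),
    "Database": ("QueryExecutionContext", "Database"),
    "EffectiveEngineVersion": ("EngineVersion", "EffectiveEngineVersion"),
    "EngineExecutionTimeInMillis": ("Statistics", "EngineExecutionTimeInMillis"),
    "OutputLocation": ("ResultConfiguration", "OutputLocation"),
    "Query": ("Query",),
    "QueryExecutionId": ("QueryExecutionId",),
    "QueryPlanningTimeInMillis": ("Statistics", "QueryPlanningTimeInMillis"),
    "QueryQueueTimeInMillis": ("Statistics", "QueryQueueTimeInMillis"),
    "ResultReuseInformation_ReusedPreviousResult": ("Statistics", "ResultReuseInformation", "ReusedPreviousResult"),
    "SelectedEngineVersion": ("EngineVersion", "SelectedEngineVersion"),
    "ServicePreProcessingTimeInMillis": ("Statistics", "ServicePreProcessingTimeInMillis"),
    "ServiceProcessingTimeInMillis": ("Statistics", "ServiceProcessingTimeInMillis"),
    "State": ("Status", "State"),
    "StatementType": ("StatementType",),
    "SubmissionDateTime": ("Status", "SubmissionDateTime"),
    "TotalExecutionTimeInMillis": ("Statistics", "TotalExecutionTimeInMillis"),
    "WorkGroup": ("WorkGroup",),
}


def lookup(record: Dict[str, Any], path: Tuple[str, ...]) -> Any:
    """Follow `path` through nested dicts; return None if any key is missing."""
    value: Any = record
    for key in path:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def to_row(execution: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Flatten one QueryExecution into string columns (None where a field is absent)."""
    row: Dict[str, Optional[str]] = {}
    for column, path in COLUMN_PATHS.items():
        value = lookup(execution, path)
        row[column] = None if value is None else str(value)
    return row
```

Because some fields can be absent in every record of a batch, passing an explicit all-STRING schema to spark.createDataFrame is safer than relying on type inference, which cannot infer a type for a column that is always None.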

Analysis

Calculating Total Data Scanned by WorkGroup in Athena Queries

In [56]:
%%sql
SELECT WorkGroup, SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY WorkGroup;

Count of Queries Executed by WorkGroup in Athena

In [57]:
%%sql
SELECT WorkGroup, COUNT(*) AS NumQueriesExecuted
FROM athena_queries_hudi
GROUP BY WorkGroup;

Average Query Execution Time by WorkGroup in Athena

In [58]:
%%sql 
SELECT WorkGroup, AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis
FROM athena_queries_hudi
GROUP BY WorkGroup;

Athena Query Performance Analysis by WorkGroup

In [59]:
%%sql 
SELECT
    WorkGroup,
    COUNT(*) AS NumQueries,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY WorkGroup;

Execution Time Range and Data Scanned by WorkGroup

In [60]:
%%sql
SELECT
    WorkGroup,
    COUNT(*) AS NumQueries,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes,
    MAX(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS MaxExecutionTimeMillis,
    MIN(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS MinExecutionTimeMillis
FROM athena_queries_hudi
GROUP BY WorkGroup;
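Every metric column in athena_queries_hudi is declared as STRING, and MIN and MAX on strings compare lexicographically, so "950" sorts above "1200". That is why these aggregates need a numeric CAST, as the first analysis query does with CAST(DataScannedInBytes AS BIGINT). Python compares digit strings the same way, so this small sketch with hypothetical values makes the difference visible.

```python
from typing import List, Tuple


def extremes(values: List[str]) -> Tuple[Tuple[str, str], Tuple[int, int]]:
    """Return (min, max) compared as text and (min, max) compared as integers."""
    as_text = (min(values), max(values))
    as_numbers = (min(int(v) for v in values), max(int(v) for v in values))
    return as_text, as_numbers


# Hypothetical TotalExecutionTimeInMillis values stored as STRING
execution_times = ["950", "1200", "87"]
print(extremes(execution_times))  # (('1200', '950'), (87, 1200))
```

Compared as text, the longest-running query (1200 ms) is reported as the minimum.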

Database Usage Metrics in Athena Queries

In [61]:
%%sql
SELECT
    Database,
    COUNT(DISTINCT Query) AS UniqueQueries,
    COUNT(DISTINCT OutputLocation) AS UniqueOutputLocations,
    COUNT(DISTINCT QueryExecutionId) AS UniqueQueryExecutions,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END) AS SuccessfulQueries,
    SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END) AS FailedQueries,
    MIN(CompletionDateTime) AS EarliestCompletionTime,
    MAX(CompletionDateTime) AS LatestCompletionTime
FROM athena_queries_hudi
GROUP BY Database;
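The SUM(CASE WHEN ... THEN 1 ELSE 0 END) pattern counts rows per state in a single GROUP BY pass. Athena queries can also end in CANCELLED (or still be QUEUED or RUNNING when captured), so SuccessfulQueries plus FailedQueries can be smaller than the number of rows for a database. The plain-Python sketch below mirrors that conditional aggregation over hypothetical rows; it illustrates the SQL's logic and is not something the notebook runs.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Optional


def summarize_by_database(rows: Iterable[Mapping[str, Optional[str]]]) -> Dict[str, Dict[str, int]]:
    """Per-database row count, SUCCEEDED/FAILED counts, and total bytes scanned."""
    summary: Dict[str, Dict[str, int]] = defaultdict(
        lambda: {"rows": 0, "succeeded": 0, "failed": 0, "bytes": 0})
    for row in rows:
        stats = summary[row["Database"]]
        stats["rows"] += 1
        # Mirrors SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END)
        stats["succeeded"] += 1 if row["State"] == "SUCCEEDED" else 0
        # Mirrors SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END)
        stats["failed"] += 1 if row["State"] == "FAILED" else 0
        # SQL SUM skips NULLs (and returns NULL if all are NULL; this sketch returns 0)
        if row.get("DataScannedInBytes") is not None:
            stats["bytes"] += int(row["DataScannedInBytes"])
    return dict(summary)
```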

Detailed Analysis of Athena Query Performance Metrics

In [62]:
%%sql

SELECT
    Database,
    COUNT(DISTINCT Query) AS UniqueQueries,
    COUNT(DISTINCT OutputLocation) AS UniqueOutputLocations,
    COUNT(DISTINCT QueryExecutionId) AS UniqueQueryExecutions,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END) AS SuccessfulQueries,
    SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END) AS FailedQueries,
    MIN(CompletionDateTime) AS EarliestCompletionTime,
    MAX(CompletionDateTime) AS LatestCompletionTime,
    COUNT(DISTINCT StatementType) AS UniqueStatementTypes,
    SUM(CAST(EngineExecutionTimeInMillis AS BIGINT)) AS TotalEngineExecutionTime,
    AVG(CAST(ServiceProcessingTimeInMillis AS BIGINT)) AS AvgServiceProcessingTime
FROM athena_queries_hudi
GROUP BY Database;

Athena Queries by Day

In [63]:
%%sql 
SELECT
    DATE_FORMAT(CompletionDateTime, 'yyyy-MM-dd') AS QueryDate,
    COUNT(*) AS TotalQueries,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY DATE_FORMAT(CompletionDateTime, 'yyyy-MM-dd')
ORDER BY QueryDate;
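DATE_FORMAT on the STRING column CompletionDateTime first casts the value to a timestamp and then renders it in the Spark session time zone (spark.sql.session.timeZone), so a query that finishes shortly before midnight UTC can be counted on a different day if the session uses another zone. The sketch below buckets completion times by UTC calendar day in plain Python. It assumes the strings are ISO 8601, optionally with an offset such as "2024-06-20 23:30:00+00:00"; that is an assumption about the stored format. Unlike the SQL, which keeps NULL completion times as their own group, it skips them.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Dict, Iterable, Optional


def queries_per_utc_day(completion_times: Iterable[Optional[str]]) -> Dict[str, int]:
    """Count completion timestamps per UTC calendar day (yyyy-MM-dd)."""
    counts: Counter = Counter()
    for text in completion_times:
        if text is None:
            continue  # unknown completion times are skipped in this sketch
        ts = datetime.fromisoformat(text)
        if ts.tzinfo is not None:
            ts = ts.astimezone(timezone.utc)  # normalize offsets; naive values are treated as UTC
        counts[ts.strftime("%Y-%m-%d")] += 1
    return dict(sorted(counts.items()))
```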

You can extend this analysis with further queries and build dashboards on top of the Hudi table as needed.
