Optimizing Analytics: Storing Athena Query Metrics in Hudi for Advanced Analysis and Auditing Using AWS Glue
In [2]:
%%configure -f
{
    "conf": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.hive.convertMetastoreParquet": "false",
        "spark.sql.legacy.pathOptionBehavior.enabled": "true"
    }
}
Create the Hudi Table
In [17]:
%%sql
-- Silver-layer Hudi table of Athena query executions, keyed by QueryExecutionId
-- and partitioned by Database.
-- Every metric column is declared as STRING, so numeric aggregations over them need a cast.
-- NOTE(review): precombineField is the same column as primaryKey, so it cannot choose
-- between two versions of the same query. A timestamp such as CompletionDateTime may be
-- a better ordering field if it is never NULL. TODO confirm before changing the table.
-- NOTE(review): Spark may place the partition column Database last in the stored schema,
-- which matters for positional INSERT INTO ... SELECT statements. Confirm with DESCRIBE.
CREATE TABLE IF NOT EXISTS athena_queries_hudi (
    Catalog STRING,
    CompletionDateTime STRING,
    DataScannedInBytes STRING,
    Database STRING,
    EffectiveEngineVersion STRING,
    EngineExecutionTimeInMillis STRING,
    OutputLocation STRING,
    Query STRING,
    QueryExecutionId STRING,
    QueryPlanningTimeInMillis STRING,
    QueryQueueTimeInMillis STRING,
    ResultReuseInformation_ReusedPreviousResult STRING,
    SelectedEngineVersion STRING,
    ServicePreProcessingTimeInMillis STRING,
    ServiceProcessingTimeInMillis STRING,
    State STRING,
    StatementType STRING,
    SubmissionDateTime STRING,
    TotalExecutionTimeInMillis STRING,
    WorkGroup STRING
)
USING Hudi
OPTIONS (
    primaryKey      'QueryExecutionId',
    precombineField 'QueryExecutionId',
    path            's3://soumilshah-dev-1995/silver/athena_queries_hudi'
)
PARTITIONED BY (Database);
Install the athena-usage-metrics-extractor Library
In [10]:
%%bash
# Installs the third-party athena-usage-metrics-extractor package, which presumably provides
# the AthenaUsageExtractor module imported in the Python cell below.
# TODO(review): pin an exact version (athena-usage-metrics-extractor==X.Y.Z) so reruns are reproducible.
pip install athena-usage-metrics-extractor
Read All Athena Query Metrics for a Given Workgroup and Date
In [54]:
import os
import sys

from AthenaUsageExtractor import AthenaUsageExtractor

# Run parameters: change these instead of editing the calls below.
AWS_REGION = "us-east-1"
USAGE_DATE = "2024-06-20"  # passed as `date` to get_usage_for_date
WORKGROUP = "soumil"       # Athena workgroup whose query metrics are extracted

# Never hardcode AWS credentials in a notebook (they leak via git, exports and blog posts).
# They are read from the standard AWS environment variables instead.
# NOTE(review): if the variables are unset, None is passed through. Confirm that
# AthenaUsageExtractor then falls back to the default boto3 credential chain.
helper = AthenaUsageExtractor(
    aws_region=AWS_REGION,
    aws_access_key=os.environ.get("AWS_ACCESS_KEY_ID"),
    aws_secret_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
)

dataset = list(helper.get_usage_for_date(date=USAGE_DATE, workgroup=WORKGROUP))

# spark.createDataFrame cannot infer a schema from an empty list. Fail with a clear message
# so that Run All stops here, instead of the INSERT cell running against a stale
# `new_data` view left over from an earlier run.
if not dataset:
    raise ValueError(
        f"No Athena query metrics returned for workgroup {WORKGROUP!r} on {USAGE_DATE}"
    )

spark.createDataFrame(dataset).createOrReplaceTempView("new_data")
spark.sql("select count(*) from new_data").show()
Storing Athena Query Metrics in Hudi
In [55]:
%%sql
-- Load the metrics held in the new_data temp view into the Hudi table.
-- INSERT INTO ... SELECT maps columns by position, so this list follows the column
-- order of the CREATE TABLE statement.
-- NOTE(review): Spark may move the partition column Database to the end of the table
-- schema. Check DESCRIBE athena_queries_hudi and align the order if it differs.
-- NOTE(review): depending on the Hudi version, INSERT INTO may append rather than upsert
-- on primaryKey, so re-running for the same date could duplicate rows. Consider
-- MERGE INTO ... ON QueryExecutionId if the load must be idempotent.
INSERT INTO athena_queries_hudi
SELECT
    Catalog,
    CompletionDateTime,
    DataScannedInBytes,
    Database,
    EffectiveEngineVersion,
    EngineExecutionTimeInMillis,
    OutputLocation,
    Query,
    QueryExecutionId,
    QueryPlanningTimeInMillis,
    QueryQueueTimeInMillis,
    -- Presumably not provided by new_data, so store an explicitly typed NULL.
    -- TODO: map from the source once the extractor exposes this field.
    CAST(NULL AS STRING) AS ResultReuseInformation_ReusedPreviousResult,
    SelectedEngineVersion,
    -- Store NULL rather than a copy of ServiceProcessingTimeInMillis, which is a different
    -- metric and would silently corrupt this column.
    -- TODO: select the real ServicePreProcessingTimeInMillis if new_data provides it.
    CAST(NULL AS STRING) AS ServicePreProcessingTimeInMillis,
    ServiceProcessingTimeInMillis,
    State,
    StatementType,
    SubmissionDateTime,
    TotalExecutionTimeInMillis,
    WorkGroup
FROM new_data;
Analysis
Total Data Scanned by WorkGroup in Athena Queries
In [56]:
%%sql
-- Total bytes scanned per Athena workgroup.
-- DataScannedInBytes is stored as STRING, hence the explicit BIGINT cast before summing.
SELECT
    WorkGroup,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY WorkGroup;
Number of Queries Executed per WorkGroup in Athena
In [57]:
%%sql
-- Number of stored query rows per Athena workgroup.
SELECT
    WorkGroup,
    COUNT(*) AS NumQueriesExecuted
FROM athena_queries_hudi
GROUP BY WorkGroup;
Average Query Execution Time per WorkGroup in Athena
In [58]:
%%sql
-- Mean end-to-end execution time (milliseconds) per Athena workgroup.
-- TotalExecutionTimeInMillis is stored as STRING, so it is cast to BIGINT explicitly
-- rather than relying on implicit string-to-double coercion.
SELECT
    WorkGroup,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis
FROM athena_queries_hudi
GROUP BY WorkGroup;
Athena Query Performance Summary by WorkGroup
In [59]:
%%sql
-- Per-workgroup summary: query count, mean execution time and total bytes scanned.
-- The metric columns are STRING in athena_queries_hudi, so they are cast to BIGINT
-- explicitly. Summing the cast values yields an exact BIGINT instead of a DOUBLE.
SELECT
    WorkGroup,
    COUNT(*) AS NumQueries,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY WorkGroup;
Extended Performance Analysis of Athena Queries by WorkGroup (Including Min/Max Execution Time)
In [60]:
%%sql
-- Extended per-workgroup performance summary.
-- MAX and MIN are applied to the BIGINT-cast execution time. On the raw STRING column
-- they would compare as text, so for example 999 would rank above 1000.
SELECT
    WorkGroup,
    COUNT(*) AS NumQueries,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes,
    MAX(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS MaxExecutionTimeMillis,
    MIN(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS MinExecutionTimeMillis
FROM athena_queries_hudi
GROUP BY WorkGroup;
Database Usage Metrics in Athena Queries
In [61]:
%%sql
-- Per-database usage: distinct queries, output locations and executions, bytes scanned,
-- mean execution time, succeeded and failed counts, and the completion-time range.
-- Numeric metrics are STRING columns, so they are cast to BIGINT explicitly.
-- NOTE(review): MIN and MAX on CompletionDateTime compare the STRING values as text,
-- which is only chronological if every value uses the same sortable format and offset.
SELECT
    Database,
    COUNT(DISTINCT Query) AS UniqueQueries,
    COUNT(DISTINCT OutputLocation) AS UniqueOutputLocations,
    COUNT(DISTINCT QueryExecutionId) AS UniqueQueryExecutions,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END) AS SuccessfulQueries,
    SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END) AS FailedQueries,
    MIN(CompletionDateTime) AS EarliestCompletionTime,
    MAX(CompletionDateTime) AS LatestCompletionTime
FROM athena_queries_hudi
GROUP BY Database;
Detailed Per-Database Analysis of Athena Query Performance Metrics (Adds Statement Types and Engine/Service Times)
In [62]:
%%sql
-- Detailed per-database metrics: the usage figures above plus distinct statement types,
-- total engine execution time and mean service processing time.
-- All numeric metrics are STRING columns, so they are cast to BIGINT explicitly. This
-- keeps the sums exact integers instead of DOUBLE values.
-- NOTE(review): MIN and MAX on CompletionDateTime compare the STRING values as text,
-- which is only chronological if every value uses the same sortable format and offset.
SELECT
    Database,
    COUNT(DISTINCT Query) AS UniqueQueries,
    COUNT(DISTINCT OutputLocation) AS UniqueOutputLocations,
    COUNT(DISTINCT QueryExecutionId) AS UniqueQueryExecutions,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes,
    AVG(CAST(TotalExecutionTimeInMillis AS BIGINT)) AS AvgExecutionTimeMillis,
    SUM(CASE WHEN State = 'SUCCEEDED' THEN 1 ELSE 0 END) AS SuccessfulQueries,
    SUM(CASE WHEN State = 'FAILED' THEN 1 ELSE 0 END) AS FailedQueries,
    MIN(CompletionDateTime) AS EarliestCompletionTime,
    MAX(CompletionDateTime) AS LatestCompletionTime,
    COUNT(DISTINCT StatementType) AS UniqueStatementTypes,
    SUM(CAST(EngineExecutionTimeInMillis AS BIGINT)) AS TotalEngineExecutionTime,
    AVG(CAST(ServiceProcessingTimeInMillis AS BIGINT)) AS AvgServiceProcessingTime
FROM athena_queries_hudi
GROUP BY Database;
Athena Queries and Data Scanned per Day
In [63]:
%%sql
-- Daily query count and bytes scanned, bucketed by completion date.
-- DataScannedInBytes is a STRING column, so it is cast to BIGINT before summing.
-- NOTE(review): CompletionDateTime is a STRING that Spark implicitly casts to a timestamp,
-- so the day boundary presumably follows the Spark session time zone. NULL or unparseable
-- values fall into a NULL QueryDate group. Confirm this is the intended bucketing.
SELECT
    DATE_FORMAT(CompletionDateTime, 'yyyy-MM-dd') AS QueryDate,
    COUNT(*) AS TotalQueries,
    SUM(CAST(DataScannedInBytes AS BIGINT)) AS TotalDataScannedBytes
FROM athena_queries_hudi
GROUP BY DATE_FORMAT(CompletionDateTime, 'yyyy-MM-dd')
ORDER BY QueryDate;
No comments:
Post a Comment