Metrics Dictionary Creation Workflow
Purpose
Create a comprehensive dictionary of all metrics and dimensions from documented marts. This workflow extracts metric definitions from dbt model code, categorizes metrics vs dimensions, and generates a structured CSV dictionary for business users and analysts.
Prerequisites
- Snowflake access with permissions to query `INFORMATION_SCHEMA`
- 1Password CLI installed and signed in (see `setup/1password-cli-setup.md`)
- Python 3.8+ with required packages: `pip install snowflake-connector-python pyyaml`
- Access to dbt project (for extracting metric definitions from model code)
- Marts documentation (from the `marts-documentation-creation.md` workflow)
- Network access to Snowflake instance
Workflow Overview
- Extract all columns from mart tables in Snowflake
- Categorize columns as metrics vs dimensions
- Parse dbt model code for metric definitions
- Filter out system columns and irrelevant entries
- Generate CSV dictionary with definitions
- Assign priority levels (metrics vs dimensions)
Step-by-Step Procedure
1. Query Snowflake for All Columns
Query `INFORMATION_SCHEMA.COLUMNS` to get all columns from mart tables:

```sql
SELECT
    TABLE_NAME,
    COLUMN_NAME,
    DATA_TYPE,
    IS_NULLABLE
FROM PROD_MARTS.INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'WHOLESALE_CUSTOMERS'
ORDER BY TABLE_NAME, ORDINAL_POSITION;
```

2. Categorize Metrics vs Dimensions
Metrics (typically):
- Numeric measures (revenue, counts, averages, totals)
- Calculated values (ratios, percentages, growth rates)
- Names containing: COUNT, TOTAL, SUM, AVG, REVENUE, SPEND, VALUE, RATE, GROWTH
Dimensions (typically):
- Identifiers (IDs, keys)
- Descriptive attributes (names, categories, segments)
- Dates and timestamps
- Status flags and classifications
System Columns (exclude):
- `DBT_UPDATED_AT` - dbt metadata
- `_POLYTOMIC_*` - system columns
- Date pivot columns (e.g., `2025-01-01`, `2025-01-02`)
- Generic `VALUE` columns from pivot tables
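The rules above can be sketched as a small classifier. This is illustrative only: the keyword and pattern lists mirror the categories described here and are not exhaustive.

```python
import re

# Keywords suggesting a column is a metric (from the list above)
METRIC_KEYWORDS = {'COUNT', 'TOTAL', 'SUM', 'AVG', 'REVENUE',
                   'SPEND', 'VALUE', 'RATE', 'GROWTH'}

def classify_column(column_name: str) -> str:
    """Rough metric/dimension/excluded classification per the rules above."""
    name = column_name.upper()
    # Excluded: system columns and date pivot columns
    if 'DBT_UPDATED_AT' in name or name.startswith('_POLYTOMIC_'):
        return 'excluded'
    if re.match(r'^\d{4}-\d{2}-\d{2}$', column_name) or name == 'VALUE':
        return 'excluded'
    if any(kw in name for kw in METRIC_KEYWORDS):
        return 'metric'
    return 'dimension'

classify_column('TOTAL_REVENUE')  # metric
classify_column('CUSTOMER_ID')    # dimension
classify_column('2025-01-01')     # excluded (date pivot column)
```

Note that the exclusion checks run before the metric keyword check, so a generic pivot column named exactly `VALUE` is dropped while metrics like `AVERAGE_ORDER_VALUE` are kept.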
3. Extract Definitions from dbt Code
For each metric/dimension, search dbt model files for:
- Column definitions in `SELECT` statements
- Comments explaining calculations
- Business logic in transformations
Example dbt code pattern:

```sql
-- Number of days from customer creation to current date
CUSTOMER_LIFETIME_DAYS_SINCE_CREATION AS (
    DATEDIFF('day', customer_created_date, CURRENT_DATE())
),

-- Total revenue divided by total orders
AVERAGE_ORDER_VALUE AS (
    total_revenue / NULLIF(total_orders, 0)
)
```

4. Filter Out Irrelevant Entries
Python filtering example:

```python
import re

def is_date_column(column_name: str) -> bool:
    """Check if column name matches date pattern (YYYY-MM-DD)."""
    date_pattern = r'^\d{4}-\d{2}-\d{2}$'
    return bool(re.match(date_pattern, column_name))

def is_system_column(column_name: str) -> bool:
    """Check if column is a system column."""
    column_upper = column_name.upper()
    # Exact match for generic VALUE columns from pivots, so metric names
    # that merely contain "VALUE" (e.g. AVERAGE_ORDER_VALUE) are kept
    if column_upper == 'VALUE':
        return True
    return any(pattern in column_upper for pattern in ['DBT_UPDATED_AT', '_POLYTOMIC_'])

def should_include_column(column_name: str) -> bool:
    """Determine if column should be included in dictionary."""
    if is_date_column(column_name):
        return False
    if is_system_column(column_name):
        return False
    return True
```

5. Parse dbt Model Code for Definitions
Python script to extract definitions:

```python
import re
from pathlib import Path
from typing import Dict

def extract_metric_definitions(dbt_project_path: str) -> Dict[str, str]:
    """Extract metric definitions from dbt model files."""
    definitions = {}
    models_path = Path(dbt_project_path) / 'models'
    for sql_file in models_path.rglob('*.sql'):
        with open(sql_file, 'r') as f:
            content = f.read()
        # Pattern: "-- comment" on the line above "column_name AS (calculation)"
        pattern = r'--\s*(.+)\n\s*(\w+)\s+AS\s*\('
        for comment, column_name in re.findall(pattern, content):
            definitions[column_name] = comment.strip()
    return definitions
```

6. Generate Dictionary CSV
Create CSV with the following columns:
| Column | Description | Example |
|---|---|---|
| Metric | Column name | `TOTAL_REVENUE` |
| Table | Table(s) containing this metric | `WHOLESALE_DIM_CUSTOMERS`, `WHOLESALE_PARTNER_STATUS_DAILY_SNAPSHOT` |
| Definitions | Business definition from dbt code | Total revenue from all orders for the customer |
| Priority | 1 for metrics, 2 for dimensions | 1 |
7. Complete Python Script Example

```python
import snowflake.connector
import csv
import re
from pathlib import Path
from typing import Dict, List
from collections import defaultdict

def get_all_columns(conn, schema: str) -> List[tuple]:
    """Get all columns from mart tables."""
    cursor = conn.cursor()
    query = f"""
        SELECT
            TABLE_NAME,
            COLUMN_NAME,
            DATA_TYPE
        FROM PROD_MARTS.INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = '{schema}'
        ORDER BY TABLE_NAME, ORDINAL_POSITION
    """
    cursor.execute(query)
    return cursor.fetchall()

def is_metric(column_name: str, data_type: str) -> bool:
    """Determine if column is a metric."""
    metric_keywords = [
        'COUNT', 'TOTAL', 'SUM', 'AVG', 'AVERAGE', 'REVENUE',
        'SPEND', 'VALUE', 'RATE', 'GROWTH', 'DAYS', 'PERCENT'
    ]
    # Numeric types are more likely to be metrics
    if data_type in ('NUMBER', 'FLOAT', 'DOUBLE'):
        if any(keyword in column_name.upper() for keyword in metric_keywords):
            return True
    return False

def is_dimension(column_name: str, data_type: str) -> bool:
    """Determine if column is a dimension."""
    dimension_keywords = ['ID', 'NAME', 'EMAIL', 'SEGMENT', 'STATUS', 'DATE']
    if any(keyword in column_name.upper() for keyword in dimension_keywords):
        return True
    if data_type in ('VARCHAR', 'TEXT', 'STRING'):
        return True
    return False

def should_exclude(column_name: str) -> bool:
    """Check if column should be excluded."""
    column_upper = column_name.upper()
    # Date pivot columns (YYYY-MM-DD)
    if re.match(r'^\d{4}-\d{2}-\d{2}$', column_name):
        return True
    # System columns
    if 'DBT_UPDATED_AT' in column_upper or '_POLYTOMIC_' in column_upper:
        return True
    # Generic value columns from pivots (exact match, so names that merely
    # contain "VALUE", e.g. AVERAGE_ORDER_VALUE, are kept)
    if column_upper == 'VALUE':
        return True
    return False

def extract_dbt_definitions(dbt_project_path: str) -> Dict[str, str]:
    """Extract definitions from dbt model code."""
    definitions = {}
    # This is a simplified version - enhance based on actual dbt structure
    models_path = Path(dbt_project_path) / 'models'
    if models_path.exists():
        for sql_file in models_path.rglob('*.sql'):
            with open(sql_file, 'r') as f:
                content = f.read()
            # Look for comments above column definitions
            lines = content.split('\n')
            for i, line in enumerate(lines):
                if '--' in line and i + 1 < len(lines):
                    comment = line.split('--')[1].strip()
                    # Extract column name from next line
                    match = re.search(r'(\w+)\s+AS', lines[i + 1])
                    if match:
                        definitions[match.group(1)] = comment
    return definitions

# Main workflow
schema = 'WHOLESALE_CUSTOMERS'
conn = get_snowflake_connection(...)
columns = get_all_columns(conn, schema)
dbt_definitions = extract_dbt_definitions('dbt_project')

# Group columns by name (a column may appear in multiple tables)
column_to_tables = defaultdict(set)
column_info = {}
for table_name, column_name, data_type in columns:
    if should_exclude(column_name):
        continue
    column_to_tables[column_name].add(table_name)
    if column_name not in column_info:
        column_info[column_name] = {
            'data_type': data_type,
            'is_metric': is_metric(column_name, data_type),
            'is_dimension': is_dimension(column_name, data_type)
        }

# Generate CSV
with open('metrics_dictionary.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=['Metric', 'Table', 'Definitions', 'Priority'])
    writer.writeheader()
    for column_name, info in sorted(column_info.items()):
        # Priority: 1 for metrics, 2 for dimensions (the default)
        priority = 1 if info['is_metric'] else 2
        # Get definition from dbt code or use a placeholder
        definition = dbt_definitions.get(column_name, f'{column_name} from {schema} marts')
        writer.writerow({
            'Metric': column_name,
            'Table': ', '.join(sorted(column_to_tables[column_name])),
            'Definitions': definition,
            'Priority': priority
        })
```

8. Review and Enhance
- Review definitions - Ensure they’re accurate and complete
- Validate categorization - Verify metrics vs dimensions classification
- Add missing definitions - Fill in any gaps from dbt code review
- Check for duplicates - Ensure each metric appears once
- Validate table mappings - Confirm metrics appear in correct tables
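Some of these checks can be automated. A minimal sketch, assuming the `metrics_dictionary.csv` layout from step 6:

```python
import csv
from collections import Counter
from typing import List

def validate_dictionary(path: str) -> List[str]:
    """Return a list of human-readable problems found in the dictionary CSV."""
    with open(path, newline='') as f:
        rows = list(csv.DictReader(f))
    problems = []
    # Duplicates: each metric should appear exactly once
    counts = Counter(row['Metric'] for row in rows)
    problems += [f'duplicate metric: {name}' for name, n in counts.items() if n > 1]
    for row in rows:
        # Priority must be 1 (metric) or 2 (dimension)
        if row['Priority'] not in ('1', '2'):
            problems.append(f"bad priority for {row['Metric']}: {row['Priority']}")
        # Every entry needs a non-empty definition
        if not row['Definitions'].strip():
            problems.append(f"missing definition for {row['Metric']}")
    return problems
```

Accuracy of definitions and correctness of table mappings still require a manual pass against the dbt code.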
Output Format
The CSV should be:
- Comprehensive: All metrics and dimensions included
- Accurate: Definitions match dbt code
- Filtered: System columns and date pivots excluded
- Prioritized: Metrics (1) vs dimensions (2) clearly marked
- Table-mapped: Shows which tables contain each metric
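For reference, rows in the finished dictionary might look like this (hypothetical values, following the column layout from step 6):

```csv
Metric,Table,Definitions,Priority
TOTAL_REVENUE,WHOLESALE_DIM_CUSTOMERS,Total revenue from all orders for the customer,1
AVERAGE_ORDER_VALUE,WHOLESALE_DIM_CUSTOMERS,Total revenue divided by total orders,1
CUSTOMER_ID,"WHOLESALE_DIM_CUSTOMERS, WHOLESALE_PARTNER_STATUS_DAILY_SNAPSHOT",Unique customer identifier,2
```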
Best Practices
- Parse dbt code systematically - Don’t rely on manual extraction
- Filter aggressively - Remove system columns and date pivots
- Categorize carefully - Metrics vs dimensions affects usage
- Include table context - Shows where metrics are available
- Validate definitions - Cross-reference with dbt model code
- Keep dictionary current - Update when marts change
Common Patterns
Metrics
- Counts: `TOTAL_ORDERS`, `SHOPIFY_ORDERS_COUNT`
- Sums: `TOTAL_REVENUE`, `GS_TOTAL_SPEND_2024`
- Averages: `AVERAGE_ORDER_VALUE`, `AVG_REVENUE_PER_PARTNER`
- Rates: `GS_GROWTH_2023_TO_2024`
- Durations: `DAYS_SINCE_LAST_ORDER`, `CUSTOMER_LIFETIME_DAYS`
Dimensions
- Identifiers: `CUSTOMER_ID`, `EMAIL`
- Attributes: `SEGMENT`, `COMPANY_NAME`, `CUSTOMER_STATE`
- Dates: `FIRST_ORDER_DATE`, `LAST_ORDER_DATE`
- Status: `PARTNER_STATUS`, `VERIFIED_EMAIL`
Troubleshooting
“Too many date columns”
- Filter out date patterns: `r'^\d{4}-\d{2}-\d{2}$'`
- Exclude pivot table date columns
“Missing definitions”
- Review dbt model files for comments
- Check intermediate models (`int_*`) for calculations
- Add manual definitions for complex metrics
“Incorrect categorization”
- Review data types (numeric = more likely metric)
- Check column names for keywords
- Manually review ambiguous cases
“Metrics appear in wrong tables”
- Verify table names match Snowflake schema
- Check that column names are consistent across tables
- Review dbt model dependencies
Related Documentation
- Marts Documentation Creation: See `marts-documentation-creation.md`
- Snowflake Table Profiling: See `snowflake-table-profiling.md`
- 1Password CLI Setup: See `setup/1password-cli-setup.md`