Domain-Specific Language (DSL)
Mohammed Jassim Jasmin · Jul 8 · 9 min read
Declarative DSL for Real Estate Data Transformation
This document outlines a declarative Domain-Specific Language (DSL) for real estate data transformation. The DSL is designed to be highly readable, flexible, and capable of integrating with various data sources, including CSV, JSON, SQL databases (MySQL/PostgreSQL), and MongoDB. The output can be structured into JSON, XML, or database-ready payloads.
1. DSL Schema (JSON Examples)
The core of the DSL is a pipeline: an ordered array of steps, each of which performs a specific data operation.
JSON
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Real Estate Data Transformation DSL",
"description": "A declarative DSL for transforming real estate data from various sources.",
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Optional name for the transformation pipeline."
},
"description": {
"type": "string",
"description": "Optional description of the pipeline's purpose."
},
"input": {
"type": "object",
"description": "Defines global input parameters for the pipeline.",
"additionalProperties": true
},
"steps": {
"type": "array",
"description": "An ordered list of transformation steps.",
"items": {
"type": "object",
"oneOf": [
{ "$ref": "#/definitions/csv_input" },
{ "$ref": "#/definitions/json_input" },
{ "$ref": "#/definitions/sql_query" },
{ "$ref": "#/definitions/mongo_pipeline" },
{ "$ref": "#/definitions/transform" },
{ "$ref": "#/definitions/filter" },
{ "$ref": "#/definitions/group_by" },
{ "$ref": "#/definitions/window_functions" },
{ "$ref": "#/definitions/sql_case" },
{ "$ref": "#/definitions/mongo_lookup" },
{ "$ref": "#/definitions/array_operations" },
{ "$ref": "#/definitions/geocode" },
{ "$ref": "#/definitions/calculate_mortgage" },
{ "$ref": "#/definitions/join" },
{ "$ref": "#/definitions/output" }
]
}
}
},
"required": ["steps"],
"definitions": {
"data_source_config": {
"type": "object",
"properties": {
"connection": {
"type": "string",
"description": "Named connection string (e.g., 'postgres_prod', 'mongodb_dev')."
}
},
"required": ["connection"]
},
"csv_input": {
"type": "object",
"properties": {
"csv_input": {
"type": "object",
"properties": {
"path": { "type": "string", "description": "Path to the CSV file." },
"delimiter": { "type": "string", "default": ",", "description": "CSV delimiter." },
"header": { "type": "boolean", "default": true, "description": "Does the CSV have a header row?" }
},
"required": ["path"]
}
},
"required": ["csv_input"]
},
"json_input": {
"type": "object",
"properties": {
"json_input": {
"type": "object",
"properties": {
"path": { "type": "string", "description": "Path to the JSON file." },
"json_path": { "type": "string", "description": "Optional JSONPath expression to select data." }
},
"required": ["path"]
}
},
"required": ["json_input"]
},
"sql_query": {
"type": "object",
"properties": {
"sql_query": {
"type": "object",
"properties": {
"query": { "type": "string", "description": "Direct SQL query string." },
"parameters": {
"type": "object",
"description": "Key-value pairs for parameterized queries (e.g., `{{input.city}}`)."
},
"on_error": {
"type": "object",
"properties": {
"sql": { "type": "string", "description": "Fallback SQL query on error." }
}
},
"connection": { "type": "string", "description": "Named SQL database connection." }
},
"required": ["query", "connection"]
}
},
"required": ["sql_query"]
},
"mongo_pipeline": {
"type": "object",
"properties": {
"mongo_pipeline": {
"type": "object",
"properties": {
"collection": { "type": "string", "description": "MongoDB collection name." },
"pipeline": {
"type": "array",
"description": "MongoDB aggregation pipeline stages (JSON array)."
},
"on_error": {
"type": "object",
"properties": {
"mongo": { "type": "array", "description": "Fallback MongoDB pipeline on error." }
}
},
"connection": { "type": "string", "description": "Named MongoDB connection." }
},
"required": ["collection", "pipeline", "connection"]
}
},
"required": ["mongo_pipeline"]
},
"transform": {
"type": "object",
"properties": {
"transform": {
"type": "object",
"description": "Defines new fields or modifies existing ones using expressions.",
"additionalProperties": {
"type": "string",
"description": "Expression using `{{field_name}}` syntax for interpolation."
}
}
},
"required": ["transform"]
},
"filter": {
"type": "object",
"properties": {
"filter": {
"type": "string",
"description": "A boolean expression to filter records (e.g., `price > 100000 and city == 'New York'`)."
}
},
"required": ["filter"]
},
"group_by": {
"type": "object",
"properties": {
"group_by": {
"type": "object",
"properties": {
"keys": {
"type": "array",
"items": { "type": "string" },
"description": "Fields to group by."
},
"aggregations": {
"type": "object",
"description": "Key-value pairs for aggregation operations (e.g., `avg_price: \"avg(price)\"`).",
"additionalProperties": {
"type": "string",
"enum": ["avg", "sum", "count", "min", "max"],
"description": "Aggregation function."
}
}
},
"required": ["keys", "aggregations"]
}
},
"required": ["group_by"]
},
"window_functions": {
"type": "object",
"properties": {
"window_functions": {
"type": "object",
"properties": {
"over": {
"type": "array",
"items": { "type": "string" },
"description": "Partition by fields."
},
"order_by": {
"type": "array",
"items": {
"type": "object",
"properties": {
"field": { "type": "string" },
"direction": { "type": "string", "enum": ["asc", "desc"] }
},
"required": ["field", "direction"]
},
"description": "Order by fields."
},
"functions": {
"type": "object",
"description": "Window functions to apply (e.g., `rank: \"rank() over (...) \"`).",
"additionalProperties": {
"type": "string",
"description": "Expression for the window function (e.g., `rank()`, `row_number()`, `avg(price)`)."
}
}
},
"required": ["over", "order_by", "functions"]
}
},
"required": ["window_functions"]
},
"sql_case": {
"type": "object",
"properties": {
"sql_case": {
"type": "object",
"properties": {
"field": { "type": "string", "description": "The new field to create or update." },
"cases": {
"type": "array",
"items": {
"type": "object",
"properties": {
"when": { "type": "string", "description": "Boolean condition for the case." },
"then": { "type": "string", "description": "Value or expression if condition is true." }
},
"required": ["when", "then"]
}
},
"else": { "type": "string", "description": "Default value or expression if no case matches." }
},
"required": ["field", "cases"]
}
},
"required": ["sql_case"]
},
"mongo_lookup": {
"type": "object",
"properties": {
"mongo_lookup": {
"type": "object",
"properties": {
"from": { "type": "string", "description": "The target collection to join with." },
"local_field": { "type": "string", "description": "Field from the input documents." },
"foreign_field": { "type": "string", "description": "Field from the documents of the `from` collection." },
"as": { "type": "string", "description": "The name of the new array field to add to the input documents." },
"connection": { "type": "string", "description": "Named MongoDB connection for the `from` collection." }
},
"required": ["from", "local_field", "foreign_field", "as", "connection"]
}
},
"required": ["mongo_lookup"]
},
"array_operations": {
"type": "object",
"properties": {
"array_operations": {
"type": "object",
"properties": {
"unwind": { "type": "string", "description": "Field name of the array to unwind (e.g., 'amenities')." },
"filter_array": {
"type": "object",
"properties": {
"field": { "type": "string", "description": "Array field to filter." },
"condition": { "type": "string", "description": "Boolean expression for filtering array elements." }
},
"required": ["field", "condition"]
}
}
}
},
"required": ["array_operations"]
},
"geocode": {
"type": "object",
"properties": {
"geocode": {
"type": "object",
"properties": {
"address_field": { "type": "string", "description": "Field containing the address to geocode." },
"output_latitude_field": { "type": "string", "description": "Field for storing latitude." },
"output_longitude_field": { "type": "string", "description": "Field for storing longitude." },
"method": {
"type": "string",
"enum": ["api", "sql_spatial", "mongo_spatial"],
"description": "Geocoding method."
},
"api_config": {
"type": "object",
"description": "Configuration for external geocoding API (e.g., API key, endpoint)."
},
"connection": { "type": "string", "description": "Database connection for spatial queries." }
},
"required": ["address_field", "output_latitude_field", "output_longitude_field", "method"]
}
},
"required": ["geocode"]
},
"calculate_mortgage": {
"type": "object",
"properties": {
"calculate_mortgage": {
"type": "object",
"properties": {
"price_field": { "type": "string", "description": "Field containing the property price." },
"down_payment_percent": { "type": "number", "description": "Down payment percentage (e.g., 0.20 for 20%)." },
"interest_rate": { "type": "number", "description": "Annual interest rate (e.g., 0.04 for 4%)." },
"loan_term_years": { "type": "integer", "description": "Loan term in years." },
"output_monthly_payment_field": { "type": "string", "description": "Field for monthly payment." },
"method": {
"type": "string",
"enum": ["internal", "api"],
"description": "Calculation method: internal formula or external API."
},
"api_config": {
"type": "object",
"description": "Configuration for external mortgage API."
}
},
"required": ["price_field", "down_payment_percent", "interest_rate", "loan_term_years", "output_monthly_payment_field"]
}
},
"required": ["calculate_mortgage"]
},
"join": {
"type": "object",
"properties": {
"join": {
"type": "object",
"properties": {
"left_source": { "type": "string", "description": "Identifier for the left data source (from a previous step)." },
"right_source": { "type": "string", "description": "Identifier for the right data source (from a previous step)." },
"on": { "type": "string", "description": "Join condition (e.g., 'left.id = right.listing_id')." },
"type": { "type": "string", "enum": ["inner", "left", "right", "full"], "default": "inner" }
},
"required": ["left_source", "right_source", "on"]
}
},
"required": ["join"]
},
"output": {
"type": "object",
"properties": {
"output": {
"type": "object",
"properties": {
"format": { "type": "string", "enum": ["json", "xml", "database", "csv"], "description": "Output format." },
"path": { "type": "string", "description": "File path for JSON/XML/CSV output." },
"table": { "type": "string", "description": "Table name for database output." },
"connection": { "type": "string", "description": "Named database connection for database output." },
"mode": { "type": "string", "enum": ["insert", "upsert", "replace"], "default": "insert", "description": "Database write mode." }
},
"required": ["format"]
}
},
"required": ["output"]
}
}
}
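Before executing a pipeline, an interpreter would normally validate the document against the schema above. Below is a minimal validation sketch using the jsonschema package; the file names dsl_schema.json and pipeline.json are illustrative placeholders, not part of the DSL specification.
Python
# Minimal validation sketch (assumes the schema above is saved as
# dsl_schema.json and a pipeline document as pipeline.json -- both
# file names are placeholders).
import json

from jsonschema import Draft7Validator  # pip install jsonschema

with open("dsl_schema.json") as f:
    schema = json.load(f)
with open("pipeline.json") as f:
    pipeline = json.load(f)

validator = Draft7Validator(schema)
errors = list(validator.iter_errors(pipeline))
for error in errors:
    print(f"{list(error.path)}: {error.message}")
if not errors:
    print("Pipeline document is valid against the DSL schema.")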
2. Example Use Cases
These examples demonstrate how the DSL can be used for various real estate data transformation scenarios.
Example 1: SQL-to-JSON Transformation with Calculations
This example transforms a PostgreSQL query result into a market report in JSON format, adding calculated fields.
JSON
{
"name": "PostgreSQLMarketReport",
"description": "Generates a market report from PostgreSQL listings data.",
"steps": [
{
"sql_query": {
"query": "SELECT id, price, sqft, city, listed_date FROM listings WHERE listed_date > '2024-01-01'",
"connection": "postgres_listings"
}
},
{
"transform": {
"price_per_sqft": "{{ price / sqft }}",
"is_luxury": "{{ price > 1000000 ? true : false }}",
"listing_age_days": "DATEDIFF('{{ CURRENT_DATE }}', '{{ listed_date }}')"
}
},
{
"output": {
"format": "json",
"path": "/data/market_report_2024.json"
}
}
]
}
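For illustration, this is roughly what the transform step above is expected to produce for a single record, written as plain Python. The record values are made up, and a real interpreter would evaluate the {{...}} expressions generically rather than hard-coding them.
Python
# Hand-expanded semantics of Example 1's transform step for one record.
from datetime import date

record = {"id": 42, "price": 1250000, "sqft": 2500,
          "city": "Austin", "listed_date": date(2024, 3, 15)}

record["price_per_sqft"] = record["price"] / record["sqft"]                 # {{ price / sqft }}
record["is_luxury"] = record["price"] > 1000000                             # {{ price > 1000000 ? true : false }}
record["listing_age_days"] = (date.today() - record["listed_date"]).days   # DATEDIFF(CURRENT_DATE, listed_date)

print(record)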
Example 2: MongoDB Aggregation and Formatting
This pipeline processes MongoDB property listings, filters for properties with a pool, projects specific fields, and formats the address.
JSON
{
"name": "MongoDBPoolProperties",
"description": "Extracts and formats properties with a 'pool' amenity from MongoDB.",
"steps": [
{
"mongo_pipeline": {
"collection": "properties",
"pipeline": [
{ "$match": { "amenities": "pool" } },
{ "$project": { "_id": 0, "price": 1, "address": 1, "bedrooms": 1, "bathrooms": 1 } }
],
"connection": "mongodb_properties"
}
},
{
"transform": {
"formatted_address": "{{ address.street }}, {{ address.city }}, {{ address.state }} {{ address.zip_code }}",
"has_pool": "true"
}
},
{
"output": {
"format": "json",
"path": "/data/properties_with_pools.json"
}
}
]
}
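As a rough sketch, the mongo_pipeline step above maps almost directly onto PyMongo's aggregate call. The connection URI and database name below are placeholders that an interpreter would resolve from the named connection mongodb_properties.
Python
# Sketch of executing Example 2's mongo_pipeline step with PyMongo.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")   # placeholder URI
collection = client["real_estate"]["properties"]    # placeholder database name

pipeline = [
    {"$match": {"amenities": "pool"}},
    {"$project": {"_id": 0, "price": 1, "address": 1, "bedrooms": 1, "bathrooms": 1}},
]
records = list(collection.aggregate(pipeline))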
Example 3: Cross-Database Join (SQL + MongoDB)
This example demonstrates merging SQL listings data with MongoDB agent data.
JSON
{
"name": "HybridListingAgentData",
"description": "Combines listing data from SQL with agent details from MongoDB.",
"steps": [
{
"sql_query": {
"query": "SELECT id, price, agent_id, city FROM listings",
"connection": "postgres_listings",
"output_alias": "sql_listings"
}
},
{
"mongo_lookup": {
"from": "agents",
"local_field": "agent_id",
"foreign_field": "_id",
"as": "agent_info",
"connection": "mongodb_agents"
}
},
{
"transform": {
"agent_name": "{{ agent_info[0].name }}",
"agent_contact": "{{ agent_info[0].email }}"
}
},
{
"filter": "agent_name != null"
},
{
"output": {
"format": "json",
"path": "/data/listings_with_agent_info.json"
}
}
]
}
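One plausible way for an interpreter to realize the mongo_lookup step here is a single $in query against the agents collection, since the SQL rows are already in memory; a native $lookup is only possible inside one MongoDB deployment. The URI and database name below are placeholders.
Python
# Emulating Example 3's mongo_lookup against an in-memory result set.
from pymongo import MongoClient

agents = MongoClient("mongodb://localhost:27017")["crm"]["agents"]  # placeholders

def attach_agent_info(sql_rows):
    agent_ids = {row["agent_id"] for row in sql_rows}
    by_id = {doc["_id"]: doc for doc in agents.find({"_id": {"$in": list(agent_ids)}})}
    for row in sql_rows:
        doc = by_id.get(row["agent_id"])
        row["agent_info"] = [doc] if doc else []   # matches the "as": "agent_info" field
    return sql_rows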
Example 4: Grouping and Aggregation with SQL-Like Case
This example groups listings by city and calculates average price, then categorizes cities based on average price.
JSON
{
"name": "CityMarketAnalysis",
"description": "Analyzes real estate market trends by city.",
"steps": [
{
"sql_query": {
"query": "SELECT city, price FROM listings",
"connection": "postgres_listings"
}
},
{
"group_by": {
"keys": ["city"],
"aggregations": {
"avg_price_per_city": "avg(price)",
"total_listings": "count(id)"
}
}
},
{
"sql_case": {
"field": "market_category",
"cases": [
{ "when": "avg_price_per_city > 750000", "then": "'High-End'" },
{ "when": "avg_price_per_city > 300000", "then": "'Mid-Range'" }
],
"else": "'Affordable'"
}
},
{
"output": {
"format": "json",
"path": "/data/city_market_categories.json"
}
}
]
}
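When the group_by step cannot be pushed down to the database, an interpreter needs an in-memory fallback. The sketch below hard-codes the two aggregations from this example (avg(price) and count(id)); a real implementation would parse the aggregation expressions instead.
Python
# In-memory fallback for Example 4's group_by step.
from collections import defaultdict

def group_by_city(rows):
    groups = defaultdict(list)
    for row in rows:
        groups[row["city"]].append(row)
    return [
        {
            "city": city,
            "avg_price_per_city": sum(r["price"] for r in members) / len(members),
            "total_listings": len(members),
        }
        for city, members in groups.items()
    ]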
Example 5: Geocoding and Mortgage Calculation
This pipeline geocodes addresses and calculates estimated monthly mortgage payments.
JSON
{
"name": "PropertyFinancialAnalysis",
"description": "Geocodes properties and calculates estimated mortgage payments.",
"steps": [
{
"csv_input": {
"path": "/data/new_listings.csv"
}
},
{
"geocode": {
"address_field": "full_address",
"output_latitude_field": "latitude",
"output_longitude_field": "longitude",
"method": "api",
"api_config": {
"endpoint": "https://api.geocod.io/v1/geocode",
"api_key": "YOUR_GEOCODING_API_KEY"
}
}
},
{
"calculate_mortgage": {
"price_field": "listing_price",
"down_payment_percent": 0.20,
"interest_rate": 0.065,
"loan_term_years": 30,
"output_monthly_payment_field": "estimated_monthly_payment",
"method": "internal"
}
},
{
"output": {
"format": "database",
"connection": "postgres_analytics",
"table": "property_financials",
"mode": "upsert"
}
}
]
}
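The DSL text does not pin down the formula behind method: "internal", but the standard amortization formula is the obvious candidate. A minimal sketch using the parameters from Example 5:
Python
# Standard amortization formula, as one plausible reading of method: "internal".
def monthly_payment(price, down_payment_percent, annual_rate, term_years):
    principal = price * (1 - down_payment_percent)
    monthly_rate = annual_rate / 12
    n_payments = term_years * 12
    if monthly_rate == 0:
        return principal / n_payments
    return principal * monthly_rate / (1 - (1 + monthly_rate) ** -n_payments)

# e.g. a $500,000 listing with Example 5's parameters:
print(round(monthly_payment(500_000, 0.20, 0.065, 30), 2))  # ~2528.27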
3. Type Mapping
The DSL will handle type coercion where necessary, mapping database-specific types to common DSL types.
SQL (PostgreSQL/MySQL) | MongoDB | DSL Type | Notes
--- | --- | --- | ---
INT, BIGINT | Int32, Int64 | integer |
DECIMAL, NUMERIC, REAL, DOUBLE PRECISION | Double | float | Handles precision; potential loss for very high-precision decimals if not explicitly handled.
VARCHAR, TEXT | String | string |
BOOLEAN | Boolean | boolean |
DATE, TIMESTAMP, DATETIME | Date | datetime | Internal representation will likely be ISO 8601 strings or Unix timestamps for consistency.
JSONB (PostgreSQL) | Object | object | Stored as nested DSL objects/maps.
ARRAY | Array | array |
UUID | BinData | string | Typically represented as strings.
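One simplified, illustrative way to express these coercion rules is a single converter keyed on the Python types that database drivers typically return; a production implementation would key on driver-reported column types and handle DECIMAL precision explicitly (e.g., via decimal.Decimal) rather than always downcasting to float.
Python
# Simplified coercion sketch for the mapping above.
from datetime import date, datetime
from decimal import Decimal

def to_dsl_value(value):
    if isinstance(value, bool):            # check bool before int (bool is a subclass of int)
        return value                       # boolean
    if isinstance(value, int):
        return value                       # integer
    if isinstance(value, (float, Decimal)):
        return float(value)                # float (possible precision loss)
    if isinstance(value, (datetime, date)):
        return value.isoformat()           # datetime as ISO 8601 string
    if isinstance(value, (dict, list)):
        return value                       # object / array (e.g., JSONB)
    return str(value)                      # strings, UUIDs, everything else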
4. Required Drivers/Libraries (Examples for Python)
To implement an interpreter for this DSL, the following types of libraries would be required, depending on the target language/runtime:
Python:
SQL Databases: psycopg2 (PostgreSQL), mysql-connector-python (MySQL), or SQLAlchemy (Core or ORM) for unified access across SQL backends.
MongoDB: PyMongo
CSV Parsing: Built-in csv module or pandas for more robust data frame operations.
JSON Parsing: Built-in json module.
XML Generation: Built-in xml.etree.ElementTree or lxml.
Expression Evaluation: A safe expression evaluator (e.g., one built on Python's ast module, or a custom parser for the DSL's {{...}} interpolation syntax); a sketch of this approach appears after the Node.js list below.
API Client: requests for geocode and calculate_mortgage if external APIs are used.
Date/Time Handling: datetime module.
Node.js:
SQL Databases: pg (PostgreSQL), mysql2 (MySQL), or Knex.js for query building.
MongoDB: mongoose or mongodb driver.
CSV Parsing: csv-parser.
JSON Parsing: Built-in JSON object.
XML Generation: xmlbuilder.
Expression Evaluation: Custom or a lightweight expression library.
API Client: axios or node-fetch.
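As referenced in the Python list above, a whitelist-based expression evaluator can be built on the ast module. The sketch below covers only arithmetic, comparisons, and boolean logic over flat field names (e.g., price / sqft); attribute access, function calls, and the ternary syntax used in the examples would need a fuller parser.
Python
# Minimal whitelist-based evaluation for {{...}} expressions.
import ast

_ALLOWED = (
    ast.Expression, ast.BinOp, ast.UnaryOp, ast.BoolOp, ast.Compare,
    ast.Name, ast.Load, ast.Constant,
    ast.Add, ast.Sub, ast.Mult, ast.Div, ast.USub,
    ast.Gt, ast.GtE, ast.Lt, ast.LtE, ast.Eq, ast.NotEq,
    ast.And, ast.Or, ast.Not,
)

def safe_eval(expression, record):
    tree = ast.parse(expression, mode="eval")
    for node in ast.walk(tree):
        if not isinstance(node, _ALLOWED):
            raise ValueError(f"Disallowed syntax: {type(node).__name__}")
    return eval(compile(tree, "<dsl>", "eval"), {"__builtins__": {}}, dict(record))

print(safe_eval("price / sqft", {"price": 500000, "sqft": 2000}))  # 250.0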
5. Performance Considerations
The DSL is designed with performance in mind by promoting "push-down" operations.
Push-down Operations: Whenever possible, filter, group_by, window_functions, and certain transform operations will be translated into native SQL WHERE clauses, GROUP BY statements, and window functions, or into MongoDB $match, $group, and other aggregation pipeline stages. This minimizes data transfer and leverages the optimized performance of the underlying database engines; a sketch of this translation follows this list.
Batch Processing: For large datasets, the DSL interpreter will implement batch processing mechanisms when reading from CSV/JSON files or fetching results from databases, preventing out-of-memory errors and improving throughput.
Lazy Evaluation: Data will ideally be processed in a streaming or lazy manner, especially for file-based inputs, to avoid loading entire datasets into memory unless explicitly required by an operation (e.g., a global group_by or join).
Connection Pooling: Database connections will be managed using connection pooling to reduce overhead.
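As an illustration of the push-down idea above, the sketch below folds a filter step into the preceding sql_query step instead of filtering in memory. The expression-to-SQL translation is deliberately naive token substitution; a real implementation would translate a parsed expression tree and bind parameters rather than interpolate strings.
Python
# Naive push-down sketch: fold a filter step into the preceding SQL query.
def push_down_filter(sql_query_step, filter_expression):
    condition = (filter_expression
                 .replace("==", "=")
                 .replace(" and ", " AND ")
                 .replace(" or ", " OR "))
    base = sql_query_step["query"].rstrip(";")
    return f"SELECT * FROM ({base}) AS src WHERE {condition}"

print(push_down_filter(
    {"query": "SELECT id, price, city FROM listings"},
    "price > 100000 and city == 'New York'",
))
# SELECT * FROM (SELECT id, price, city FROM listings) AS src
#   WHERE price > 100000 AND city = 'New York'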
By adhering to this declarative structure, the DSL interpreter can optimize the execution plan, choose the most efficient operations, and ensure portability across different environments without requiring users to write imperative code for each data source or transformation step.