Creating Execution Plans for Data Joins

Introduction to the Planner

This vignette demonstrates how to use create_join_plan() to automate data aggregation and merging workflows. From table metadata and a list of user selections, the function builds a step-by-step plan in which each step carries the data.table code that performs it.

1. Define Metadata

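First, we use table_info() to create metadata for two tables, customers and transactions. Each table's metadata names its source file and identifier columns, and lists its key outcomes together with the aggregations that can be computed from them.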

# Define customer metadata
customers_meta <- table_info(
  table_name = "customers",
  source_identifier = "customers.csv",
  identifier_columns = "customer_id",
  key_outcome_specs = list(
    list(OutcomeName = "CustomerCount", 
         ValueExpression = 1, 
         AggregationMethods = list(
           list(AggregatedName = "CountByRegion",
                AggregationFunction = "sum",
                GroupingVariables = "region")
         )
    )
  )
)

# Define transaction metadata
transactions_meta <- table_info(
  table_name = "transactions",
  source_identifier = "t.csv",
  identifier_columns = "tx_id",
  key_outcome_specs = list(
    list(OutcomeName = "Revenue",
         ValueExpression = quote(price * quantity),
         AggregationMethods = list(
           list(AggregatedName = "RevenueByCustomer",
                AggregationFunction = "sum",
                GroupingVariables = "customer_id")
         )
    )
  )
)

# Register both tables in a single metadata registry
master_metadata <- create_metadata_registry()
master_metadata <- add_table(master_metadata, customers_meta)
#> Added metadata for table: customers
master_metadata <- add_table(master_metadata, transactions_meta)
#> Added metadata for table: transactions
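The two ValueExpression values above take different forms: a constant 1, which turns a sum into a row count (CustomerCount summed by region gives CountByRegion), and a quoted expression, quote(price * quantity), which is computed for each row of the table. The base R sketch below shows how such a quoted expression can be evaluated against a table's columns with eval(). The toy transactions data and the helper compute_outcome() are hypothetical, and the package may evaluate ValueExpression differently internally.

```r
# Hypothetical toy data standing in for the contents of t.csv
transactions <- data.frame(
  tx_id       = 1:4,
  customer_id = c("A", "A", "B", "C"),
  price       = c(10, 5, 20, 8),
  quantity    = c(1, 3, 2, 5)
)

# Hypothetical helper: evaluate a ValueExpression against a table.
# A quoted call is evaluated with the table's columns in scope;
# a constant such as 1 is recycled to one value per row.
compute_outcome <- function(value_expression, data) {
  value <- eval(value_expression, envir = data)
  rep_len(value, nrow(data))
}

# Per-row revenue from the quoted expression
revenue <- compute_outcome(quote(price * quantity), transactions)
revenue
#> [1] 10 15 40 40

# Summing the constant 1 counts the rows
row_ones <- compute_outcome(1, transactions)
sum(row_ones)
#> [1] 4
```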

2. Create Join Plan

Now we create the plan. We omit the join_map argument to show that create_join_plan() can generate the join map automatically from the metadata.

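# Columns to return: a raw column from the base table and an aggregated
# outcome defined in the transactions metadata
user_selections <- list(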
  customers = "region",
  transactions = "RevenueByCustomer"
)
plan <- create_join_plan(
  base_table = "customers",
  selections = user_selections,
  metadata_dt = master_metadata
)

print(plan)
#>     step operation           target                                   details
#>    <num>    <char>           <char>                                    <char>
#> 1:     1 AGGREGATE agg_transactions                  Aggregate 'transactions'
#> 2:     2     MERGE    merged_step_2 Merge 'customers' with 'agg_transactions'
#> 3:     3    SELECT       final_data                      Select final columns
#>                                                                                                     code
#>                                                                                                   <char>
#> 1: agg_transactions <- transactions[, .(RevenueByCustomer = sum(price * quantity)), by = .(customer_id)]
#> 2:      merged_step_2 <- merge(x = customers, y = agg_transactions, by = c('customer_id'), all.x = TRUE)
#> 3:                         final_data <- merged_step_2[, .SD, .SDcols = c('region','RevenueByCustomer')]
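Each entry in the plan's code column is an ordinary data.table statement. To see what the plan does, the sketch below runs the three generated statements, copied verbatim from the printed plan, against small customers and transactions tables. The data values are invented for illustration, and the sketch assumes the data.table package is installed.

```r
library(data.table)

# Hypothetical toy inputs; in the vignette these come from
# customers.csv and t.csv
customers <- data.table(
  customer_id = c("A", "B", "C", "D"),
  region      = c("North", "North", "South", "South")
)
transactions <- data.table(
  tx_id       = 1:4,
  customer_id = c("A", "A", "B", "C"),
  price       = c(10, 5, 20, 8),
  quantity    = c(1, 3, 2, 5)
)

# Step 1 (AGGREGATE): total revenue per customer
agg_transactions <- transactions[, .(RevenueByCustomer = sum(price * quantity)), by = .(customer_id)]

# Step 2 (MERGE): left join onto the base table, keeping every customer
merged_step_2 <- merge(x = customers, y = agg_transactions, by = c('customer_id'), all.x = TRUE)

# Step 3 (SELECT): keep only the requested columns
final_data <- merged_step_2[, .SD, .SDcols = c('region','RevenueByCustomer')]

print(final_data)
```

Customer D, who has no transactions, is kept with an NA revenue because the merge step uses all.x = TRUE (a left join). Because the printed plan has a code column, the same statements could presumably also be run in order with `for (stmt in plan$code) eval(parse(text = stmt))`, assuming customers and transactions already exist as data.tables in the calling environment.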

3. Handling Invalid Requests

A key feature of the planner is its ability to validate user requests. What happens if we ask for an aggregation that cannot logically be joined to our base table?

Let’s ask for RevenueByProduct (grouped by product_id) to be joined to the customers table (keyed by customer_id). This is not a valid join.

# Add product metadata for this example
products_meta <- table_info("products", "p.csv", "product_id", list(
  # Placeholder outcome: the products table is not selected in this example
  list(OutcomeName = "x", ValueExpression = 1, AggregationMethods = list(
    list(AggregatedName = "y", AggregationFunction = "z", GroupingVariables = "category")
  ))
))
transactions_meta_v2 <- table_info("transactions", "t.csv", "tx_id", list(
  list(OutcomeName = "Revenue", ValueExpression = quote(price * quantity), AggregationMethods = list(
    # This aggregation is by product_id, not customer_id
    list(AggregatedName = "RevenueByProduct", AggregationFunction = "sum", GroupingVariables = "product_id")
  ))
))
invalid_metadata <- data.table::rbindlist(list(customers_meta, products_meta, transactions_meta_v2))

# The invalid request
invalid_selections <- list(
  customers = "customer_id",
  transactions = "RevenueByProduct"
)

Instead of producing a faulty plan or a cryptic error, create_join_plan() issues a clear warning, skips the table that cannot be joined, and returns a plan containing only the steps it can carry out.

create_join_plan(
  base_table = "customers",
  selections = invalid_selections,
  metadata_dt = invalid_metadata
)
#> Warning in create_join_plan(base_table = "customers", selections =
#> invalid_selections, : No direct path found from 'transactions' to 'customers'.
#> Skipping this table.'
#>     step operation     target              details
#>    <num>    <char>     <char>               <char>
#> 1:     1    SELECT final_data Select final columns
#>                                                          code
#>                                                        <char>
#> 1: final_data <- customers[, .SD, .SDcols = c('customer_id')]

The request is invalid because the key of the selected aggregation does not match the join key of the base table:

  1. The base_table is customers, whose primary join key is customer_id.
  2. The selection asks for the RevenueByProduct aggregation from the transactions table.
  3. According to our metadata, the RevenueByProduct aggregation is grouped by (and therefore keyed on) product_id.
  4. create_join_plan() therefore finds no direct path from a table keyed on product_id to one keyed on customer_id, so it warns and leaves transactions out of the plan.
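The reasoning in steps 1 to 4 reduces to a key check. The base R sketch below states it as a hypothetical rule: an aggregation can be merged straight onto the base table only when all of its grouping variables are among the base table's identifier columns. Both helpers, can_join_directly() and check_selections(), are illustrative assumptions rather than the package's code, and the real planner may apply a different test (for example, it may also consider indirect join paths).

```r
# Hypothetical helper, not the package's implementation: an aggregation
# can be merged directly onto the base table only if it has grouping
# variables and every one of them is an identifier column of the base table.
can_join_directly <- function(grouping_vars, base_keys) {
  length(grouping_vars) > 0 && all(grouping_vars %in% base_keys)
}

# Hypothetical helper: mimic the warn-and-skip behavior shown above by
# keeping only the aggregations that pass the check.
check_selections <- function(aggregations, base_keys) {
  ok <- vapply(aggregations, can_join_directly, logical(1), base_keys = base_keys)
  for (name in names(aggregations)[!ok]) {
    warning("No direct path from '", name, "' to the base table; skipping.",
            call. = FALSE)
  }
  names(aggregations)[ok]
}

base_keys <- "customer_id"  # identifier_columns of the customers table

can_join_directly("customer_id", base_keys)  # RevenueByCustomer
#> [1] TRUE
can_join_directly("product_id", base_keys)   # RevenueByProduct
#> [1] FALSE

# Each aggregation name is mapped to its GroupingVariables
kept <- check_selections(
  list(RevenueByCustomer = "customer_id", RevenueByProduct = "product_id"),
  base_keys
)
kept
#> [1] "RevenueByCustomer"
```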

Because of this validation, the planner only emits merges whose keys match. This prevents a common analysis error: silently attaching data aggregated at one level (here, products) to rows at a different level (customers).