
From Monolith to Modular: Rebuilding a Billing Data Pipeline From Scratch

Earlier this year I shipped a pipeline rewrite I’m genuinely proud of. It replaced a 2,200-line SQL monolith — one of those files that everyone’s afraid to touch — with a clean layered architecture that handles 14 products, runs daily, and can be extended by adding a handful of config files.

This post is about how it’s built, why the design works, and the trade-offs I made along the way.

A note on the execution layer: the pipelines in this post run on Jetflow, a configuration-driven data ingestion framework built at Cloudflare that ingests 141 billion rows per day. Jetflow is going open source — you’ll see why it matters to this design as we go.

The Problem

The existing pipeline was a single large SQL file. It worked — mostly. But it had accumulated years of product-specific logic, hardcoded dates, and patches on patches. Adding support for a new product meant understanding the whole file before you could safely modify any of it. When something broke, there was no way to rerun just one product; you reran everything or nothing.

The pipeline’s job is to take raw daily usage events and produce monthly billing-ready numbers: one row per customer, per product, per billing metric, for the current month. This feeds directly into the data that determines what customers owe.

At its core, the logic isn’t that complex. The complexity had accreted around it.

The Design

The rewrite has five conceptual layers:

Layer 0: Product Metadata Registry
Layer 1: Daily Unpivot Views (wide → long format)
Layer 2: Monthly Aggregation (via a shared table function)
Layer 3: Cross-product Union View
Layer 4: Consumer Views (billing, cap tracking, etc.)

Let me walk through each one.


Layer 0: The Product Metadata Registry

The most important decision in the whole design was centralizing business logic into a single metadata view.

Every product has:

  • A raw metric name in the source table
  • A human-readable display name
  • A billing product identifier (for cap matching)
  • A unit of measure (TB, 1M requests, GB-month, etc.)
  • Whether billing rolls up at the zone level or the account level
  • A monthly aggregation rule — more on this below

All of this lives in one place: a BigQuery view built from an UNNEST of a typed array literal. No separate table. No migration scripts. If you need to change the aggregation rule for one product, you change one row in one file.

The aggregation rules are where the real business logic lives:

| Rule        | Meaning                                       | Example products                        |
|-------------|-----------------------------------------------|-----------------------------------------|
| SUM         | Add up all daily values                       | API requests, operations                |
| LAST_DAY    | Take the value from the last day of the month | Hostnames, seats (headcount-style)      |
| SUM_AVG_30  | Sum, then divide by 30                        | Storage billed by TB-month              |
| SUM_AVG_720 | Sum, then divide by 720                       | Object storage billed by byte-hours     |
| MAX         | Highest value seen in the month               | Seat-count maximums                     |

These rules aren’t arbitrary — they reflect how different product contracts actually work. Object storage isn’t billed by how much you stored today; it’s billed by the average you stored over the month. Encoding this in a metadata registry means the aggregation logic is DRY across all products.
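To make the rule semantics concrete, here's a minimal Python sketch. The registry rows and metric names are illustrative, not the production schema; the point is that each rule is a pure function of a month of (date, value) pairs:

```python
from datetime import date

# Hypothetical mini-registry: each row mirrors the fields described above.
PRODUCT_MAPPING = [
    {"product_metric": "requests",     "metric_unit": "1M requests", "monthly_aggregation_rule": "SUM"},
    {"product_metric": "stored_bytes", "metric_unit": "TB-month",    "monthly_aggregation_rule": "SUM_AVG_30"},
    {"product_metric": "seats",        "metric_unit": "seats",       "monthly_aggregation_rule": "LAST_DAY"},
]

def apply_rule(rule: str, daily: list[tuple[date, float]]) -> float:
    """Apply a monthly aggregation rule to (event_date, usage_value) pairs."""
    values = [v for _, v in daily]
    if rule == "SUM":
        return sum(values)
    if rule == "LAST_DAY":
        return max(daily)[1]  # value on the latest observed event_date
    if rule == "SUM_AVG_30":
        return sum(values) / 30
    if rule == "SUM_AVG_720":
        return sum(values) / 720
    if rule == "MAX":
        return max(values)
    raise ValueError(f"unknown rule: {rule}")
```

For a 30-day month where usage on day *d* is *d*, `SUM` gives 465, `SUM_AVG_30` gives 15.5 (the average day's storage), and `LAST_DAY` gives 30: three different bills from the same daily data, which is exactly why the rule has to live in the registry.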


Layer 1: Daily Unpivot Views

The upstream daily usage tables are in wide format — one column per metric. A CDN usage table might have 26 columns: total requests, data transfer, and then both of those broken out across 12 geographic regions.

The obvious fix would be to reshape those source tables. We didn’t do that — and not by choice. Those tables had too many existing consumers (dashboards, other pipelines, downstream models) that depended on their exact schema. Changing the shape of a wide table in a shared data lake is the kind of thing that breaks things quietly and at a distance. So the source tables were off-limits.

Instead, we added a layer of read-only views on top of them. Each product gets an UNPIVOT view that melts its wide daily table into a long format with two columns: product_metric (the column name as a string) and usage_value (the numeric value). The source tables are untouched; everything downstream of the views gets a consistent schema.

SELECT
  product_name,
  event_date,
  account_id,
  zone_id,
  product_metric,
  usage_value
FROM (
  SELECT
    product_name,
    event_date,
    account_id,
    zone_id,
    CAST(requests AS FLOAT64) AS requests,
    CAST(bytes AS FLOAT64) AS bytes,
    -- ... all other metric columns cast to FLOAT64
  FROM cdn_usage_daily
)
UNPIVOT (usage_value FOR product_metric IN (requests, bytes, ...))

A few things worth noting:

All metric columns are explicitly cast to FLOAT64 before the UNPIVOT. Source columns vary in type. Making this explicit prevents type errors downstream and ensures the union across all products later works cleanly.

Bot Management only has one metric, so it uses a plain SELECT with a hardcoded string literal instead of an UNPIVOT clause — because the UNPIVOT overhead isn’t worth it for a single column.

Zone-level vs. account-level products are different. Products like CDN bill per zone (a customer might have 50 zones). Products like object storage or email security bill per account. This distinction is flagged in the metadata registry and shapes the filtering logic.


Layer 2: A Shared Table Function for Aggregation

This is the most technically interesting piece.

Instead of writing aggregation SQL for each of 14 products, I wrote a single BigQuery table function — a parameterized, reusable piece of SQL that accepts a table as input and returns an aggregated table as output.

CREATE OR REPLACE TABLE FUNCTION
  monthly_usage_aggregate(vb_daily_table TABLE<...>, event_date_var DATE)
AS (
  WITH mapping_enrichment AS (
    SELECT
      t.*,
      m.monthly_aggregation_rule,
      m.billable_level,
      m.metric_unit,
      -- ...
    FROM vb_daily_table t
    JOIN product_mapping m USING (product_name, product_metric)
    WHERE DATE_TRUNC(event_date, MONTH) = DATE_TRUNC(event_date_var, MONTH)
      AND m.billing_product IS NOT NULL  -- drop non-billable metrics
  ),
  monthly_aggregations AS (
    SELECT
      DATE_TRUNC(event_date, MONTH) AS month_date,
      -- dimensions ...
      CASE monthly_aggregation_rule
        WHEN 'SUM'        THEN SUM(usage_value)
        WHEN 'LAST_DAY'   THEN ARRAY_AGG(usage_value ORDER BY event_date DESC LIMIT 1)[OFFSET(0)]
        WHEN 'SUM_AVG_30' THEN SUM(usage_value) / 30
        WHEN 'SUM_AVG_720' THEN SUM(usage_value) / 720
        WHEN 'MAX'        THEN MAX(usage_value)
      END AS usage_value
    FROM mapping_enrichment
    GROUP BY ALL
  )
  SELECT
    m.*,
    SAFE_DIVIDE(usage_value, multiplier) AS normalized_usage
  FROM monthly_aggregations m
  JOIN uom_multipliers USING (metric_unit)
)

Each product then calls this function with its own unpivot view:

SELECT * FROM monthly_usage_aggregate(
  (SELECT * FROM cdn_usage_daily_unpivot WHERE is_billable_zone),
  '{event_date}'
)

This is a sophisticated use of BigQuery’s table-valued function feature. The aggregation logic is defined once and tested once. Adding a new product doesn’t require touching it at all.

One design detail I’m particularly pleased with: the billing_product IS NOT NULL filter in mapping_enrichment. Upstream daily tables often contain metrics that are tracked for observability but aren’t billed: intermediate counts, debug signals, things like that. Rather than maintaining an explicit exclusion list, the filter silently drops anything that doesn’t have a billing product mapping. The metadata registry acts as an allowlist.
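The allowlist behaviour falls out of the inner join itself. A Python sketch of the same semantics (field names illustrative):

```python
def enrich_and_filter(daily_rows: list[dict], mapping: list[dict]) -> list[dict]:
    """Inner-join daily rows to the registry; unmapped metrics simply fall out.

    Mirrors the join-plus-filter in mapping_enrichment: only metrics with a
    billing product mapping survive, no exclusion list required.
    """
    by_metric = {m["product_metric"]: m for m in mapping if m.get("billing_product")}
    return [
        {**row, **by_metric[row["product_metric"]]}  # enrich with registry fields
        for row in daily_rows
        if row["product_metric"] in by_metric        # allowlist: drop the rest
    ]
```

A debug-only metric with no billing_product never reaches the aggregation layer, and nobody had to remember to exclude it.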

Bandwidth deduplication required special handling. CDN pricing is regional — a customer might have a cap specifically for North America traffic. For non-bandwidth metrics, a simple SUM works. For Data Transfer / Bandwidth, there’s a COALESCE hierarchy that resolves which cap level applies (regional → global → null geo → raw sum), preventing double-counting across pricing tiers.
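The resolution logic is first-non-null-wins, the same semantics as SQL COALESCE. A hedged Python sketch of the hierarchy described above (parameter names are illustrative, not the production columns):

```python
from typing import Optional

def resolve_bandwidth_usage(
    regional: Optional[float],   # usage matched to a regional cap
    global_cap: Optional[float], # usage matched to a global cap
    null_geo: Optional[float],   # usage with no geography attached
    raw_sum: Optional[float],    # fallback: plain sum of daily values
) -> Optional[float]:
    """Pick exactly one tier so the same traffic is never counted twice."""
    for candidate in (regional, global_cap, null_geo, raw_sum):
        if candidate is not None:
            return candidate
    return None
```

If a customer has a North America cap, the regional figure wins and the global figure is ignored rather than added on top.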


Layer 3: The Union View

All 14 monthly aggregate tables are UNION ALL’d into a single view:

SELECT * FROM (
  SELECT * FROM cdn_usage_monthly
  UNION ALL
  SELECT * FROM workers_usage_monthly
  UNION ALL
  -- ... 12 more products
)
WHERE month_date < DATE_TRUNC(CURRENT_DATE(), MONTH)

The month_date filter excludes the in-progress current month, which would otherwise contribute partial data. Downstream consumers query this one view and get coverage across all products.
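Since month_date is truncated to the first of the month, the robust form of this filter compares against the start of the current month rather than today's date. A small sketch of the predicate:

```python
from datetime import date

def is_complete_month(month_date: date, today: date) -> bool:
    """True only for months that ended before the current month began.

    month_date is assumed to be a month-truncated date (the 1st of its month),
    matching the month_date column in the union view.
    """
    current_month_start = date(today.year, today.month, 1)
    return month_date < current_month_start
```

Comparing against today's date instead would admit the in-progress month on every day except the 1st, since its month_date (the 1st) is already in the past.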


Layer 4: Consumer Views

The consumer layer is where the original motivation for this rewrite becomes visible. The view that reports usage-against-cap for the current month — previously 80 lines of Bot Management-only SQL — is now 44 lines that cover all 14 products, because it’s querying the union view and the product metadata registry rather than hardcoded product-specific logic.


The Orchestration

Jetflow as the execution layer

Each layer (helper views, unpivot views, monthly aggregates) is a Jetflow pipeline: a YAML file that declares a consumer, optional transformers, and one or more loaders. Jetflow handles the streaming execution, Parquet conversion, GCS writes, and BigQuery loads. This is what a monthly aggregate pipeline config looks like:

# cdn.yaml
job_name: cdn_monthly_aggregate
schedule: daily
pipeline:
  - stage: bigquery_storage        # runs the SQL, streams results as Arrow
    batch_size: 64000
    retries: 3
  - stage: parquet_file_transformer
  - stage: gcs_loader              # writes Parquet to object storage, partitioned by month
    partition: month_date
    partition_type: MONTH
    retries: 5
  - stage: bq_loader               # loads from GCS into BigQuery
    blocking: true

The SQL file it references contains just the call to the shared table function. Jetflow handles everything else: parallelism, retries, Arrow-native streaming, memory management. If you’re not familiar with it, the Cloudflare blog post is worth reading — the short version is that it achieves 2–5 million rows per second per database connection by keeping data in columnar Arrow format end-to-end, avoiding the row→column→row conversions that slow down most ELT frameworks.

The dry-run flag

One contribution I made to Jetflow itself while building this pipeline: a --dry-run flag. During development, you often want to validate that a query is syntactically correct and will produce the expected schema without actually writing data or burning quota.

The implementation wires through three layers:

  1. CLI flag — --dry-run added to ApplicationFlags, threaded into JobConfiguration
  2. BigQuery SDK — sets DryRun: true on the query config before submission, which triggers BigQuery’s built-in query validation mode
  3. Consumer logic — if dry-run mode is active, logs “Query validates successfully.” and returns immediately after BigQuery responds, skipping GCS writes and BQ loads entirely

Dry-run jobs in BigQuery don’t reach Done state, so the consumer branches between job.LastStatus() (immediate, for dry runs) and job.Wait(ctx) (blocking, for real runs). The mock interface was updated to match, so unit tests cover both paths.

In practice: make compose-up dry-run=true validates the entire pipeline in seconds without touching production data.

Three DAGs, one daily sequence

Three Airflow DAGs run daily:

  1. Helper views (1 AM) — deploys/refreshes the product metadata registry and unit multiplier lookup via Jetflow’s bq_view_batch loader
  2. Monthly aggregates (2 AM) — first creates or replaces the BigQuery table function via BigQueryInsertJobOperator, then triggers all 14 Jetflow product pipelines
  3. Unpivot views (3 AM) — deploys the 14 per-product wide-to-long views via Jetflow

Each DAG has a dependency check sensor to verify upstream data freshness before running (commented out in the staging branch for ad-hoc testing flexibility).

The monthly aggregates DAG runs all 14 products as a single Jetflow task today. That’s a known limitation — a future version should use per-product Airflow task groups for parallel execution and isolated retries.


The Makefile Shortcut

One small ergonomic improvement that turned out to be disproportionately useful: a make upload target that pushes a DAG directly to the staging Airflow environment.

upload:
  @gcloud auth print-identity-token > /dev/null || (echo "Not authenticated. Run gcloud auth login." && exit 1)
  @gsutil cp $(DAG) gs://$(STAGING_BUCKET)/dags/$(notdir $(DAG:.py=))_$(TICKET).py
  @echo "View at: $(AIRFLOW_UI)?dag_id=$(notdir $(DAG:.py=))_$(TICKET)"

Before this, testing a DAG change meant pushing to a branch and waiting for CI/CD. With this, make upload DAG=dags/monthly_aggregates.py TICKET=my-branch gets you into staging Airflow in seconds, with the DAG namespaced by ticket so it doesn’t collide with the production DAG. The target also validates that you have an active gcloud auth session before attempting anything, with a clear error message if not.


Trade-offs and Known Limitations

I’m not going to pretend this design is perfect. From the README I wrote for the team:

File proliferation. 14 products × 3 files each = 42 files just for the monthly aggregation layer. That’s manageable now; it might not scale to 40 products. A templating approach would reduce this.
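To show what I mean by a templating approach, here's a sketch that generates per-product pipeline configs from one template. The template fields follow the cdn.yaml example above, but this generator is hypothetical, not something shipped:

```python
# One template instead of 14 hand-maintained YAML files.
TEMPLATE = """\
job_name: {product}_monthly_aggregate
schedule: daily
pipeline:
  - stage: bigquery_storage
    batch_size: 64000
    retries: 3
  - stage: parquet_file_transformer
  - stage: gcs_loader
    partition: month_date
    partition_type: MONTH
    retries: 5
  - stage: bq_loader
    blocking: true
"""

def render_configs(products: list[str]) -> dict[str, str]:
    """Map each product name to its rendered config file contents."""
    return {f"{p}.yaml": TEMPLATE.format(product=p) for p in products}
```

At 14 products the hand-written files are tolerable; at 40, a generator like this (run at deploy time, or as a pre-commit step) keeps the per-product surface area to one entry in a list.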

The table function lives in the DAG. The CREATE OR REPLACE TABLE FUNCTION DDL is embedded as a Python string constant in the Airflow DAG rather than in a standalone SQL file. That’s because Jetflow’s pipeline stages don’t natively support DDL execution — so the DAG falls back to Airflow’s BigQueryInsertJobOperator to run it before the Jetflow task. It works, but it’s awkward: the function isn’t version-controlled as SQL, and you need to read Python to find it. I filed a ticket to get DDL support added to Jetflow.

Dynamic UNPIVOT is possible but not implemented. BigQuery’s INFORMATION_SCHEMA.COLUMNS lets you discover metric columns dynamically, which would make the unpivot views auto-updating when new metrics are added upstream. The current approach requires a config change for each new column. I prototyped a dynamic version (a SQL script that generates and executes a CREATE OR REPLACE VIEW via EXECUTE IMMEDIATE) but didn’t ship it in this iteration — the static approach is more readable and debuggable.
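The shape of that prototype is simple enough to sketch: given a discovered column list (in production it would come from INFORMATION_SCHEMA.COLUMNS), generate the view DDL as a string. Names here are illustrative:

```python
def build_unpivot_view_ddl(view: str, source: str,
                           dims: list[str], metrics: list[str]) -> str:
    """Generate CREATE OR REPLACE VIEW DDL that casts and unpivots metrics.

    In the prototype this string would be run via EXECUTE IMMEDIATE; here it
    is just returned for inspection.
    """
    dim_cols = ",\n    ".join(dims)
    casts = ",\n    ".join(f"CAST({m} AS FLOAT64) AS {m}" for m in metrics)
    in_list = ", ".join(metrics)
    return (
        f"CREATE OR REPLACE VIEW {view} AS\n"
        f"SELECT * FROM (\n"
        f"  SELECT\n    {dim_cols},\n    {casts}\n"
        f"  FROM {source}\n"
        f")\n"
        f"UNPIVOT (usage_value FOR product_metric IN ({in_list}))"
    )
```

The trade-off is exactly the one stated above: the generated view tracks upstream columns automatically, but the SQL you debug in production no longer lives in version control as plain text.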

No automated validation. Row count checks and data freshness alerts exist at the DAG level but not at the per-product level. A product with zero rows for the month would not trigger an alert today.


What I’d Do Differently

Design the metadata registry first, before writing any pipeline code. I ended up retrofitting some of the aggregation rules into the metadata view partway through, which required adjusting the table function. Starting with a fully-specified schema for the registry would have saved a few iterations.

Ship the dynamic UNPIVOT from day one. The static UNPIVOT views are the most maintenance-heavy part of the design. Every time an upstream team adds a new metric column, someone needs to update a YAML file. The dynamic version doesn’t have this problem.

Per-product parallel task groups from the start. Retrofitting Airflow task groups into an existing DAG is messier than designing for them upfront.


The Result

The rewrite went from a 2,200-line SQL monolith to:

  • 1 product metadata view (the registry)
  • 1 unit-of-measure lookup view
  • 14 unpivot views (one per product)
  • 14 monthly aggregate pipeline configs
  • 1 aggregation table function (~100 lines)
  • 1 cross-product union view
  • 3 Airflow DAGs

Total lines of SQL in the critical path: under 200. Everything else is configuration.

Adding a new product now means: adding a row to the metadata registry, adding an unpivot view config, and adding a pipeline config. No changes to shared code. No risk of breaking other products.

That’s the goal of any refactor: make the easy thing the right thing.