Skip to main content

Incremental Ripples

A Ripple recomputes its tables each run — but nothing says it must recompute them from scratch. A Pond's working database persists between runs, so a Ripple can read its own previous output, work out what's new, and append. This page is the supported pattern for that today; a first-class incremental construct (Trickle, which moves the watermark bookkeeping into the framework) is planned but not yet built.

The self-read pattern

Three steps: cold-start if the table doesn't exist yet, compute the new slice past a watermark, append atomically.

from duckstring import ripple


@ripple
def events(pond):
pond.read_table("tracker.event") # registers the Source table as the view `event`

exists = pond.con.sql(
"SELECT 1 FROM information_schema.tables WHERE table_name = 'events'"
).fetchall()
if not exists:
# Cold start: take everything.
pond.write_table("events", pond.con.sql("SELECT * FROM event"))
return

new = pond.con.sql("""
SELECT * FROM event
WHERE ingested_at > (SELECT MAX(ingested_at) FROM events)
""")
pond.write_table("events", pond.read_table("events").union(new))

The watermark is yours: any monotonic column works (an ingestion timestamp, a sequence id). With a timestamp that can carry ties, prefer a strictly increasing key, or guard with an anti-join on the row's id instead of > alone.

The demo's transactions and products Inlets use this same shape to grow their tables run over run — an Inlet building on its own previous output is the pattern in its simplest form.

Freshness as the watermark

When no data column fits, the framework provides one: pond.f, the run's freshness. It has a property wall-clock lacks: crash replay and immediate retries re-execute at the same F, so rows stamped with it are identical no matter how many attempts the run took — while an on-change retry is a genuinely new run at a new F, which is exactly the distinction a watermark wants.

Stamping with it gives a Source-to-Sink incremental protocol with no bespoke columns: the Source stamps what it publishes, the Sink takes everything fresher than what it has consumed.

# In the Source — stamp each run's rows:
@ripple
def publish(pond):
pond.write_table("event", pond.con.sql(
f"SELECT *, TIMESTAMP '{pond.f.strftime('%Y-%m-%d %H:%M:%S.%f')}' AS run_f FROM staged"
))

# In the Sink — consume only what's new:
@ripple
def consume(pond):
pond.read_table("tracker.event")
new = pond.con.sql("""
SELECT * FROM event
WHERE run_f > COALESCE((SELECT MAX(run_f) FROM events), TIMESTAMP '1970-01-01')
""")
pond.write_table("events", new if _cold(pond) else pond.read_table("events").union(new))

(_cold is the same information_schema existence check as above — the COALESCE already makes the filter cold-start-safe, so the branch only decides between create and append.)

This is the protocol the planned Trickle construct will formalise; using pond.f now means nothing to unlearn later.

Why this is replay-safe

Two mechanics make the pattern exactly-once per run, even across crashes:

  • write_table is build-then-swap. The new table materialises in full (reading the old events while doing so), then replaces it in one transaction. A run that dies mid-write leaves the previous state untouched.
  • Recovery re-runs only incomplete Ripples. After a crash, the worker's ledger re-runs the Ripple from the start — and because the previous state survived, it recomputes the same append. There is no half-applied increment to double-apply.

control force composes sensibly too: a forced recompute re-reads the unchanged Source, finds nothing past the watermark, and appends nothing.

Multiple Sources

With several inputs, keep one watermark per Source — usually just a MAX(...) per input column as above. If the bookkeeping grows beyond that, store it explicitly in a small state table the Ripple writes alongside its output (pond.write_table("_watermarks", ...)); it persists and replays by exactly the same rules.

Full refresh

Sometimes you want to rebuild from nothing — after a logic fix that changes history, say.

  • Locally, duckstring pond run --fresh ignores the self-puddle seed and starts cold.
  • On a deployed Pond there is no built-in full-refresh verb yet. The operational route: make sure the Pond is idle (duckstring control sleep, and kill if a run is in flight), delete its working database — ponds/{name}/m{major}/registry.duckdb under the Catchment root — then duckstring control force. The next run finds no table and takes the cold-start branch. The published Parquet snapshot stays in place until that run completes, so Sinks keep reading consistent data throughout.

A --full-refresh control verb that does this safely in one step is on the roadmap alongside Trickle.