Skip to content

Append to a retained event log

Record activity events on a stream, keep the log bounded automatically, and answer time-window questions — the last n entries, a tally of the last hour, a count over the last day.

src/digest.bynk
commons digest
---
A single thing that happened on a stream. `id` is a caller-supplied dedup key:
`Log.append` is the one non-idempotent storage write, so an at-least-once retry
can append the same event twice — carrying an id lets a consumer collapse the
duplicate.
---
type Event = {
id: String,
kind: String,
who: String,
}
type KindCount = {
kind: String,
count: Int,
}
---
Tally a batch of events by kind. A pure, in-memory query — the *same*
`groupBy` vocabulary the agent runs lazily over its storage `Log`, here run
eagerly over a `List`.
---
fn summarise(events: List[Event]) -> List[KindCount] {
events.groupBy((e) => e.kind,
(k, rows) => KindCount { kind: k, count: rows.count() })
}
src/events.bynk
context events
uses digest
consumes bynk { Clock }
---
An append-only activity stream, one instance per `stream` key (a Cloudflare
Durable Object). State is a `Log` — an ordered, time-indexed sequence.
`@retain(30.days)` bounds it: every `append` drops entries older than the
horizon, so the log stays small without a separate sweep.
`append` stamps `Clock.now()` — the one non-idempotent storage write — so the
recording handler declares `given Clock`. The time-window reads (`since`/
`recent`) are clock-free: the caller passes the cutoff instant in, and the log
answers from its own time index.
---
agent Activity {
key stream: String
store history: Log[Event] @retain(30.days)
on call add(event: Event) -> Effect[()] given Clock {
let _ <- history.append(event)
Effect.pure(())
}
-- the last `n` entries, newest first
on call recent(n: Int) -> Effect[List[Event]] {
history.recent(n).collect()
}
-- every entry at or after a cutoff instant — a clock-free window read
on call since(t: Instant) -> Effect[List[Event]] {
history.since(t).collect()
}
on call countSince(t: Instant) -> Effect[Int] {
history.since(t).count()
}
-- a windowed tally: collect the window lazily, then shape it with the pure
-- `summarise` helper from `commons digest`
on call breakdown(t: Instant) -> Effect[List[KindCount]] {
let window <- history.since(t).collect()
summarise(window)
}
}
service api from http {
on POST("/events") by Visitor (body: Event) -> Effect[HttpResult[Event]] {
let _ <- Activity("main").add(body)
Created(body)
}
on GET("/events/recent") by Visitor () -> Effect[HttpResult[List[Event]]] {
let rows <- Activity("main").recent(20)
Ok(rows)
}
-- "last hour": read the clock, retreat by a literal `Duration` (an Instant
-- minus a Duration is an Instant), then run a clock-free window read.
on GET("/events/last-hour") by Visitor () -> Effect[HttpResult[List[KindCount]]] given Clock {
let now <- Clock.now()
let rows <- Activity("main").breakdown(now - 1.hours)
Ok(rows)
}
on GET("/events/last-day") by Visitor () -> Effect[HttpResult[Int]] given Clock {
let now <- Clock.now()
let n <- Activity("main").countSince(now - 24.hours)
Ok(n)
}
}
Open the full project ↗

This example reaches Workers-only shapes (storage bindings, agents, or cron), so it runs with bynk dev rather than in the browser playground. See Install to get started.

agent Activity is keyed by a stream string and holds history: Log[Event] @retain(30.days) — a Log is an ordered, time-indexed sequence, and @retain drops entries past the horizon on every append, so the log stays bounded with no separate sweep. add is the one non-idempotent write: it stamps Clock.now() internally, so the handler declares given Clock. The Event carries an id as a dedup key, defined in commons digest, because an at-least-once retry can append the same event twice.

The reads are clock-free. recent(n) returns the newest entries; since(t) and countSince(t) take a cutoff instant from the caller and let the log answer from its own time index — no handler-side clock needed. breakdown(t) collects a window lazily, then shapes it with the pure summarise helper from commons digest. summarise runs groupBy eagerly over a List to tally events by kind into KindCount rows — the same query vocabulary the agent runs lazily over its storage Log.

The HTTP service records with POST /events and reads with GET /events/recent (last 20). The windowed routes read the clock at the boundary and do Instant minus Duration arithmetic: GET /events/last-hour passes now - 1.hours into breakdown, and GET /events/last-day passes now - 24.hours into countSince.