Append to a retained event log
Record activity events on a stream, keep the log bounded automatically, and answer time-window questions — the last n entries, a tally of the last hour, a count over the last day.
commons digest
---A single thing that happened on a stream. `id` is a caller-supplied dedup key:`Log.append` is the one non-idempotent storage write, so an at-least-once retrycan append the same event twice — carrying an id lets a consumer collapse theduplicate.---type Event = { id: String, kind: String, who: String,}
type KindCount = { kind: String, count: Int,}
---Tally a batch of events by kind. A pure, in-memory query — the *same*`groupBy` vocabulary the agent runs lazily over its storage `Log`, here runeagerly over a `List`.---fn summarise(events: List[Event]) -> List[KindCount] { events.groupBy((e) => e.kind, (k, rows) => KindCount { kind: k, count: rows.count() })}context events
uses digestconsumes bynk { Clock }
---An append-only activity stream, one instance per `stream` key (a CloudflareDurable Object). State is a `Log` — an ordered, time-indexed sequence.`@retain(30.days)` bounds it: every `append` drops entries older than thehorizon, so the log stays small without a separate sweep.
`append` stamps `Clock.now()` — the one non-idempotent storage write — so therecording handler declares `given Clock`. The time-window reads (`since`/`recent`) are clock-free: the caller passes the cutoff instant in, and the loganswers from its own time index.---agent Activity { key stream: String
store history: Log[Event] @retain(30.days)
on call add(event: Event) -> Effect[()] given Clock { let _ <- history.append(event) Effect.pure(()) }
-- the last `n` entries, newest first on call recent(n: Int) -> Effect[List[Event]] { history.recent(n).collect() }
-- every entry at or after a cutoff instant — a clock-free window read on call since(t: Instant) -> Effect[List[Event]] { history.since(t).collect() }
on call countSince(t: Instant) -> Effect[Int] { history.since(t).count() }
-- a windowed tally: collect the window lazily, then shape it with the pure -- `summarise` helper from `commons digest` on call breakdown(t: Instant) -> Effect[List[KindCount]] { let window <- history.since(t).collect() summarise(window) }}
service api from http { on POST("/events") by Visitor (body: Event) -> Effect[HttpResult[Event]] { let _ <- Activity("main").add(body) Created(body) }
on GET("/events/recent") by Visitor () -> Effect[HttpResult[List[Event]]] { let rows <- Activity("main").recent(20) Ok(rows) }
-- "last hour": read the clock, retreat by a literal `Duration` (an Instant -- minus a Duration is an Instant), then run a clock-free window read. on GET("/events/last-hour") by Visitor () -> Effect[HttpResult[List[KindCount]]] given Clock { let now <- Clock.now() let rows <- Activity("main").breakdown(now - 1.hours) Ok(rows) }
on GET("/events/last-day") by Visitor () -> Effect[HttpResult[Int]] given Clock { let now <- Clock.now() let n <- Activity("main").countSince(now - 24.hours) Ok(n) }}This example reaches Workers-only shapes (storage bindings, agents, or cron), so it runs with bynk dev rather than in the browser playground. See Install to get started.
How it works
Section titled “How it works”agent Activity is keyed by a stream string and holds history: Log[Event] @retain(30.days) — a Log is an ordered, time-indexed sequence, and @retain
drops entries past the horizon on every append, so the log stays bounded with no
separate sweep. add is the one non-idempotent write: it stamps Clock.now()
internally, so the handler declares given Clock. The Event carries an id as a
dedup key, defined in commons digest, because an at-least-once retry can append the
same event twice.
The reads are clock-free. recent(n) returns the newest entries; since(t) and
countSince(t) take a cutoff instant from the caller and let the log answer from its
own time index — no handler-side clock needed. breakdown(t) collects a window
lazily, then shapes it with the pure summarise helper from commons digest.
summarise runs groupBy eagerly over a List to tally events by kind into
KindCount rows — the same query vocabulary the agent runs lazily over its storage
Log.
The HTTP service records with POST /events and reads with GET /events/recent
(last 20). The windowed routes read the clock at the boundary and do Instant minus
Duration arithmetic: GET /events/last-hour passes now - 1.hours into
breakdown, and GET /events/last-day passes now - 24.hours into countSince.