Skip to content

Commit 0609ec7

Browse files
committed
restrict ready to valid. change Stream to use subtype. expository stream examples
1 parent b9605e2 commit 0609ec7

File tree

3 files changed

+293
-107
lines changed

3 files changed

+293
-107
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ duckdb_cli-*
3131
duckdb/
3232
tmp-*.duckdb
3333
data/
34+
35+
*.svg

Etch/StreamFusion/Sequence.lean

Lines changed: 187 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,209 @@
1-
namespace SeqStream
2-
inductive Step (σ α : Type) where
1+
/-
2+
this file contains several expository stream definition
3+
with code samples for the paper
4+
-/
5+
import Mathlib.Tactic
6+
import Mathlib.Data.Pi.Algebra
7+
8+
namespace one
9+
10+
inductive Step (σ α : Type)
311
| done
4-
| ready : σ → α → Step σ α
5-
| skip : σ → Step σ α
12+
| skip (state : σ)
13+
| emit (state : σ) (value : α)
614

715
structure Stream (α : Type) where
816
σ : Type
9-
next : (x : σ) → Step σ α
1017
q : σ
18+
next : σ → Step σ α
1119

1220
namespace Stream
1321

22+
open Step
23+
def map (f : α → β) (s : Stream α) : Stream β where
24+
q := s.q
25+
next state := match s.next state with
26+
| done => done
27+
| skip state => skip state
28+
| emit state value => emit state (f value)
29+
30+
partial def eval : Stream α → List α := fun {q, next} =>
31+
match next q with
32+
| done => []
33+
| skip q => eval {q, next}
34+
| emit q value => value :: eval {q, next}
35+
36+
partial def eval' : Stream α → Array α :=
37+
let rec go (acc : Array α) (s : Stream α) : Array α :=
38+
let {q, next} := s
39+
match next q with
40+
| done => acc
41+
| skip state => go acc {q := state, next}
42+
| emit state value => go (acc.push value) {q := state, next}
43+
go #[]
44+
1445
@[inline] partial def fold (f : β → α → β) (s : Stream α) (q : s.σ) (acc : β) : β :=
1546
let rec @[specialize] go f (next : (x : s.σ) → Step s.σ α) (acc : β) q :=
1647
match next q with
1748
| .done => acc
1849
| .skip s => go f next acc s
19-
| .ready s v => go f next (f acc v) s
50+
| .emit s v => go f next (f acc v) s
2051
go f s.next acc q
2152
end Stream
2253

54+
end one
55+
56+
variable {ι : Type} [DecidableEq ι]
57+
58+
def zero : ι → Option α := fun _ => none
59+
60+
def singleton (i : ι) (v : α) : ι → Option α :=
61+
fun x => if x = i then some v else none
62+
63+
namespace two
64+
inductive Step (σ ι α : Type) where
65+
| done
66+
| skip (state : σ)
67+
| emit (state : σ) (index : ι) (value : α)
68+
open Step
69+
70+
structure Stream (ι : Type) (α : Type) where
71+
σ : Type
72+
q : σ
73+
next : σ → Step σ ι α
74+
75+
def ofArray (arr : Array α) : Stream Nat α where
76+
q := 0
77+
next q := if h : q < arr.size then .emit (q+1) q arr[q] else .done
78+
79+
instance [Add α] : Add (Option α) where
80+
add a b := match a, b with
81+
| none, b => b
82+
| a, none => a
83+
| some a, some b => some (a + b)
84+
85+
variable [Ord ι] [Mul α]
86+
87+
def mul [Mul α] (a b : Stream ι α) : Stream ι α where
88+
q := (a.q, b.q)
89+
next q := match a.next q.fst, b.next q.snd with
90+
| done, _ => done
91+
| _, done => done
92+
| skip q₁, _ => skip (q₁, q.snd)
93+
| _, skip q₂ => skip (q.fst, q₂)
94+
| emit q₁ i₁ v₁,
95+
emit q₂ i₂ v₂ =>
96+
match compare i₁ i₂ with
97+
| .lt => skip (q₁, q.snd)
98+
| .gt => skip (q.fst, q₂)
99+
| .eq => emit (q₁, q₂) i₁ (v₁ * v₂)
100+
101+
instance : Mul (Stream ι α) := ⟨mul⟩
102+
variable [Zero α] [Add α]
103+
104+
partial def eval [Add α] : Stream ι α → (ι → Option α) :=
105+
fun {q, next} =>
106+
match next q with
107+
| done => zero
108+
| skip q => eval {q, next}
109+
| emit q i v => (singleton i v) + eval {q, next}
110+
111+
#eval eval (ofArray #[1,3] * ofArray #[2,6]) 1
112+
113+
end two
114+
115+
namespace three
116+
117+
-- maybe
118+
structure Stream' (ι : Type) (α : Type u) where
119+
σ : Type
120+
-- the `valid` states are those at which `next` and `index` are defined
121+
valid : σ → Bool
122+
-- the `ready` states are those states where additionally `value` is defined
123+
ready : σ → Bool
124+
-- the next function splits into three parts:
125+
next : (q : σ) → valid q → σ
126+
index : (q : σ) → valid q → ι
127+
value : (q : σ) → ready q → α
128+
129+
structure Stream (ι α : Type) where
130+
σ : Type
131+
q : σ
132+
valid : σ → Bool
133+
ready : {x // valid x} → Bool
134+
next : {x // valid x} → σ
135+
index : {x // valid x} → ι
136+
value : {x // ready x} → α
137+
138+
139+
end three
140+
141+
namespace four
142+
structure Stream (ι α : Type) where
143+
σ : Type
144+
q : σ
145+
valid : σ → Bool
146+
ready : {x // valid x} → Bool
147+
index : {x // valid x} → ι
148+
value : {x // ready x} → α
149+
150+
seek : {x // valid x} → ι → Bool → σ
151+
152+
partial def eval [Add α] (s : Stream ι α) : (ι → Option α) :=
153+
if valid : s.valid s.q then
154+
let q := ⟨s.q, valid⟩
155+
let ready := s.ready q
156+
let index := s.index q
157+
-- next state
158+
let q' := s.seek q index ready
159+
-- current value to emit
160+
let current :=
161+
if ready : ready
162+
then let value := (s.value ⟨q, ready⟩)
163+
singleton index value
164+
else zero
165+
-- evaluate the remaining stream
166+
let rest := eval {s with q := q'}
167+
-- end result:
168+
current + rest
169+
else zero
170+
171+
end four
172+
173+
#exit
174+
175+
inductive Step (σ ι α : Type) where
176+
| done
177+
| skip (state : σ)
178+
| emit (state : σ) (index : ι) (value : α)
179+
open Step
180+
181+
structure Stream (ι : Type) (α : Type) where
182+
σ : Type
183+
q : σ
184+
seek : σ → ι → Bool → Step σ ι α
185+
186+
partial def eval [Add α] : Stream ι α → (ι → Option α) :=
187+
fun {q, seek} =>
188+
match seek q _ _ with
189+
| done => zero
190+
| skip q => eval {q, seek}
191+
| emit q i v => (singleton i v) + eval {q, seek}
192+
193+
end three
194+
195+
#exit
196+
197+
-- In analogy with streams representing sequences, we define the type of streams
198+
-- representing sequences of (ι × α) pairs, ordered by ι,
199+
-- which admit efficient `seek` up to or past a given index.
200+
201+
-- This pair of definitions is not used, but they are one logical step between "Stream Fusion" and Stream below
202+
23203
def ofArray (arr : Array α) : Stream α where
24204
q := 0
25-
next q := if h : q < arr.size then .ready (q+1) arr[q] else .done
205+
next q := if h : q < arr.size then .emit (q+1) arr[q] else .done
26206

27207
def eg1 (arr : Array Nat) :=
28208
let s := ofArray arr
29209
s.fold (fun a b => a+b) s.q 0
30-
31-
end SeqStream

0 commit comments

Comments
 (0)