Commit 401c9d6

Author: Yann Diorcet (committed)

Support continuous aggregates on views
1 parent 9ecd4c2 commit 401c9d6

File tree

1 file changed (+61, -24 lines)

sqlalchemy_timescaledb/dialect.py

Lines changed: 61 additions & 24 deletions
@@ -1,9 +1,13 @@
 import textwrap
+from typing import Optional, Mapping, Any
 
-from sqlalchemy import schema, event, DDL
+from sqlalchemy import schema, event, DDL, Table, Dialect, ExecutableDDLElement
 from sqlalchemy.dialects.postgresql.asyncpg import PGDialect_asyncpg
 from sqlalchemy.dialects.postgresql.base import PGDDLCompiler
 from sqlalchemy.dialects.postgresql.psycopg2 import PGDialect_psycopg2
+from sqlalchemy.engine.interfaces import SchemaTranslateMapType
+from sqlalchemy.ext import compiler
+from sqlalchemy_utils.view import CreateView, compile_create_materialized_view
 
 try:
     import alembic
@@ -16,8 +20,44 @@ class TimescaledbImpl(postgresql.PostgresqlImpl):
     __dialect__ = 'timescaledb'
 
 
+def _get_interval(value):
+    if isinstance(value, str):
+        return f"INTERVAL '{value}'"
+    elif isinstance(value, int):
+        return str(value)
+    else:
+        return "NULL"
+
+
+def _create_map(mapping: dict):
+    return ", ".join([f'{key} => {value}' for key, value in mapping.items()])
+
+
+@compiler.compiles(CreateView, 'timescaledb')
+def compile_create_view(create, compiler, **kw):
+    return compiler.visit_create_view(create, **kw)
+
 class TimescaledbDDLCompiler(PGDDLCompiler):
-    def post_create_table(self, table):
+
+    def visit_create_view(self, create, **kw):
+        ret = compile_create_materialized_view(create, self, **kw)
+        view = create.element
+        continuous = view.kwargs.get('timescaledb_continuous', {})
+        if continuous:
+            event.listen(
+                view,
+                'after_create',
+                self.ddl_add_continuous(
+                    view.name, continuous
+                ).execute_if(
+                    dialect='timescaledb'
+                )
+            )
+        return ret
+
+    def visit_create_table(self, create, **kw):
+        ret = super().visit_create_table(create, **kw)
+        table = create.element
         hypertable = table.kwargs.get('timescaledb_hypertable', {})
         compress = table.kwargs.get('timescaledb_compress', {})
 
@@ -52,28 +92,15 @@ def post_create_table(self, table):
                 )
             )
 
-
-        return super().post_create_table(table)
+        return ret
 
     @staticmethod
     def ddl_hypertable(table_name, hypertable):
         time_column_name = hypertable['time_column_name']
-        chunk_time_interval = hypertable.get('chunk_time_interval', '7 days')
-
-        if isinstance(chunk_time_interval, str):
-            if chunk_time_interval.isdigit():
-                chunk_time_interval = int(chunk_time_interval)
-            else:
-                chunk_time_interval = f"INTERVAL '{chunk_time_interval}'"
+        chunk_time_interval = _get_interval(hypertable.get('chunk_time_interval', '7 days'))
 
-        return DDL(textwrap.dedent(f"""
-            SELECT create_hypertable(
-                '{table_name}',
-                '{time_column_name}',
-                chunk_time_interval => {chunk_time_interval},
-                if_not_exists => TRUE
-            )
-        """))
+        parameters = _create_map(dict(chunk_time_interval=chunk_time_interval, if_not_exists="TRUE"))
+        return DDL(textwrap.dedent(f"""SELECT create_hypertable('{table_name}','{time_column_name}',{parameters})"""))
 
     @staticmethod
     def ddl_compress(table_name, compress):
@@ -85,11 +112,20 @@ def ddl_compress(table_name, compress):
 
     @staticmethod
     def ddl_compression_policy(table_name, compress):
-        compression_policy_interval = compress.get('compression_policy_interval', '7 days')
+        schedule_interval = _get_interval(compress.get('compression_policy_schedule_interval', '7 days'))
 
-        return DDL(textwrap.dedent(f"""
-            SELECT add_compression_policy('{table_name}', INTERVAL '{compression_policy_interval}')
-        """))
+        parameters = _create_map(dict(schedule_interval=schedule_interval))
+        return DDL(textwrap.dedent(f"""SELECT add_compression_policy('{table_name}', {parameters})"""))
+
+    @staticmethod
+    def ddl_add_continuous(table_name, continuous):
+        start_offset = _get_interval(continuous.get('continuous_aggregate_policy_start_offset', None))
+        end_offset = _get_interval(continuous.get('continuous_aggregate_policy_end_offset', None))
+        schedule_interval = _get_interval(continuous.get('continuous_aggregate_policy_schedule_interval', None))
+
+        parameters = _create_map(
+            dict(start_offset=start_offset, end_offset=end_offset, schedule_interval=schedule_interval))
+        return DDL(textwrap.dedent(f"""SELECT add_continuous_aggregate_policy('{table_name}', {parameters})"""))
 
 
 class TimescaledbDialect:
@@ -99,7 +135,8 @@ class TimescaledbDialect:
         (
             schema.Table, {
                 "hypertable": {},
-                "compress": {}
+                "compress": {},
+                "continuous": {},
             }
        )
    ]
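
As a quick reference for the two module-level helpers introduced in this commit, the snippet below copies `_get_interval` and `_create_map` verbatim from the diff and shows the SQL fragments they produce; the example values are illustrative only.

# Helpers copied from the diff above; the example calls show the fragments
# used to build the create_hypertable()/policy statements.

def _get_interval(value):
    if isinstance(value, str):
        return f"INTERVAL '{value}'"
    elif isinstance(value, int):
        return str(value)
    else:
        return "NULL"


def _create_map(mapping: dict):
    return ", ".join([f'{key} => {value}' for key, value in mapping.items()])


print(_get_interval('7 days'))   # INTERVAL '7 days'
print(_get_interval(86400000))   # 86400000
print(_get_interval(None))       # NULL
print(_create_map(dict(chunk_time_interval=_get_interval('7 days'), if_not_exists="TRUE")))
# chunk_time_interval => INTERVAL '7 days', if_not_exists => TRUE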

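And a minimal sketch of how the new view-level options map to DDL: `visit_create_view` reads a dict from the `timescaledb_continuous` kwarg and passes it to `ddl_add_continuous`, which emits an `add_continuous_aggregate_policy()` call. The import path below matches the file changed in this commit; the view name and offset values are illustrative assumptions, and how the kwarg is attached to a concrete materialized-view definition is outside this diff.

# Sketch: the SQL emitted by the new ddl_add_continuous() static method.
# 'metric_hourly' and the offsets are illustrative, not part of the commit.
from sqlalchemy_timescaledb.dialect import TimescaledbDDLCompiler

continuous = {
    'continuous_aggregate_policy_start_offset': '1 month',
    'continuous_aggregate_policy_end_offset': '1 hour',
    'continuous_aggregate_policy_schedule_interval': '1 hour',
}

ddl = TimescaledbDDLCompiler.ddl_add_continuous('metric_hourly', continuous)
print(ddl.statement)
# Emitted as a single line, wrapped here for readability:
# SELECT add_continuous_aggregate_policy('metric_hourly',
#     start_offset => INTERVAL '1 month', end_offset => INTERVAL '1 hour',
#     schedule_interval => INTERVAL '1 hour')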