Skip to content

Commit

Permalink
Add support for timestamps with timezone in iceberg type converter
Browse files Browse the repository at this point in the history
  • Loading branch information
auden-woolfson committed Jan 30, 2025
1 parent dd1893c commit ec9e904
Show file tree
Hide file tree
Showing 19 changed files with 291 additions and 48 deletions.
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1775,6 +1775,10 @@ Map of Iceberg types to the relevant PrestoDB types:
- ``TIME``
* - ``TIMESTAMP``
- ``TIMESTAMP``
* - ``TIMESTAMP``
- ``TIMESTAMP_WITH_TIMEZONE``
* - ``STRING``
- ``VARCHAR``
* - ``UUID``
- ``UUID``
* - ``LIST``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.parquet.format.Statistics;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTimeZone;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -999,8 +1000,12 @@ public void testDecimalBackedByINT32()
public void testTimestampMicrosBackedByINT64()
throws Exception
{
org.apache.parquet.schema.MessageType parquetSchema =
MessageTypeParser.parseMessageType("message ts_micros { optional INT64 test (TIMESTAMP_MICROS); }");
LogicalTypeAnnotation annotation = LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS);
MessageType parquetSchema = Types.buildMessage()
.primitive(PrimitiveType.PrimitiveTypeName.INT64, OPTIONAL)
.as(annotation)
.named("test")
.named("ts_micros");
ContiguousSet<Long> longValues = longsBetween(1_000_000, 1_001_000);
ImmutableList.Builder<SqlTimestamp> expectedValues = new ImmutableList.Builder<>();
for (Long value : longValues) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.TimeType;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.UuidType;
import com.facebook.presto.common.type.VarbinaryType;
Expand All @@ -47,6 +48,7 @@
import static com.facebook.presto.common.predicate.Marker.Bound.ABOVE;
import static com.facebook.presto.common.predicate.Marker.Bound.BELOW;
import static com.facebook.presto.common.predicate.Marker.Bound.EXACTLY;
import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
import static com.facebook.presto.iceberg.IcebergColumnHandle.getPushedDownSubfield;
import static com.facebook.presto.iceberg.IcebergColumnHandle.isPushedDownSubfield;
import static com.facebook.presto.parquet.ParquetTypeUtils.columnPathFromSubfield;
Expand Down Expand Up @@ -203,6 +205,10 @@ private static Object getIcebergLiteralValue(Type type, Marker marker)
return MILLISECONDS.toMicros((Long) marker.getValue());
}

if (type instanceof TimestampWithTimeZoneType) {
return MILLISECONDS.toMicros(unpackMillisUtc((Long) marker.getValue()));
}

if (type instanceof VarcharType) {
return ((Slice) marker.getValue()).toStringUtf8();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetSchema;
import static com.facebook.presto.iceberg.IcebergUtil.validateTableMode;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.getPartitionColumnName;
import static com.facebook.presto.iceberg.PartitionFields.getTransformTerm;
import static com.facebook.presto.iceberg.PartitionFields.toPartitionFields;
Expand Down Expand Up @@ -693,10 +692,6 @@ public void addColumn(ConnectorSession session, ConnectorTableHandle tableHandle

Type columnType = toIcebergType(column.getType());

if (columnType.equals(Types.TimestampType.withZone())) {
throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", columnType));
}

IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
verify(handle.getIcebergTableName().getTableType() == DATA, "only the data table can have columns added");
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Expand Down Expand Up @@ -754,8 +749,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
Table icebergTable = getIcebergTable(session, table.getSchemaTableName());
validateTableMode(session, icebergTable);

verifyTypeSupported(icebergTable.schema());

return beginIcebergTableInsert(session, table, icebergTable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
Expand Down Expand Up @@ -307,8 +306,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

Schema schema = toIcebergSchema(tableMetadata.getColumns());

verifyTypeSupported(schema);

PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));

MetastoreContext metastoreContext = getMetastoreContext(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
import static com.facebook.presto.iceberg.SchemaConverter.toPrestoSchema;
Expand Down Expand Up @@ -308,8 +307,6 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con

Schema schema = toIcebergSchema(tableMetadata.getColumns());

verifyTypeSupported(schema);

PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,6 @@ public static void validateTableMode(ConnectorSession session, org.apache.iceber
}
}

public static void verifyTypeSupported(Schema schema)
{
if (schema.columns().stream().anyMatch(column -> Types.TimestampType.withZone().equals(column.type()))) {
throw new PrestoException(NOT_SUPPORTED, format("Iceberg column type %s is not supported", Types.TimestampType.withZone()));
}
}

public static Map<String, String> createIcebergViewProperties(ConnectorSession session, String prestoVersion)
{
return ImmutableMap.<String, String>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.SmallintType.SMALLINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.hive.HiveType.HIVE_BINARY;
Expand Down Expand Up @@ -118,6 +119,10 @@ public static Type toPrestoType(org.apache.iceberg.types.Type type, TypeManager
case TIME:
return TimeType.TIME;
case TIMESTAMP:
Types.TimestampType timestampType = (Types.TimestampType) type.asPrimitiveType();
if (timestampType.shouldAdjustToUTC()) {
return TIMESTAMP_WITH_TIME_ZONE;
}
return TimestampType.TIMESTAMP;
case STRING:
return VarcharType.createUnboundedVarcharType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public void testTimestamp()
@Test
public void testTimestampWithTimeZone()
{
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x timestamp with time zone)", "Iceberg column type timestamptz is not supported");
assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Iceberg column type timestamptz is not supported");
assertUpdate("CREATE TABLE test_timestamp_with_timezone (x timestamp)");
assertQueryFails("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone", "Iceberg column type timestamptz is not supported");
assertQuerySucceeds("CREATE TABLE test_timestamp_with_timezone (x) AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'");
assertQuerySucceeds("ALTER TABLE test_timestamp_with_timezone ADD COLUMN y timestamp with time zone");
dropTable(getSession(), "test_timestamp_with_timezone");

assertQueryFails("CREATE TABLE test_timestamp_with_timezone (x) WITH ( format = 'ORC') AS SELECT TIMESTAMP '1969-12-01 00:00:00.000000 UTC'", "Unsupported Type: timestamp with time zone");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.TimestampWithTimeZoneType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.MaterializedRow;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;

import static com.facebook.presto.hive.HiveCommonSessionProperties.PARQUET_BATCH_READ_OPTIMIZATION_ENABLED;
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestIcebergTypes
extends AbstractTestQueryFramework
{
protected QueryRunner createQueryRunner() throws Exception
{
return createIcebergQueryRunner(ImmutableMap.of(), ImmutableMap.of());
}

@DataProvider(name = "testTimestampWithTimezone")
public Object[][] createTestTimestampWithTimezoneData()
{
return new Object[][] {
{Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "true")
.build()},
{Session.builder(getSession())
.setCatalogSessionProperty("iceberg", PARQUET_BATCH_READ_OPTIMIZATION_ENABLED, "false")
.build()}
};
}

@Test(dataProvider = "testTimestampWithTimezone")
public void testTimestampWithTimezone(Session session)
{
QueryRunner runner = getQueryRunner();
String timestamptz = "TIMESTAMP '1984-12-08 00:10:00 America/Los_Angeles'";
String timestamp = "TIMESTAMP '1984-12-08 00:10:00'";

dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz");
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE)");

String row = "(" + timestamptz + ", " + timestamp + ", " + timestamptz + ")";
for (int i = 0; i < 10; i++) {
assertUpdate(session, "INSERT INTO test_timestamptz values " + row, 1);
}

MaterializedResult initialRows = runner.execute(session, "SELECT * FROM test_timestamptz");

List<Type> types = initialRows.getTypes();
assertTrue(types.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(types.get(1) instanceof TimestampType);

List<MaterializedRow> rows = initialRows.getMaterializedRows();
for (int i = 0; i < 10; i++) {
assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString());
}

dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_partition");
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_partition(a TIMESTAMP WITH TIME ZONE, b TIMESTAMP, c TIMESTAMP WITH TIME ZONE) " +
"WITH (PARTITIONING = ARRAY['b'])");
assertUpdate(session, "INSERT INTO test_timestamptz_partition (a, b, c) SELECT a, b, c FROM test_timestamptz", 10);

MaterializedResult partitionRows = runner.execute(session, "SELECT * FROM test_timestamptz");

List<Type> partitionTypes = partitionRows.getTypes();
assertTrue(partitionTypes.get(0) instanceof TimestampWithTimeZoneType);
assertTrue(partitionTypes.get(1) instanceof TimestampType);

rows = partitionRows.getMaterializedRows();
for (int i = 0; i < 10; i++) {
assertEquals("[1984-12-08T08:10Z[UTC], 1984-12-08T00:10, 1984-12-08T08:10Z[UTC]]", rows.get(i).toString());
}

String earlyTimestamptz = "TIMESTAMP '1980-12-08 00:10:00 America/Los_Angeles'";
dropTableIfExists(runner, session.getCatalog().get(), session.getSchema().get(), "test_timestamptz_filter");
assertQuerySucceeds(session, "CREATE TABLE test_timestamptz_filter(a TIMESTAMP WITH TIME ZONE)");

for (int i = 0; i < 5; i++) {
assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + earlyTimestamptz + ")", 1);
}
for (int i = 0; i < 5; i++) {
assertUpdate(session, "INSERT INTO test_timestamptz_filter VALUES (" + timestamptz + ")", 1);
}

MaterializedResult lateRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a > " + earlyTimestamptz);
assertEquals(lateRows.getMaterializedRows().size(), 5);

MaterializedResult lateRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + timestamptz);
com.facebook.presto.testing.assertions.Assert.assertEquals(lateRows, lateRowsFromEquals);

MaterializedResult earlyRows = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a < " + timestamptz);
assertEquals(earlyRows.getMaterializedRows().size(), 5);

MaterializedResult earlyRowsFromEquals = runner.execute(session, "SELECT a FROM test_timestamptz_filter WHERE a = " + earlyTimestamptz);
com.facebook.presto.testing.assertions.Assert.assertEquals(earlyRows, earlyRowsFromEquals);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.common.type.IntegerType.INTEGER;
import static com.facebook.presto.common.type.RealType.REAL;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.iceberg.NestedFieldConverter.toIcebergNestedField;
Expand Down Expand Up @@ -176,6 +178,12 @@ protected static PrestoIcebergNestedField prestoIcebergNestedField(
case "date":
prestoType = DATE;
break;
case "timestamp":
prestoType = TIMESTAMP;
break;
case "timestamptz":
prestoType = TIMESTAMP_WITH_TIME_ZONE;
break;
case "nested":
prestoType = RowType.from(ImmutableList.of(
RowType.field("int", INTEGER),
Expand Down Expand Up @@ -239,6 +247,12 @@ protected static Types.NestedField nestedField(int id, String name)
case "date":
icebergType = Types.DateType.get();
break;
case "timestamp":
icebergType = Types.TimestampType.withoutZone();
break;
case "timestamptz":
icebergType = Types.TimestampType.withZone();
break;
case "nested":
icebergType = nested();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ protected static PrestoIcebergSchema prestoIcebergSchema(TypeManager typeManager
prestoIcebergNestedField(9, "varchar", typeManager),
prestoIcebergNestedField(10, "varbinary", typeManager),
prestoIcebergNestedField(11, "row", typeManager),
prestoIcebergNestedField(12, "date", typeManager)));
prestoIcebergNestedField(12, "date", typeManager),
prestoIcebergNestedField(13, "timestamp", typeManager),
prestoIcebergNestedField(14, "timestamptz", typeManager)));

Map<String, Integer> columnNameToIdMapping = getColumnNameToIdMapping();

Expand All @@ -114,11 +116,13 @@ private static Map<String, Integer> getColumnNameToIdMapping()
columnNameToIdMapping.put("varbinary", 10);
columnNameToIdMapping.put("row", 11);
columnNameToIdMapping.put("date", 12);
columnNameToIdMapping.put("array.element", 13);
columnNameToIdMapping.put("map.key", 14);
columnNameToIdMapping.put("map.value", 15);
columnNameToIdMapping.put("row.int", 16);
columnNameToIdMapping.put("row.varchar", 17);
columnNameToIdMapping.put("timestamp", 13);
columnNameToIdMapping.put("timestamptz", 14);
columnNameToIdMapping.put("array.element", 15);
columnNameToIdMapping.put("map.key", 16);
columnNameToIdMapping.put("map.value", 17);
columnNameToIdMapping.put("row.int", 18);
columnNameToIdMapping.put("row.varchar", 19);

return columnNameToIdMapping;
}
Expand All @@ -137,7 +141,9 @@ protected static Schema schema()
nestedField(9, "varchar"),
nestedField(10, "varbinary"),
nestedField(11, "row"),
nestedField(12, "date")));
nestedField(12, "date"),
nestedField(13, "timestamp"),
nestedField(14, "timestamptz")));

Type schemaAsStruct = Types.StructType.of(fields);
AtomicInteger nextFieldId = new AtomicInteger(1);
Expand Down
Loading

0 comments on commit ec9e904

Please sign in to comment.