Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;

Expand All @@ -51,11 +52,12 @@ public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp
checkArgument(from != null, "check compatibility list is null");
try {
for (SchemaData schemaData : from) {
Schema.Parser parser = new Schema.Parser();
Schema.Parser parser =
new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
parser.setValidateDefaults(false);
fromList.addFirst(parser.parse(new String(schemaData.getData(), UTF_8)));
}
Schema.Parser parser = new Schema.Parser();
Schema.Parser parser = new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
parser.setValidateDefaults(false);
Schema toSchema = parser.parse(new String(to.getData(), UTF_8));
SchemaValidator schemaValidator = createSchemaValidator(strategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
Expand Down Expand Up @@ -414,12 +415,13 @@ public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
SchemaVersion schemaVersion;
if (isUsingAvroSchemaParser(schemaData.getType())) {
Schema.Parser parser = new Schema.Parser();
Schema.Parser parser = new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
Schema newSchema = parser.parse(new String(schemaData.getData(), UTF_8));

for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
if (isUsingAvroSchemaParser(schemaAndMetadata.schema.getType())) {
Schema.Parser existParser = new Schema.Parser();
Schema.Parser existParser =
new Schema.Parser(StructSchemaDataValidator.COMPATIBLE_NAME_VALIDATOR);
Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import java.io.IOException;
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
Expand All @@ -32,13 +33,14 @@
/**
* Validate if the struct schema is in expected form.
*/
class StructSchemaDataValidator implements SchemaDataValidator {
public class StructSchemaDataValidator implements SchemaDataValidator {

public static StructSchemaDataValidator of() {
return INSTANCE;
}

private static final StructSchemaDataValidator INSTANCE = new StructSchemaDataValidator();
public static final NameValidator COMPATIBLE_NAME_VALIDATOR = new CompatibleNameValidator();

private StructSchemaDataValidator() {}

Expand All @@ -49,7 +51,7 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {
byte[] data = schemaData.getData();

try {
Schema.Parser avroSchemaParser = new Schema.Parser();
Schema.Parser avroSchemaParser = new Schema.Parser(COMPATIBLE_NAME_VALIDATOR);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Pulsar code base includes new Schema.Parser() in many locations. Wouldn't we have to use the custom name validator in all cases?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll change them also.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a factory class to pulsar-common? There's no Avro dependency there currently, but I guess it could be justified.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After evaluating this, I think this PR should focus solely on fixing the issue and eliminating breaking changes across broker versions. To align the usage, we could open another PR to support it on the client side.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pulsar-common is also used on broker side, so how could it be separated? I would assume that it would make sense to cover both client + broker to use the COMPATIBLE_NAME_VALIDATOR in the avro schema parser in all locations. It should be a task that Claude Code could handle without much intervention.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, pulsar-common doesn't currently depend on avro libraries (which could be a useful thing) so it would require a solution to share the name validator across broker and client.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine to handle broker-side first. Please rename the PR title so that it captures this detail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied.

avroSchemaParser.setValidateDefaults(false);
Schema schema = avroSchemaParser.parse(new String(data, UTF_8));
if (SchemaType.AVRO.equals(schemaData.getType())) {
Expand Down Expand Up @@ -97,4 +99,30 @@ private static void throwInvalidSchemaDataException(SchemaData schemaData,
throw new InvalidSchemaDataException("Invalid schema definition data for "
+ schemaData.getType() + " schema", cause);
}

/**
 * Name validator that accepts letters, digits, {@code _} and {@code $}.
 *
 * <p>The first character must be a letter, {@code _} or {@code $}; each following character
 * must be a letter, a digit, {@code _} or {@code $}. {@code null} and empty names are rejected.
 */
static class CompatibleNameValidator implements NameValidator {

    /**
     * Checks a single name against the rules described on this class.
     *
     * @param name the name to check, may be {@code null}
     * @return {@code OK} when the name is acceptable, otherwise a result carrying the error text
     */
    @Override
    public Result validate(String name) {
        if (name == null) {
            return new Result("Null name");
        }
        if (name.isEmpty()) {
            return new Result("Empty name");
        }
        final char head = name.charAt(0);
        if (!Character.isLetter(head) && !isExtraSymbol(head)) {
            return new Result("Illegal initial character: " + name);
        }
        for (int pos = 1; pos < name.length(); pos++) {
            final char current = name.charAt(pos);
            // '$' is accepted here in addition to letters, digits and '_'
            if (!Character.isLetterOrDigit(current) && !isExtraSymbol(current)) {
                return new Result("Illegal character in: " + name);
            }
        }
        return OK;
    }

    /** Returns whether {@code c} is one of the two permitted non-alphanumeric characters. */
    private static boolean isExtraSymbol(char c) {
        return c == '_' || c == '$';
    }
}
}
35 changes: 35 additions & 0 deletions pulsar-broker/src/main/proto/DataRecord.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
syntax = "proto3";

package pulsar.schema;
option java_package = "org.apache.pulsar.broker.service.schema.proto";


// Record with two scalar fields plus a singular and a repeated reference to a nested message type.
message DataRecord {
    // Nested record type referenced by field3 and fields4 below.
    message NestedDataRecord {
        string field1 = 1;
        int64 field2 = 2;
    }

    string field1 = 1;
    int64 field2 = 2;
    NestedDataRecord field3 = 3;
    repeated NestedDataRecord fields4 = 4;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import org.apache.avro.NameValidator;
import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
import org.apache.pulsar.broker.service.schema.proto.DataRecordOuterClass;
import org.apache.pulsar.broker.service.schema.validator.StructSchemaDataValidator.CompatibleNameValidator;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -148,4 +153,119 @@ public void testJsonSchemaTypeWithJsonSchemaData() throws Exception {
}
}

/** Checks that names made only of letters, digits, '_' and '$' (not starting with a digit) pass. */
@Test
public void testCompatibleNameValidatorValidNames() {
    final CompatibleNameValidator nameValidator = new CompatibleNameValidator();

    final String[] accepted = {
        "validName",
        "ValidName",
        "valid_name",
        "valid$name",
        "_validName",
        "$validName",
        "name123",
        "Name_123$",
        "a",
        "A",
        "_",
        "$",
        "validNameWithMultiple$ymbols_and_numbers123"
    };

    for (int i = 0; i < accepted.length; i++) {
        final String candidate = accepted[i];
        final NameValidator.Result outcome = nameValidator.validate(candidate);
        Assert.assertTrue(outcome.isOK(),
                "Expected validation to pass for name: '" + candidate
                        + "', but got error: " + outcome.getErrors());
    }
}

/** Checks that null, empty, digit-leading and punctuation-containing names are all rejected. */
@Test
public void testCompatibleNameValidatorInvalidNames() {
    final CompatibleNameValidator nameValidator = new CompatibleNameValidator();

    final String[] rejected = {
        // missing or empty
        null,
        "",
        // leading digit
        "123name",
        "1name",
        // characters other than letters, digits, '_' and '$'
        "name-with-dash",
        "name with space",
        "name.with.dot",
        "name@symbol",
        "name#hash",
        "name%percent",
        "name&ampersand",
        "name*asterisk",
        "name(parentheses)",
        "name+plus",
        "name=equals",
        "name[brackets]",
        "name{braces}",
        "name|pipe",
        "name\\backslash",
        "name:colon",
        "name;semicolon",
        "name\"quote",
        "name'apostrophe",
        "name<greater>",
        "name,comma",
        "name?question",
        "name!exclamation",
        "name`backtick",
        "name~tilde",
        "name^caret"
    };

    for (int i = 0; i < rejected.length; i++) {
        final String candidate = rejected[i];
        final NameValidator.Result outcome = nameValidator.validate(candidate);
        Assert.assertFalse(outcome.isOK(), "Expected validation to fail for name: '" + candidate + "'");
    }
}

/** Checks the error text reported for each of the four rejection paths of the validator. */
@Test
public void testCompatibleNameValidatorSpecificErrorMessages() throws Exception {
    final CompatibleNameValidator nameValidator = new CompatibleNameValidator();

    // null name
    final NameValidator.Result forNull = nameValidator.validate(null);
    Assert.assertFalse(forNull.isOK());
    Assert.assertEquals(forNull.getErrors(), "Null name");

    // empty name
    final NameValidator.Result forEmpty = nameValidator.validate("");
    Assert.assertFalse(forEmpty.isOK());
    Assert.assertEquals(forEmpty.getErrors(), "Empty name");

    // first character is a digit
    final NameValidator.Result forBadStart = nameValidator.validate("123name");
    Assert.assertFalse(forBadStart.isOK());
    Assert.assertTrue(forBadStart.getErrors().contains("Illegal initial character"));

    // a later character is not allowed
    final NameValidator.Result forBadBody = nameValidator.validate("name-with-dash");
    Assert.assertFalse(forBadBody.isOK());
    Assert.assertTrue(forBadBody.getErrors().contains("Illegal character in"));
}

/** Covers single-character names, a 1000-character name and an all-digit name. */
@Test
public void testCompatibleNameValidatorEdgeCases() throws Exception {
    final CompatibleNameValidator nameValidator = new CompatibleNameValidator();

    // every single-character name made of an allowed initial character is valid
    for (String single : new String[] {"a", "A", "_", "$"}) {
        Assert.assertTrue(nameValidator.validate(single).isOK());
    }

    final NameValidator.Result forLongName = nameValidator.validate("a".repeat(1000));
    Assert.assertTrue(forLongName.isOK());

    // digits are fine inside a name but not as its first character
    final NameValidator.Result forDigitsOnly = nameValidator.validate("123");
    Assert.assertFalse(forDigitsOnly.isOK());
    Assert.assertTrue(forDigitsOnly.getErrors().contains("Illegal initial character"));
}

/**
 * Builds the schema data of a {@link ProtobufSchema} for {@code DataRecord} and runs it through
 * {@link StructSchemaDataValidator}; the test passes when no exception is thrown.
 */
@Test
public void testAvroCompatible() throws InvalidSchemaDataException {
    final SchemaData dataRecordSchemaData = SchemaData.fromSchemaInfo(
            ProtobufSchema.of(DataRecordOuterClass.DataRecord.class).getSchemaInfo());
    StructSchemaDataValidator.of().validate(dataRecordSchemaData);
}

}
Loading