Skip to content

Commit

Permalink
Merge pull request Aiven-Open#190 from aiven/fix-rest-get-topics-sube…
Browse files Browse the repository at this point in the history
…rror-code

rest: fix error code
  • Loading branch information
tvainika authored Mar 26, 2021
2 parents d3e2dd3 + 736823d commit e18a059
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 16 deletions.
76 changes: 63 additions & 13 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,11 @@ def validate_partition_id(partition_id: str, content_type: str) -> int:
try:
return int(partition_id)
except ValueError:
KafkaRest.not_found(message=f"Partition {partition_id} not found", content_type=content_type, sub_code=404)
KafkaRest.not_found(
message=f"Partition {partition_id} not found",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_NOT_FOUND.value,
)

@staticmethod
def is_valid_schema_request(data: dict, prefix: str) -> bool:
Expand Down Expand Up @@ -483,18 +487,32 @@ def get_partition_info(self, topic: str, partition: str, content_type: str) -> d
for p in partitions:
if p["partition"] == partition:
return p
self.not_found(message=f"Partition {partition} not found", content_type=content_type, sub_code=40402)
self.not_found(
message=f"Partition {partition} not found",
content_type=content_type,
sub_code=RESTErrorCodes.PARTITION_NOT_FOUND.value,
)
except UnknownTopicOrPartitionError:
self.not_found(message=f"Partition {partition} not found", content_type=content_type, sub_code=40402)
self.not_found(
message=f"Partition {partition} not found",
content_type=content_type,
sub_code=RESTErrorCodes.PARTITION_NOT_FOUND.value,
)
except KeyError:
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
return {}

def get_topic_info(self, topic: str, content_type: str) -> dict:
md = self.cluster_metadata()["topics"]
if topic not in md:
self.not_found(
message=f"Topic {topic} not found in {list(md.keys())}", content_type=content_type, sub_code=40401
message=f"Topic {topic} not found in {list(md.keys())}",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
return md[topic]

Expand Down Expand Up @@ -531,11 +549,19 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte

# disallow missing or non empty 'records' key , plus any other keys
if "records" not in data or set(data.keys()).difference(PUBLISH_KEYS) or not data["records"]:
self.unprocessable_entity(message="Invalid request format", content_type=content_type, sub_code=422)
self.unprocessable_entity(
message="Invalid request format",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
for r in data["records"]:
convert_to_int(r, "partition", content_type)
if set(r.keys()).difference(RECORD_KEYS):
self.unprocessable_entity(message="Invalid request format", content_type=content_type, sub_code=422)
self.unprocessable_entity(
message="Invalid request format",
content_type=content_type,
sub_code=RESTErrorCodes.HTTP_UNPROCESSABLE_ENTITY.value,
)
# disallow missing id and schema for any key/value list that has at least one populated element
if formats["embedded_format"] in {"avro", "jsonschema"}:
for prefix, code in zip(RECORD_KEYS, RECORD_CODES):
Expand All @@ -551,7 +577,11 @@ async def validate_publish_request_format(self, data: dict, formats: dict, conte
try:
await self.validate_schema_info(data, prefix, content_type, topic, formats["embedded_format"])
except InvalidMessageSchema as e:
self.unprocessable_entity(message=str(e), content_type=content_type, sub_code=42205)
self.unprocessable_entity(
message=str(e),
content_type=content_type,
sub_code=RESTErrorCodes.INVALID_DATA.value,
)

async def produce_message(self, *, topic: str, key: bytes, value: bytes, partition: int = None) -> dict:
prod = None
Expand Down Expand Up @@ -594,21 +624,33 @@ def topic_details(self, content_type: str, *, topic: str):
metadata = self.cluster_metadata([topic])
config = self.get_topic_config(topic)
if topic not in metadata["topics"]:
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
data = metadata["topics"][topic]
data["name"] = topic
data["configs"] = config
self.r(data, content_type)
except UnknownTopicOrPartitionError:
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.UNKNOWN_TOPIC_OR_PARTITION.value,
)

def list_partitions(self, content_type: str, *, topic: Optional[str]):
self.log.info("Retrieving partition details for topic %s", topic)
try:
topic_details = self.cluster_metadata([topic])["topics"]
self.r(topic_details[topic]["partitions"], content_type)
except (UnknownTopicOrPartitionError, KeyError):
self.not_found(message=f"Topic {topic} not found", content_type=content_type, sub_code=40401)
self.not_found(
message=f"Topic {topic} not found",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)

def partition_details(self, content_type: str, *, topic: str, partition_id: str):
self.log.info("Retrieving partition details for topic %s and partition %s", topic, partition_id)
Expand All @@ -623,8 +665,16 @@ def partition_offsets(self, content_type: str, *, topic: str, partition_id: str)
except UnknownTopicOrPartitionError as e:
# Do a topics request on failure, figure out faster ways once we get correctness down
if topic not in self.cluster_metadata()["topics"]:
self.not_found(message=f"Topic {topic} not found: {e}", content_type=content_type, sub_code=40401)
self.not_found(message=f"Partition {partition_id} not found: {e}", content_type=content_type, sub_code=40402)
self.not_found(
message=f"Topic {topic} not found: {e}",
content_type=content_type,
sub_code=RESTErrorCodes.TOPIC_NOT_FOUND.value,
)
self.not_found(
message=f"Partition {partition_id} not found: {e}",
content_type=content_type,
sub_code=RESTErrorCodes.PARTITION_NOT_FOUND.value,
)

def list_brokers(self, content_type: str):
metadata = self.cluster_metadata()
Expand Down
5 changes: 3 additions & 2 deletions karapace/kafka_rest_apis/error_codes.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from enum import Enum, unique
from enum import Enum
from http import HTTPStatus


@unique
class RESTErrorCodes(Enum):
HTTP_BAD_REQUEST = HTTPStatus.BAD_REQUEST.value
HTTP_NOT_FOUND = HTTPStatus.NOT_FOUND.value
HTTP_INTERNAL_SERVER_ERROR = HTTPStatus.INTERNAL_SERVER_ERROR.value
HTTP_UNPROCESSABLE_ENTITY = HTTPStatus.UNPROCESSABLE_ENTITY.value
TOPIC_NOT_FOUND = 40401
PARTITION_NOT_FOUND = 40402
CONSUMER_NOT_FOUND = 40403
UNKNOWN_TOPIC_OR_PARTITION = 40403
UNSUPPORTED_FORMAT = 40601
SCHEMA_RETRIEVAL_ERROR = 40801
CONSUMER_ALREADY_EXISTS = 40902
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async def test_topics(rest_async_client, admin_client):
assert data["partitions"][0]["replicas"][0]["in_sync"], "Replica should be in sync"
res = await rest_async_client.get(f"/topics/{topic_foo}")
assert res.status_code == 404, f"Topic {topic_foo} should not exist, status_code={res.status_code}"
assert res.json()["error_code"] == 40401, "Error code does not match"
assert res.json()["error_code"] == 40403, "Error code does not match"


async def test_publish(rest_async_client, admin_client):
Expand Down

0 comments on commit e18a059

Please sign in to comment.