From 4dec0b3f9ea26c6e4dd9d722e896ae741cbb77ea Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Wed, 6 Apr 2022 22:01:18 +0200 Subject: [PATCH 1/2] pylint: fix linting issues --- tests/integration/conftest.py | 7 +++---- tests/integration/test_schema_reader.py | 3 ++- tests/unit/conftest.py | 14 +++++++------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 47da27f3f..46abb58fc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -88,8 +88,7 @@ def fixture_kafka_server( lock_file = lock_path_for(transfer_file) with ExitStack() as stack: - # There is an issue with pylint here, see https:/github.com/tox-dev/py-filelock/issues/102 - with FileLock(str(lock_file)): # pylint: disable=abstract-class-instantiated + with FileLock(str(lock_file)): if transfer_file.exists(): config_data = ujson.loads(transfer_file.read_text()) zk_config = ZKConfig.from_dict(config_data["zookeeper"]) @@ -147,7 +146,7 @@ def fixture_kafka_server( yield kafka_servers # Signal the worker finished - with FileLock(str(lock_file)): # pylint: disable=abstract-class-instantiated + with FileLock(str(lock_file)): assert transfer_file.exists(), "transfer_file disappeared" config_data = ujson.loads(transfer_file.read_text()) config_data[WORKER_COUNTER_KEY] -= 1 @@ -156,7 +155,7 @@ def fixture_kafka_server( # Wait until every worker finished before stopping the servers worker_counter = float("inf") while worker_counter > 0: - with FileLock(str(lock_file)): # pylint: disable=abstract-class-instantiated + with FileLock(str(lock_file)): assert transfer_file.exists(), "transfer_file disappeared" config_data = ujson.loads(transfer_file.read_text()) worker_counter = config_data[WORKER_COUNTER_KEY] diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index d15bc1a96..f0d2c806b 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -5,8 +5,9 @@ from karapace.master_coordinator import MasterCoordinator from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode +from tests.integration.utils.kafka_server import KafkaServers from tests.schemas.json_schemas import TRUE_SCHEMA -from tests.utils import create_group_name_factory, create_subject_name_factory, KafkaServers, new_random_name +from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name import time diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 24bff1780..17504b1d4 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -9,13 +9,13 @@ class MockClient: def __init__(self, *args, **kwargs): pass - async def get_schema_for_id(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def get_schema_for_id(self, *args, **kwargs): # pylint: disable=unused-argument return ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) - async def get_latest_schema(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def get_latest_schema(self, *args, **kwargs): # pylint: disable=unused-argument return 1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json) - async def post_new_schema(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def post_new_schema(self, *args, **kwargs): # pylint: disable=unused-argument return 1 @@ -23,18 +23,18 @@ class MockProtobufClient: def __init__(self, *args, **kwargs): pass - async def get_schema_for_id2(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def get_schema_for_id2(self, *args, **kwargs): # pylint: disable=unused-argument return ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf2)) - async def get_schema_for_id(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def get_schema_for_id(self, *args, **kwargs): # pylint: disable=unused-argument if args[0] != 1: return None return ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)) - async def get_latest_schema(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def get_latest_schema(self, *args, **kwargs): # pylint: disable=unused-argument return 1, ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf)) - async def post_new_schema(self, *args, **kwargs): # pylint: disable=unused-argument,no-self-use + async def post_new_schema(self, *args, **kwargs): # pylint: disable=unused-argument return 1 From 880dddc00a0426c4780ddfd16f788f044ce58673 Mon Sep 17 00:00:00 2001 From: "Augusto F. Hack" Date: Mon, 4 Apr 2022 16:31:08 +0200 Subject: [PATCH 2/2] schema_registry: made lock private The lock is used only internally by the schema_registry --- karapace/karapace.py | 2 -- karapace/schema_registry_apis.py | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/karapace/karapace.py b/karapace/karapace.py index 9a310d6aa..61ed3d9be 100644 --- a/karapace/karapace.py +++ b/karapace/karapace.py @@ -11,7 +11,6 @@ from karapace.rapu import HTTPResponse, RestApp from typing import NoReturn, Union -import asyncio import logging @@ -23,7 +22,6 @@ def __init__(self, config: Config) -> None: self.route("/", callback=self.root_get, method="GET") self.log = logging.getLogger("Karapace") self.app.on_startup.append(self.create_http_client) - self.master_lock = asyncio.Lock() self.log.info("Karapace initialized") self.app.on_shutdown.append(self.close_by_app) diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py index 287499544..746e3b733 100644 --- a/karapace/schema_registry_apis.py +++ b/karapace/schema_registry_apis.py @@ -57,6 +57,7 @@ def __init__(self, config: Config) -> None: self.ksr = KafkaSchemaReader(config=self.config, master_coordinator=self.mc) self.ksr.start() self.schema_lock = asyncio.Lock() + self._master_lock = asyncio.Lock() def _create_producer(self) -> KafkaProducer: while True: @@ -643,7 +644,7 @@ async def subject_versions_list(self, content_type, *, subject): self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK) async def get_master(self) -> Tuple[bool, Optional[str]]: - async with self.master_lock: + async with self._master_lock: while True: are_we_master, master_url = self.mc.get_master_info() if are_we_master is None: