diff --git a/karapace/karapace.py b/karapace/karapace.py
index 9a310d6aa..61ed3d9be 100644
--- a/karapace/karapace.py
+++ b/karapace/karapace.py
@@ -11,7 +11,6 @@
 from karapace.rapu import HTTPResponse, RestApp
 from typing import NoReturn, Union
 
-import asyncio
 import logging
 
 
@@ -23,7 +22,6 @@ def __init__(self, config: Config) -> None:
         self.route("/", callback=self.root_get, method="GET")
         self.log = logging.getLogger("Karapace")
         self.app.on_startup.append(self.create_http_client)
-        self.master_lock = asyncio.Lock()
         self.log.info("Karapace initialized")
         self.app.on_shutdown.append(self.close_by_app)
 
diff --git a/karapace/schema_registry_apis.py b/karapace/schema_registry_apis.py
index 287499544..746e3b733 100644
--- a/karapace/schema_registry_apis.py
+++ b/karapace/schema_registry_apis.py
@@ -57,6 +57,7 @@ def __init__(self, config: Config) -> None:
         self.ksr = KafkaSchemaReader(config=self.config, master_coordinator=self.mc)
         self.ksr.start()
         self.schema_lock = asyncio.Lock()
+        self._master_lock = asyncio.Lock()
 
     def _create_producer(self) -> KafkaProducer:
         while True:
@@ -643,7 +644,7 @@ async def subject_versions_list(self, content_type, *, subject):
         self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK)
 
     async def get_master(self) -> Tuple[bool, Optional[str]]:
-        async with self.master_lock:
+        async with self._master_lock:
             while True:
                 are_we_master, master_url = self.mc.get_master_info()
                 if are_we_master is None:
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 47da27f3f..46abb58fc 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -88,8 +88,7 @@ def fixture_kafka_server(
     lock_file = lock_path_for(transfer_file)
 
     with ExitStack() as stack:
-        # There is an issue with pylint here, see https://github.com/tox-dev/py-filelock/issues/102
-        with FileLock(str(lock_file)):  # pylint: disable=abstract-class-instantiated
+        with FileLock(str(lock_file)):
             if transfer_file.exists():
                 config_data = ujson.loads(transfer_file.read_text())
                 zk_config = ZKConfig.from_dict(config_data["zookeeper"])
@@ -147,7 +146,7 @@ def fixture_kafka_server(
         yield kafka_servers
 
         # Signal the worker finished
-        with FileLock(str(lock_file)):  # pylint: disable=abstract-class-instantiated
+        with FileLock(str(lock_file)):
             assert transfer_file.exists(), "transfer_file disappeared"
             config_data = ujson.loads(transfer_file.read_text())
             config_data[WORKER_COUNTER_KEY] -= 1
@@ -156,7 +155,7 @@
         # Wait until every worker finished before stopping the servers
         worker_counter = float("inf")
         while worker_counter > 0:
-            with FileLock(str(lock_file)):  # pylint: disable=abstract-class-instantiated
+            with FileLock(str(lock_file)):
                 assert transfer_file.exists(), "transfer_file disappeared"
                 config_data = ujson.loads(transfer_file.read_text())
                 worker_counter = config_data[WORKER_COUNTER_KEY]
diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py
index d15bc1a96..f0d2c806b 100644
--- a/tests/integration/test_schema_reader.py
+++ b/tests/integration/test_schema_reader.py
@@ -5,8 +5,9 @@
 from karapace.master_coordinator import MasterCoordinator
 from karapace.schema_reader import KafkaSchemaReader
 from karapace.utils import json_encode
+from tests.integration.utils.kafka_server import KafkaServers
 from tests.schemas.json_schemas import TRUE_SCHEMA
-from tests.utils import create_group_name_factory, create_subject_name_factory, KafkaServers, new_random_name
+from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name
 
 import time
 
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 24bff1780..17504b1d4 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -9,13 +9,13 @@ class MockClient:
     def __init__(self, *args, **kwargs):
         pass
 
-    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument
         return ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json)
 
-    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json)
 
-    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1
 
 
@@ -23,18 +23,18 @@ class MockProtobufClient:
     def __init__(self, *args, **kwargs):
         pass
 
-    async def get_schema_for_id2(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_schema_for_id2(self, *args, **kwargs):  # pylint: disable=unused-argument
         return ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf2))
 
-    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument
         if args[0] != 1:
             return None
         return ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf))
 
-    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1, ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf))
 
-    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1