Commit ae0e52d
Merge branch 'aiven:master' into master
amrutha-shanbhag authored Apr 10, 2022
2 parents 7552049 + dff56df commit ae0e52d
Showing 5 changed files with 14 additions and 15 deletions.
2 changes: 0 additions & 2 deletions karapace/karapace.py
@@ -11,7 +11,6 @@
 from karapace.rapu import HTTPResponse, RestApp
 from typing import NoReturn, Union

-import asyncio
 import logging


@@ -23,7 +22,6 @@ def __init__(self, config: Config) -> None:
         self.route("/", callback=self.root_get, method="GET")
         self.log = logging.getLogger("Karapace")
         self.app.on_startup.append(self.create_http_client)
-        self.master_lock = asyncio.Lock()
         self.log.info("Karapace initialized")
         self.app.on_shutdown.append(self.close_by_app)

3 changes: 2 additions & 1 deletion karapace/schema_registry_apis.py
@@ -57,6 +57,7 @@ def __init__(self, config: Config) -> None:
         self.ksr = KafkaSchemaReader(config=self.config, master_coordinator=self.mc)
         self.ksr.start()
         self.schema_lock = asyncio.Lock()
+        self._master_lock = asyncio.Lock()

     def _create_producer(self) -> KafkaProducer:
         while True:
@@ -643,7 +644,7 @@ async def subject_versions_list(self, content_type, *, subject):
         self.r(list(subject_data["schemas"]), content_type, status=HTTPStatus.OK)

     async def get_master(self) -> Tuple[bool, Optional[str]]:
-        async with self.master_lock:
+        async with self._master_lock:
             while True:
                 are_we_master, master_url = self.mc.get_master_info()
                 if are_we_master is None:
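Together with the karapace.py hunks above, this moves the master-election lock off the generic KarapaceBase (where nothing used it) and into the schema registry as the private _master_lock, next to its only consumer, get_master. A minimal sketch of the pattern, using a simplified stand-in class rather than Karapace's real ones:

import asyncio
from typing import Optional, Tuple


class MasterAware:
    """Simplified stand-in for the schema registry's master handling."""

    def __init__(self) -> None:
        # Private lock, mirroring self._master_lock in the diff above.
        self._master_lock = asyncio.Lock()

    async def get_master(self) -> Tuple[bool, Optional[str]]:
        # Serialize callers: concurrent requests poll the election one at a
        # time instead of all hammering the coordinator in parallel.
        async with self._master_lock:
            while True:
                are_we_master, master_url = self._get_master_info()
                if are_we_master is not None:
                    return are_we_master, master_url
                await asyncio.sleep(1.0)  # election undecided; retry

    def _get_master_info(self) -> Tuple[Optional[bool], Optional[str]]:
        # Hypothetical stand-in for MasterCoordinator.get_master_info(),
        # which returns (None, None) while the election is undecided.
        return True, None


if __name__ == "__main__":
    print(asyncio.run(MasterAware().get_master()))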
7 changes: 3 additions & 4 deletions tests/integration/conftest.py
@@ -88,8 +88,7 @@ def fixture_kafka_server(
     lock_file = lock_path_for(transfer_file)

     with ExitStack() as stack:
-        # There is an issue with pylint here, see https://github.com/tox-dev/py-filelock/issues/102
-        with FileLock(str(lock_file)):  # pylint: disable=abstract-class-instantiated
+        with FileLock(str(lock_file)):
             if transfer_file.exists():
                 config_data = ujson.loads(transfer_file.read_text())
                 zk_config = ZKConfig.from_dict(config_data["zookeeper"])
@@ -147,7 +146,7 @@ def fixture_kafka_server(
     yield kafka_servers

     # Signal the worker finished
-    with FileLock(str(lock_file)):  # pylint: disable=abstract-class-instantiated
+    with FileLock(str(lock_file)):
         assert transfer_file.exists(), "transfer_file disappeared"
         config_data = ujson.loads(transfer_file.read_text())
         config_data[WORKER_COUNTER_KEY] -= 1
@@ -156,7 +155,7 @@ def fixture_kafka_server(
     # Wait until every worker finished before stopping the servers
     worker_counter = float("inf")
     while worker_counter > 0:
-        with FileLock(str(lock_file)):  # pylint: disable=abstract-class-instantiated
+        with FileLock(str(lock_file)):
             assert transfer_file.exists(), "transfer_file disappeared"
             config_data = ujson.loads(transfer_file.read_text())
             worker_counter = config_data[WORKER_COUNTER_KEY]
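The removed comment and the three pylint disable suppressions worked around a pylint false positive on FileLock (the abstract-class-instantiated issue tracked as tox-dev/py-filelock#102); with that fixed upstream, the plain calls lint cleanly. Below is a minimal sketch, under simplified assumptions, of the cross-process pattern this fixture relies on: pytest-xdist workers share servers, tracked by a counter in a transfer file that is only read or written while holding a FileLock. The helper names are illustrative, and json stands in for the ujson used in the real fixture.

import json
import time
from pathlib import Path

from filelock import FileLock

WORKER_COUNTER_KEY = "worker_counter"  # illustrative; mirrors the fixture's key


def signal_worker_finished(transfer_file: Path, lock_file: Path) -> None:
    # Decrement the shared counter under the lock so workers never race.
    with FileLock(str(lock_file)):
        config_data = json.loads(transfer_file.read_text())
        config_data[WORKER_COUNTER_KEY] -= 1
        transfer_file.write_text(json.dumps(config_data))


def wait_for_all_workers(transfer_file: Path, lock_file: Path) -> None:
    # Re-acquire the lock on every poll; only once every worker has checked
    # out does the counter hit zero and the shared servers may be stopped.
    worker_counter = float("inf")
    while worker_counter > 0:
        with FileLock(str(lock_file)):
            worker_counter = json.loads(transfer_file.read_text())[WORKER_COUNTER_KEY]
        time.sleep(0.1)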
3 changes: 2 additions & 1 deletion tests/integration/test_schema_reader.py
@@ -5,8 +5,9 @@
 from karapace.master_coordinator import MasterCoordinator
 from karapace.schema_reader import KafkaSchemaReader
 from karapace.utils import json_encode
+from tests.integration.utils.kafka_server import KafkaServers
 from tests.schemas.json_schemas import TRUE_SCHEMA
-from tests.utils import create_group_name_factory, create_subject_name_factory, KafkaServers, new_random_name
+from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name

 import time

14 changes: 7 additions & 7 deletions tests/unit/conftest.py
@@ -9,32 +9,32 @@ class MockClient:
     def __init__(self, *args, **kwargs):
         pass

-    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument
         return ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json)

-    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json)

-    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1


 class MockProtobufClient:
     def __init__(self, *args, **kwargs):
         pass

-    async def get_schema_for_id2(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_schema_for_id2(self, *args, **kwargs):  # pylint: disable=unused-argument
         return ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf2))

-    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_schema_for_id(self, *args, **kwargs):  # pylint: disable=unused-argument
         if args[0] != 1:
             return None
         return ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf))

-    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def get_latest_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1, ValidatedTypedSchema.parse(SchemaType.PROTOBUF, trim_margin(schema_protobuf))

-    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    async def post_new_schema(self, *args, **kwargs):  # pylint: disable=unused-argument
         return 1


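The only change here is dropping no-self-use from each disable comment, presumably because newer pylint no longer ships that check by default (it moved to an optional extension), which would make the extra suppression redundant; the unused-argument disable the mock signatures still need remains. A hypothetical usage sketch of these mocks, assuming MockClient can be imported from this conftest module; the test itself is illustrative, not one of Karapace's:

import asyncio

from tests.unit.conftest import MockClient  # path per this diff


def test_mock_client_contract() -> None:
    client = MockClient()
    # get_latest_schema returns (version, parsed schema), matching the stub.
    version, schema = asyncio.run(client.get_latest_schema("some-subject"))
    assert version == 1
    assert schema is not None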
