Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev mac experiments #713

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ examples/k8s/charts
target
wheelhouse
requirements.*
.DS_Store
3 changes: 2 additions & 1 deletion examples/exampledata/config/http_pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ process_count: 4
config_refresh_interval: 5
profile_pipelines: false
restart_count: 3
error_backlog_size: 150
logger:
level: INFO
loggers:
Expand All @@ -29,7 +30,7 @@ metrics:
input:
httpinput:
type: http_input
message_backlog_size: 1500000
message_backlog_size: 150
collect_meta: true
metafield_name: "@metadata"
uvicorn_config:
Expand Down
4 changes: 2 additions & 2 deletions examples/exampledata/config/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ process_count: 2
timeout: 0.1
restart_count: 2
config_refresh_interval: 5
error_backlog_size: 1500000
error_backlog_size: 150
logger:
level: INFO
level: DEBUG
format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
loggers:
Expand Down
2 changes: 1 addition & 1 deletion logprep/connector/http/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def _get_asgi_app(endpoints_config: dict) -> falcon.asgi.App:

def _get_event(self, timeout: float) -> Tuple:
"""Returns the first message from the queue"""
self.metrics.message_backlog_size += self.messages.qsize()
#self.metrics.message_backlog_size += self.messages.qsize()
try:
message = self.messages.get(timeout=timeout)
raw_message = str(message).encode("utf8")
Expand Down
2 changes: 1 addition & 1 deletion logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def _drain_input_queues(self) -> None:
if not hasattr(self._input, "messages"):
return
if isinstance(self._input.messages, multiprocessing.queues.Queue):
while self._input.messages.qsize():
while not self._input.messages.empty():
self.process_pipeline()

def stop(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion logprep/framework/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def throttle(self, batch_size=1):

def put(self, obj, block=True, timeout=None, batch_size=1):
"""Put an obj into the queue."""
self.throttle(batch_size)
#self.throttle(batch_size)
super().put(obj, block=block, timeout=timeout)


Expand Down
4 changes: 2 additions & 2 deletions logprep/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def _iterate(self):
self.exit_code = EXITCODES.PIPELINE_ERROR.value
self._logger.error("Restart count exceeded. Exiting.")
sys.exit(self.exit_code)
if self._manager.error_queue is not None:
self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
#if self._manager.error_queue is not None:
# self.metrics.number_of_events_in_error_queue += self._manager.error_queue.qsize()
self._manager.restart_failed_pipeline()

def reload_configuration(self):
Expand Down
6 changes: 6 additions & 0 deletions logprep/util/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
import multiprocessing as mp
from logging.handlers import QueueListener
from socket import gethostname
import platform
if platform.system() != "Linux":
from multiprocessing import set_start_method
set_start_method("fork")



logqueue = mp.Queue(-1)

Expand Down
Loading