Skip to content

Commit

Permalink
workers: revision worker implementation
Browse files Browse the repository at this point in the history
WIP DO NOT MERGE
Commit message TBD

- add abstract Worker class (bug 1744327)
- add main worker flag and capacity/throttle flags
- add many to many fields + association to revisions/landing jobs
- add method to parse diff and list affected files
- add more test coverage for revision_worker.py
- add mots integration (bug 1740107)
- add new RevisionWorker that pre-processes revisions (bug 1788728)
- add new start/stop commands to manage workers
- add new flags to stop workers gracefully (*_WORKER_STOPPED)
- add patch caching on disk
- add proper loop/process functionality to workers
- add repo.use_revision_worker feature flag (bug 1788732)
- add mots hashes check
- improve edge search functionality
- implement stack hashes to detect changes in revisions (via get_stack_hashes)
- include new Lando revision info via API endpoint
- refactor dependency and stack fetching and parsing using networkx
- refactor revision worker and landing worker to use Worker class
- remove s3/boto/etc. dependencies (bug 1753728)
- rename old command lando-cli landing-worker to lando-cli start-landing-worker
- run pre/post mots query
- store mots output in revision model
  • Loading branch information
zzzeid committed May 11, 2023
1 parent ecd2abe commit fcfd98b
Show file tree
Hide file tree
Showing 38 changed files with 2,266 additions and 378 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[flake8]
max-line-length = 88
select = C,E,F,W,B,B9
ignore = E203, E501, W503, B006
ignore = E203, E501, W503, B006, E712, E711
exclude =
.hg,
.git,
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ RUN cd / && pip install --no-cache /app
ENV PYTHONPATH /app
RUN chown -R app:app /app

# Create repos directory for transplanting in landing-worker
# Create repos directory for landing-worker and revision worker.
RUN mkdir /repos
RUN chown -R app:app /repos

# Run as a non-privileged user
USER app
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile-dev
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ENV PYTHONUNBUFFERED=1
ENV FLASK_RUN_PORT=9000
ENV FLASK_RUN_HOST=0.0.0.0
ENV FLASK_DEBUG=1
ENV HTTP_ALLOWED=1

ENTRYPOINT ["lando-cli"]
CMD ["run"]
Expand Down Expand Up @@ -48,6 +49,7 @@ RUN cd / && pip install --no-cache /app
ENV PYTHONPATH /app
RUN chown -R app:app /app

# Create repos directory for landing worker and revision worker.
RUN mkdir /repos
RUN chown -R app:app /repos

Expand Down
24 changes: 12 additions & 12 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,25 +131,24 @@ services:
- smtp
lando-api.landing-worker:
image: lando-api
command: ["landing-worker"]
command: ["start-landing-worker"]
environment:
- ENV=localdev
- DATABASE_URL=postgresql://postgres:[email protected]/lando_api_dev
- SENTRY_DSN=
# See http://docs.celeryproject.org/en/stable/getting-started/brokers/redis.html#configuration
# for the full URL format.
- CELERY_BROKER_URL=redis://redis.queue/0
- OIDC_IDENTIFIER=https://lando-api.test
- OIDC_DOMAIN=https://auth0.test
- LANDO_UI_URL=https://lando.test
- REPO_CLONES_PATH=/repos
- REPOS_TO_LAND=localdev
CELERY_BROKER_URL: "redis://redis.queue/0"
DATABASE_URL: "postgresql://postgres:[email protected]/lando_api_dev"
ENV: "localdev"
LANDO_UI_URL: "https://lando.test"
OIDC_DOMAIN: "https://auth0.test"
OIDC_IDENTIFIER: "https://lando-api.test"
REPOS_TO_LAND: "localdev"
REPO_CLONES_PATH: "/repos"
SENTRY_DSN: ""
user: root
volumes:
- ./:/app
- ./migrations/:/migrations/
# Prevent writing python cache to the host.
- caches_cache:/app/.cache/
- repos:/repos
depends_on:
- lando-api.db
- redis.queue
Expand Down Expand Up @@ -177,3 +176,4 @@ volumes:
caches_pycache:
caches_cache:
caches_pytest_cache:
repos:
13 changes: 13 additions & 0 deletions landoapi/api/revisions.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from landoapi.decorators import require_phabricator_api_key
from landoapi.models import SecApprovalRequest
from landoapi.phabricator import PhabricatorClient
from landoapi.models.revisions import Revision
from landoapi.projects import get_secure_project_phid
from landoapi.revisions import revision_is_secure
from landoapi.secapproval import send_sanitized_commit_message_for_review
Expand Down Expand Up @@ -88,3 +89,15 @@ def request_sec_approval(phab: PhabricatorClient, data: dict):
db.session.commit()

return {}, 200


def get_stack_hashes(revision_id: int) -> tuple:
    """Return the stack hashes stored for a revision.

    A stack hash is used to detect a change in a revision.

    Looks up the `Revision` row whose `id` equals `revision_id`. When a row
    is found, its `stack_hashes` are returned with a 200 status; otherwise an
    empty body is returned with a 404 status.
    """
    # NOTE(review): this matches on `Revision.id`, while the stack endpoint
    # looks rows up via `Revision.revision_id` -- confirm which identifier
    # callers are expected to pass here.
    lookup = Revision.query.filter(Revision.id == revision_id)
    found = lookup.one_or_none()
    if not found:
        return {}, 404
    return found.stack_hashes, 200
30 changes: 20 additions & 10 deletions landoapi/api/stacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from flask import current_app
from landoapi.commit_message import format_commit_message
from landoapi.decorators import require_phabricator_api_key
from landoapi.models.revisions import Revision
from landoapi.phabricator import PhabricatorClient
from landoapi.projects import (
get_release_managers,
Expand Down Expand Up @@ -116,19 +117,25 @@ def get(phab: PhabricatorClient, revision_id: str):
}

revisions_response = []
for revision_phid, revision in stack_data.revisions.items():
fields = PhabricatorClient.expect(revision, "fields")
for _phid, phab_revision in stack_data.revisions.items():
lando_revision = Revision.query.filter(
Revision.revision_id == phab_revision["id"]
).one_or_none()
revision_phid = PhabricatorClient.expect(phab_revision, "phid")
fields = PhabricatorClient.expect(phab_revision, "fields")
diff_phid = PhabricatorClient.expect(fields, "diffPHID")
repo_phid = PhabricatorClient.expect(fields, "repositoryPHID")
diff = stack_data.diffs[diff_phid]
human_revision_id = "D{}".format(PhabricatorClient.expect(revision, "id"))
human_revision_id = "D{}".format(PhabricatorClient.expect(phab_revision, "id"))
revision_url = urllib.parse.urljoin(
current_app.config["PHABRICATOR_URL"], human_revision_id
)
secure = revision_is_secure(revision, secure_project_phid)
commit_description = find_title_and_summary_for_display(phab, revision, secure)
bug_id = get_bugzilla_bug(revision)
reviewers = get_collated_reviewers(revision)
secure = revision_is_secure(phab_revision, secure_project_phid)
commit_description = find_title_and_summary_for_display(
phab, phab_revision, secure
)
bug_id = get_bugzilla_bug(phab_revision)
reviewers = get_collated_reviewers(phab_revision)
accepted_reviewers = reviewers_for_commit_message(
reviewers, users, projects, sec_approval_project_phid
)
Expand Down Expand Up @@ -163,16 +170,16 @@ def get(phab: PhabricatorClient, revision_id: str):
{
"id": human_revision_id,
"phid": revision_phid,
"status": serialize_status(revision),
"status": serialize_status(phab_revision),
"blocked_reason": blocked.get(revision_phid, ""),
"bug_id": bug_id,
"title": commit_description.title,
"url": revision_url,
"date_created": PhabricatorClient.to_datetime(
PhabricatorClient.expect(revision, "fields", "dateCreated")
PhabricatorClient.expect(phab_revision, "fields", "dateCreated")
).isoformat(),
"date_modified": PhabricatorClient.to_datetime(
PhabricatorClient.expect(revision, "fields", "dateModified")
PhabricatorClient.expect(phab_revision, "fields", "dateModified")
).isoformat(),
"summary": commit_description.summary,
"commit_message_title": commit_message_title,
Expand All @@ -183,6 +190,9 @@ def get(phab: PhabricatorClient, revision_id: str):
"reviewers": serialize_reviewers(reviewers, users, projects, diff_phid),
"is_secure": secure,
"is_using_secure_commit_message": commit_description.sanitized,
"lando_revision": lando_revision.serialize()
if lando_revision
else None,
}
)

Expand Down
4 changes: 1 addition & 3 deletions landoapi/api/transplants.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,9 @@ def get_list(phab: PhabricatorClient, stack_revision_id: str):
limit=len(revision_phids),
)

# Return both transplants and landing jobs, since for repos that were switched
# both or either of these could be populated.

rev_ids = [phab.expect(r, "id") for r in phab.expect(revs, "data")]

# Find landing jobs based on related revisions or legacy revision_to_diff_id field.
landing_jobs = LandingJob.revisions_query(rev_ids).all()
legacy_jobs = LandingJob.legacy_revisions_query(rev_ids).all()

Expand Down
8 changes: 3 additions & 5 deletions landoapi/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ def load_config() -> dict[str, Any]:
}

config_keys = (
"AWS_ACCESS_KEY",
"AWS_SECRET_KEY",
"BUGZILLA_API_KEY",
"BUGZILLA_URL",
"CACHE_REDIS_DB",
Expand All @@ -82,15 +80,15 @@ def load_config() -> dict[str, Any]:
"MAIL_USERNAME",
"OIDC_DOMAIN",
"OIDC_IDENTIFIER",
"PATCH_BUCKET_NAME",
"PHABRICATOR_ADMIN_API_KEY",
"PHABRICATOR_UNPRIVILEGED_API_KEY",
"PHABRICATOR_URL",
"REPO_CLONES_PATH",
"PINGBACK_ENABLED",
"REPOS_TO_LAND",
"REPO_CLONES_PATH",
"SENTRY_DSN",
"TRANSPLANT_PASSWORD",
"TRANSPLANT_API_KEY",
"TRANSPLANT_PASSWORD",
"TRANSPLANT_URL",
"TRANSPLANT_USERNAME",
"TREESTATUS_URL",
Expand Down
10 changes: 6 additions & 4 deletions landoapi/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ class CacheSubsystem(Subsystem):

def init_app(self, app):
super().init_app(app)

host = self.flask_app.config.get("CACHE_REDIS_HOST")
if not host:
if self.flask_app.config.get("CACHE_DISABLED"):
# Default to not caching for testing.
logger.warning("Cache initialized in null mode, caching disabled.")
cache_config = {"CACHE_TYPE": "null", "CACHE_NO_NULL_WARNING": True}
logger.warning("Cache initialized in null mode.")
cache_config = {"CACHE_TYPE": "NullCache"}
elif not host:
logger.warning("Cache initialized in filesystem mode.")
cache_config = {"CACHE_TYPE": "FileSystemCache", "CACHE_DIR": "/tmp/cache"}
else:
cache_config = {"CACHE_TYPE": "redis", "CACHE_REDIS_HOST": host}
config_keys = ("CACHE_REDIS_PORT", "CACHE_REDIS_PASSWORD", "CACHE_REDIS_DB")
Expand Down
50 changes: 47 additions & 3 deletions landoapi/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,20 +68,64 @@ def worker(celery_arguments):
celery.worker_main((sys.argv[0],) + celery_arguments)


@cli.command(name="landing-worker")
def landing_worker():
@cli.command(name="start-landing-worker")
def start_landing_worker():
from landoapi.app import auth0_subsystem, lando_ui_subsystem
from landoapi.workers.landing_worker import LandingWorker

exclusions = [auth0_subsystem, lando_ui_subsystem]
for system in get_subsystems(exclude=exclusions):
system.ensure_ready()

from landoapi.workers.landing_worker import LandingWorker
ConfigurationVariable.set(LandingWorker.STOP_KEY, VariableType.BOOL, "0")

worker = LandingWorker()
worker.start()


@cli.command(name="stop-landing-worker")
def stop_landing_worker():
    # Ask the landing worker to stop by setting its stop flag to "1".
    # Documented with comments on purpose: click would expose a docstring as
    # this command's help text.
    from landoapi.workers.landing_worker import LandingWorker
    from landoapi.storage import db_subsystem

    # Make sure the database subsystem is ready before writing the variable.
    db_subsystem.ensure_ready()

    stop_key = LandingWorker.STOP_KEY
    ConfigurationVariable.set(stop_key, VariableType.BOOL, "1")


@cli.command(name="start-revision-worker")
@click.argument("role")
def start_revision_worker(role):
    # Start a revision worker in the requested role ("processor" or
    # "supervisor"). Documented with comments on purpose: click would expose
    # a docstring as this command's help text.
    from landoapi.app import auth0_subsystem, lando_ui_subsystem
    from landoapi.workers.revision_worker import RevisionWorker, Supervisor, Processor

    # Map each accepted role name to the worker class that implements it.
    worker_classes = {
        "processor": Processor,
        "supervisor": Supervisor,
    }

    # Reject unknown roles before touching any subsystem.
    worker_class = worker_classes.get(role)
    if worker_class is None:
        raise ValueError(f"Unknown worker role specified ({role}).")

    # Every subsystem except auth0 and lando-ui must be ready.
    skipped = [auth0_subsystem, lando_ui_subsystem]
    for subsystem in get_subsystems(exclude=skipped):
        subsystem.ensure_ready()

    # Clear the stop flag ("0") before the worker starts.
    ConfigurationVariable.set(RevisionWorker.STOP_KEY, VariableType.BOOL, "0")

    worker_class().start()


@cli.command(name="stop-revision-worker")
def stop_revision_worker():
"""Stops all revision workers (supervisor and processors)."""
# Imports are deferred to call time, as in the other commands in this file.
from landoapi.workers.revision_worker import RevisionWorker
from landoapi.storage import db_subsystem

# Bring up the database subsystem before asking the workers to stop.
db_subsystem.ensure_ready()
# NOTE(review): presumably sets the REVISION_WORKER_STOPPED flag, mirroring
# stop-landing-worker -- confirm in RevisionWorker.stop.
RevisionWorker.stop()


@cli.command(name="run-pre-deploy-sequence")
def run_pre_deploy_sequence():
"""Runs the sequence of commands required before a deployment."""
Expand Down
5 changes: 4 additions & 1 deletion landoapi/commit_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
IRC_NICK = r"[a-zA-Z0-9\-\_.]*[a-zA-Z0-9\-\_]+"

# fmt: off
REVIEWERS_RE = re.compile( # noqa: E131
REVIEWERS_RE = re.compile(
r"([\s\(\.\[;,])" # before "r" delimiter
+ r"(" + SPECIFIER + r")" # flag
+ r"(" # capture all reviewers
Expand Down Expand Up @@ -209,3 +209,6 @@ def bug_list_to_commit_string(bug_ids: Iterable[str]) -> str:
return "No bug"

return f"Bug {', '.join(sorted(set(bug_ids)))}"


# flake8: noqa: E131
12 changes: 11 additions & 1 deletion landoapi/hg.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import copy
import configparser
import copy
import logging
import os
import shlex
Expand Down Expand Up @@ -650,3 +650,13 @@ def read_checkout_file(self, path: str) -> str:

with checkout_file_path.open() as f:
return f.read()

def has_incoming(self, source: str) -> bool:
    """Check if there are any incoming changes from the remote repo.

    Runs `hg incoming <source> --limit 1`. Returns True when the command
    completes without raising, and False when it raises `CommandError`.
    A `CommandError` whose output does not contain "no changes found" is
    also logged as an error before returning False.
    """
    try:
        self.run_hg(["incoming", source, "--limit", "1"])
    except hglib.error.CommandError as exc:
        nothing_new = b"no changes found" in exc.out
        if not nothing_new:
            # Any failure other than "no changes found" is unexpected: log it.
            logger.error(exc)
        return False
    else:
        return True
4 changes: 2 additions & 2 deletions landoapi/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from landoapi.models.landing_job import LandingJob
from landoapi.models.revisions import Revision
from landoapi.models.secapproval import SecApprovalRequest
from landoapi.models.transplant import Transplant
from landoapi.models.configuration import ConfigurationVariable
from landoapi.models.revisions import DiffWarning
from landoapi.models.revisions import DiffWarning, Revision

__all__ = [
"LandingJob",
Expand All @@ -16,4 +15,5 @@
"Transplant",
"ConfigurationVariable",
"DiffWarning",
"Revision",
]
3 changes: 3 additions & 0 deletions landoapi/models/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class ConfigurationKey(enum.Enum):

LANDING_WORKER_PAUSED = "LANDING_WORKER_PAUSED"
LANDING_WORKER_STOPPED = "LANDING_WORKER_STOPPED"
REVISION_WORKER_PAUSED = "REVISION_WORKER_PAUSED"
REVISION_WORKER_STOPPED = "REVISION_WORKER_STOPPED"
REVISION_WORKER_CAPACITY = "REVISION_WORKER_CAPACITY"
API_IN_MAINTENANCE = "API_IN_MAINTENANCE"
WORKER_THROTTLE_SECONDS = "WORKER_THROTTLE_SECONDS"

Expand Down
Loading

0 comments on commit fcfd98b

Please sign in to comment.