Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send high priority, transactional messages before bulk messages #25

Merged
merged 11 commits into from
Nov 14, 2024
4 changes: 4 additions & 0 deletions src/smpp_gateway/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
backend: Backend,
hc_worker: HealthchecksIoWorker,
submit_sm_params: dict,
set_priority_flag: bool,
mt_messages_per_second: int,
*args,
**kwargs,
Expand All @@ -78,6 +79,7 @@ def __init__(
self.backend = backend
self.hc_worker = hc_worker
self.submit_sm_params = submit_sm_params
self.set_priority_flag = set_priority_flag
self.mt_messages_per_second = mt_messages_per_second
super().__init__(*args, **kwargs)
self._pg_conn = pg_listen(self.backend.name)
Expand Down Expand Up @@ -182,6 +184,8 @@ def send_mt_messages(self):
submit_sm_resps = []
for sms in smses:
params = {**self.submit_sm_params, **sms["params"]}
if self.set_priority_flag and sms["priority_flag"] is not None:
params["priority_flag"] = sms["priority_flag"]
pdus = self.split_and_send_message(sms["short_message"], **params)
# Create placeholder MTMessageStatus objects in the DB, which
# the message_sent handler will later update with the actual command_status
Expand Down
10 changes: 10 additions & 0 deletions src/smpp_gateway/management/commands/smpp_client.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import argparse
import os

from django.core.management.base import BaseCommand
Expand Down Expand Up @@ -85,6 +86,15 @@ def add_arguments(self, parser):
help="Pings healthchecks.io with the specified ping key and check slug. "
"If set, --hc-ping-key must also be set.",
)
parser.add_argument(
"--set-priority-flag",
action=argparse.BooleanOptionalAction,
default=False,
help="Whether to set the `priority_flag` param in the PDU, if one "
"is provided for a message. If a priority_flag is included in "
"--submit-sm-params, the priority_flag set on the individual "
"message will take precedence.",
)

def handle(self, *args, **options):
start_smpp_client(options)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Generated by Django 4.2.16 on 2024-11-14 09:35

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("smpp_gateway", "0007_momessage_error_alter_momessage_status"),
]

operations = [
migrations.RemoveIndex(
model_name="mtmessage",
name="mt_message_status_idx",
),
migrations.AddField(
model_name="mtmessage",
name="priority_flag",
field=models.IntegerField(
choices=[
(0, "Level 0 (lowest) priority"),
(1, "Level 1 priority"),
(2, "Level 2 priority"),
(3, "Level 3 (highest) priority"),
],
null=True,
verbose_name="priority flag",
),
),
migrations.AddIndex(
model_name="mtmessage",
index=models.Index(
models.F("status"),
models.OrderBy(
models.F("priority_flag"), descending=True, nulls_last=True
),
condition=models.Q(("status", "new")),
name="mt_message_status_idx",
),
),
]
14 changes: 13 additions & 1 deletion src/smpp_gateway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,24 @@ class Status(models.TextChoices):
DELIVERED = "delivered", _("Delivered")
ERROR = "error", _("Error")

class PriorityFlag(models.IntegerChoices):
# Based on the priority_flag values in the SMPP Spec
# https://smpp.org/SMPP_v3_4_Issue1_2.pdf
LEVEL_0 = 0, _("Level 0 (lowest) priority")
LEVEL_1 = 1, _("Level 1 priority")
LEVEL_2 = 2, _("Level 2 priority")
LEVEL_3 = 3, _("Level 3 (highest) priority")

backend = models.ForeignKey(
Backend, on_delete=models.PROTECT, verbose_name=_("backend")
)
# SMPP client will decide how to encode it
short_message = models.TextField(_("short message"))
params = models.JSONField(_("params"))
status = models.CharField(_("status"), max_length=32, choices=Status.choices)
priority_flag = models.IntegerField(
_("priority flag"), choices=PriorityFlag.choices, null=True
)

def save(self, *args, **kwargs):
super().save(*args, **kwargs)
Expand All @@ -106,7 +117,8 @@ class Meta:
indexes = (
models.Index(
# Allow for quick filtering of messages that need to be processed
fields=["status"],
"status",
models.F("priority_flag").desc(nulls_last=True),
name="mt_message_status_idx",
condition=models.Q(status="new"), # No way to access Status.NEW here?
),
Expand Down
1 change: 1 addition & 0 deletions src/smpp_gateway/outgoing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def prepare_request(self, id_, text, identities, context):
"short_message": text,
"params": params,
"status": MTMessage.Status.NEW,
"priority_flag": context.get("priority_flag"),
}

def send(self, id_, text, identities, context=None):
Expand Down
8 changes: 5 additions & 3 deletions src/smpp_gateway/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import psycopg2.extensions

from django.db import connection, transaction
from django.db.models import QuerySet
from django.db.models import F, QuerySet
from rapidsms.models import Backend

from smpp_gateway.models import MOMessage, MTMessage
Expand Down Expand Up @@ -40,13 +40,15 @@ def pg_notify(channel: str):

def get_mt_messages_to_send(limit: int, backend: Backend) -> list[dict[str, Any]]:
"""Fetches up to `limit` messages intended for `backend`, updates their
status to SENDING, and returns select fields from the model.
status to SENDING, and returns select fields from the model. The messages
are sorted by descending `priority_flag`.
"""
with transaction.atomic():
smses = list(
MTMessage.objects.filter(status=MTMessage.Status.NEW, backend=backend)
.select_for_update(skip_locked=True)
.values("id", "short_message", "params")[:limit]
.order_by(F("priority_flag").desc(nulls_last=True))
.values("id", "short_message", "params", "priority_flag")[:limit]
)
if smses:
pks = [sms["id"] for sms in smses]
Expand Down
46 changes: 46 additions & 0 deletions src/smpp_gateway/router.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from rapidsms.messages.incoming import IncomingMessage
from rapidsms.messages.outgoing import OutgoingMessage
from rapidsms.router.blocking import BlockingRouter

from smpp_gateway.models import MTMessage


class PriorityIncomingMessage(IncomingMessage):
default_priority_flag = MTMessage.PriorityFlag.LEVEL_2

def respond(self, text, **kwargs):
fields = kwargs.get("fields", {})
if "priority_flag" not in fields:
fields["priority_flag"] = self.default_priority_flag.value
kwargs["fields"] = fields
return super().respond(text, **kwargs)


class PriorityOutgoingMessage(OutgoingMessage):
default_priority_flag = MTMessage.PriorityFlag.LEVEL_1

def extra_backend_context(self):
context = super().extra_backend_context()
tobiasmcnulty marked this conversation as resolved.
Show resolved Hide resolved
context["priority_flag"] = self.fields.get(
"priority_flag", self.default_priority_flag.value
)
return context


class PriorityBlockingRouter(BlockingRouter):
tobiasmcnulty marked this conversation as resolved.
Show resolved Hide resolved
incoming_message_class = PriorityIncomingMessage
outgoing_message_class = PriorityOutgoingMessage

def new_incoming_message(
self, text, connections, class_=incoming_message_class, **kwargs
):
return super().new_incoming_message(
text, connections, class_=PriorityIncomingMessage, **kwargs
)
simonkagwi marked this conversation as resolved.
Show resolved Hide resolved

def new_outgoing_message(
self, text, connections, class_=outgoing_message_class, **kwargs
):
return super().new_incoming_message(
text, connections, class_=PriorityOutgoingMessage, **kwargs
)
3 changes: 3 additions & 0 deletions src/smpp_gateway/smpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def get_smpplib_client(
notify_mo_channel: str,
backend: Backend,
submit_sm_params: dict,
set_priority_flag: bool,
mt_messages_per_second: int,
hc_check_uuid: str,
hc_ping_key: str,
Expand All @@ -35,6 +36,7 @@ def get_smpplib_client(
backend,
hc_worker,
submit_sm_params,
set_priority_flag,
mt_messages_per_second,
host,
port,
Expand Down Expand Up @@ -69,6 +71,7 @@ def start_smpp_client(options):
options["notify_mo_channel"],
backend,
json.loads(options["submit_sm_params"]),
options["set_priority_flag"],
options["mt_messages_per_second"],
options["hc_check_uuid"],
options["hc_ping_key"],
Expand Down
10 changes: 9 additions & 1 deletion tests/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from django.utils.timezone import now
from factory.django import DjangoModelFactory
from faker import Faker
from rapidsms.models import Backend
from rapidsms.models import Backend, Connection

from smpp_gateway.models import MOMessage, MTMessage, MTMessageStatus

Expand Down Expand Up @@ -59,3 +59,11 @@ class Meta:
command_status = smpplib.consts.SMPP_ESME_ROK
message_id = ""
delivery_report = b""


class ConnectionFactory(DjangoModelFactory):
class Meta:
model = Connection

backend = factory.SubFactory(BackendFactory)
identity = factory.Faker("word")
103 changes: 102 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from unittest import mock

import pytest

from smpplib import consts as smpplib_consts
from smpplib.command import DeliverSM, SubmitSMResp

from smpp_gateway.models import MOMessage, MTMessage
from smpp_gateway.queries import pg_listen
from smpp_gateway.smpp import get_smpplib_client
from smpp_gateway.smpp import PgSmppClient, get_smpplib_client
from tests.factories import BackendFactory, MTMessageFactory, MTMessageStatusFactory


Expand All @@ -22,6 +24,7 @@ def test_received_mo_message(self):
"notify_mo_channel",
backend,
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
Expand Down Expand Up @@ -58,6 +61,7 @@ def test_received_message_receipt(self):
"notify_mo_channel",
backend,
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
Expand Down Expand Up @@ -100,6 +104,7 @@ def test_received_null_short_message(self):
"notify_mo_channel",
backend,
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
Expand Down Expand Up @@ -138,6 +143,7 @@ def test_message_sent_handler():
"notify_mo_channel",
backend,
{}, # submit_sm_params
False, # set_priority_flag
20, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
Expand All @@ -157,3 +163,98 @@ def test_message_sent_handler():

assert outbound_msg_status.command_status == smpplib_consts.SMPP_ESME_RSUBMITFAIL
assert outbound_msg_status.message_id == "qwerty"


@pytest.mark.django_db(transaction=True)
@mock.patch.object(PgSmppClient, "send_message", return_value=mock.Mock(sequence=1))
class TestSetPriorityFlag:
def get_client_and_message(
self,
submit_sm_params=None,
set_priority_flag=True,
message_priority_flag=MTMessage.PriorityFlag.LEVEL_1,
):
backend = BackendFactory()
client = get_smpplib_client(
"127.0.0.1",
8000,
"notify_mo_channel",
backend,
submit_sm_params or {},
set_priority_flag,
20, # mt_messages_per_second
"", # hc_check_uuid
"", # hc_ping_key
"", # hc_check_slug
)
message = MTMessageFactory(
status=MTMessage.Status.NEW,
backend=backend,
priority_flag=message_priority_flag,
)
return client, message

def test_set_priority_flag_is_true(self, mock_send_message):
"""If set_priority_flag is True and the priority_flag is set on a MTMessage
object, the priority_flag param should be set in the PDU.
"""
client, message = self.get_client_and_message()
client.receive_pg_notify()

mock_send_message.assert_called_once()
assert (
mock_send_message.call_args.kwargs["priority_flag"] == message.priority_flag
)

def test_set_priority_flag_is_true_and_priority_in_submit_sm_params(
self, mock_send_message
):
"""If set_priority_flag is True and the priority_flag is set on a MTMessage
object and also in the submit_sm_params dictionary, the priority_flag from
the message object should take precendence.
"""
client, message = self.get_client_and_message(
{"priority_flag": MTMessage.PriorityFlag.LEVEL_0}
)
client.receive_pg_notify()

mock_send_message.assert_called_once()
assert (
mock_send_message.call_args.kwargs["priority_flag"] == message.priority_flag
)

def test_set_priority_flag_is_true_but_priority_not_set(self, mock_send_message):
"""If set_priority_flag is True and but the priority_flag is not set on a
MTMessage object, the priority_flag param should NOT be set in the PDU.
"""
client = self.get_client_and_message(message_priority_flag=None)[0]
client.receive_pg_notify()

mock_send_message.assert_called_once()
assert "priority_flag" not in mock_send_message.call_args.kwargs

def test_set_priority_flag_is_false(self, mock_send_message):
"""If set_priority_flag is False and the priority_flag is set on a
MTMessage object, the priority_flag param should NOT be set in the PDU.
"""
client = self.get_client_and_message(set_priority_flag=False)[0]
client.receive_pg_notify()

mock_send_message.assert_called_once()
assert "priority_flag" not in mock_send_message.call_args.kwargs

def test_set_priority_flag_is_false_but_priority_in_submit_sm_params(
self, mock_send_message
):
"""If set_priority_flag is False and but a priority_flag was set in the
submit_sm_params dictionary, the priority_flag from submit_sm_params
should still be set in the PDU.
"""
priority = MTMessage.PriorityFlag.LEVEL_0
client, message = self.get_client_and_message(
{"priority_flag": priority}, False
)
client.receive_pg_notify()

mock_send_message.assert_called_once()
assert mock_send_message.call_args.kwargs["priority_flag"] == priority
Loading
Loading