Skip to content

Commit fa8f3ba

Browse files
committed
Implement plugin
1 parent 69e8e2a commit fa8f3ba

File tree

2 files changed

+194
-0
lines changed

2 files changed

+194
-0
lines changed

__init__.py

Whitespace-only changes.

main.py

+194
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
import sys
2+
import asyncio
3+
import typing
4+
from plugin.v1.plugin_grpc import GatewayDPluginServiceBase
5+
from google.protobuf.struct_pb2 import ListValue, Struct, Value
6+
from google.protobuf.json_format import MessageToDict
7+
import grpclib
8+
from grpclib.server import Server, Stream
9+
from grpclib.reflection.service import ServerReflection
10+
from grpclib.health.service import Health
11+
from grpclib.health.check import ServiceStatus
12+
import logging
13+
14+
logging.basicConfig(filename="plugin.log", level=logging.INFO)
15+
16+
17+
async def defaults(stream: Stream[Struct, Struct]) -> None:
18+
"""Default handler for all hooks.
19+
20+
Note:
21+
The GatewayDPluginServiceBase class implements all hooks as abstract
22+
methods, so we need to implement them all. This is a default handler
23+
to use for all hooks that we don't need to implement.
24+
"""
25+
req = await stream.recv_message()
26+
if req:
27+
await stream.send_message(req)
28+
else:
29+
await stream.send_message(Struct())
30+
31+
32+
class Plugin(GatewayDPluginServiceBase):
33+
async def GetPluginConfig(self, stream: Stream[Struct, Struct]) -> None:
34+
# Ignore the request, as it is empty.
35+
await stream.recv_message()
36+
await stream.send_message(
37+
Struct(
38+
fields={
39+
"id": Value(
40+
struct_value=Struct(
41+
fields={
42+
"name": Value(string_value="plugin-template-python"),
43+
"version": Value(string_value="0.1.0"),
44+
"remoteUrl": Value(
45+
string_value="github.com/gatewayd-io/plugin-template-python"
46+
),
47+
}
48+
)
49+
),
50+
"hooks": Value(
51+
list_value=ListValue(
52+
values=[
53+
# The list of hooks that the plugin implements.
54+
Value(number_value=16), # OnTrafficFromClient
55+
]
56+
)
57+
),
58+
}
59+
)
60+
)
61+
62+
async def OnBooted(self, stream: Stream[Struct, Struct]) -> None:
63+
await defaults(stream)
64+
65+
async def OnBooting(self, stream: Stream[Struct, Struct]) -> None:
66+
await defaults(stream)
67+
68+
async def OnClosed(self, stream: Stream[Struct, Struct]) -> None:
69+
await defaults(stream)
70+
71+
async def OnClosing(self, stream: Stream[Struct, Struct]) -> None:
72+
await defaults(stream)
73+
74+
async def OnConfigLoaded(self, stream: Stream[Struct, Struct]) -> None:
75+
await defaults(stream)
76+
77+
async def OnHook(self, stream: Stream[Struct, Struct]) -> None:
78+
await defaults(stream)
79+
80+
async def OnNewClient(self, stream: Stream[Struct, Struct]) -> None:
81+
await defaults(stream)
82+
83+
async def OnNewLogger(self, stream: Stream[Struct, Struct]) -> None:
84+
await defaults(stream)
85+
86+
async def OnNewPool(self, stream: Stream[Struct, Struct]) -> None:
87+
await defaults(stream)
88+
89+
async def OnNewProxy(self, stream: Stream[Struct, Struct]) -> None:
90+
await defaults(stream)
91+
92+
async def OnNewServer(self, stream: Stream[Struct, Struct]) -> None:
93+
await defaults(stream)
94+
95+
async def OnOpened(self, stream: Stream[Struct, Struct]) -> None:
96+
await defaults(stream)
97+
98+
async def OnOpening(self, stream: Stream[Struct, Struct]) -> None:
99+
await defaults(stream)
100+
101+
async def OnRun(self, stream: Stream[Struct, Struct]) -> None:
102+
await defaults(stream)
103+
104+
async def OnShutdown(self, stream: Stream[Struct, Struct]) -> None:
105+
await defaults(stream)
106+
107+
async def OnSignal(self, stream: Stream[Struct, Struct]) -> None:
108+
await defaults(stream)
109+
110+
async def OnTick(self, stream: Stream[Struct, Struct]) -> None:
111+
await defaults(stream)
112+
113+
async def OnTraffic(self, stream: Stream[Struct, Struct]) -> None:
114+
await defaults(stream)
115+
116+
async def OnTrafficFromClient(self, stream: Stream[Struct, Struct]) -> None:
117+
"""
118+
This is an example of how to use the OnTrafficFromClient hook to
119+
intercept traffic from the client and modify it before it is sent to
120+
the server. In this example, we simply log the request and send it
121+
to the server.
122+
"""
123+
req = await stream.recv_message()
124+
if req:
125+
logging.info(MessageToDict(req))
126+
await stream.send_message(req)
127+
else:
128+
await stream.send_message(Struct())
129+
130+
async def OnTrafficFromServer(self, stream: Stream[Struct, Struct]) -> None:
131+
await defaults(stream)
132+
133+
async def OnTrafficToClient(self, stream: Stream[Struct, Struct]) -> None:
134+
await defaults(stream)
135+
136+
async def OnTrafficToServer(self, stream: Stream[Struct, Struct]) -> None:
137+
await defaults(stream)
138+
139+
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
140+
"""Mapping of service methods to handlers.
141+
142+
Note:
143+
This method is used to remap the service name to the plugin name,
144+
so that the plugin health check works properly.
145+
"""
146+
# Get the mapping from the base class
147+
mapping = super().__mapping__()
148+
remapping = {}
149+
150+
# Replace the service name with the plugin name
151+
for path in mapping:
152+
repath = path.replace("plugin.v1.GatewayDPluginService", "plugin")
153+
remapping[repath] = mapping[path]
154+
155+
# Merge the two mappings
156+
remapping.update(mapping)
157+
158+
return remapping
159+
160+
161+
async def ping() -> bool:
162+
return True
163+
164+
165+
async def serve() -> None:
166+
# Instantiate the plugin.
167+
plugin = Plugin()
168+
169+
# Create a health check for the plugin.
170+
plugin_health = ServiceStatus()
171+
plugin_health.set(True)
172+
health = Health({plugin: [plugin_health]})
173+
174+
# Add reflection for the plugin and health check.
175+
services = ServerReflection.extend([plugin, health])
176+
177+
# Instantiate the server.
178+
server = Server(services)
179+
180+
# Start the server.
181+
await server.start("127.0.0.1", 12345)
182+
await server.wait_closed()
183+
184+
185+
if __name__ == "__main__":
186+
loop = asyncio.new_event_loop()
187+
asyncio.set_event_loop(loop)
188+
try:
189+
# This is a special message that tells gatewayd to start the plugin.
190+
print("1|0|tcp|127.0.0.1:12345|grpc")
191+
sys.stdout.flush()
192+
asyncio.run(serve())
193+
except KeyboardInterrupt:
194+
pass

0 commit comments

Comments
 (0)