Skip to content

Commit 0f28ef0

Browse files
committed
redis lua
1 parent 7f35a86 commit 0f28ef0

File tree

10 files changed

+1188
-0
lines changed

10 files changed

+1188
-0
lines changed

example/c#/Client.cs

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Net;
5+
using System.Net.Sockets;
6+
using System.Threading;
7+
8+
namespace net
9+
{
10+
class Program
11+
{
12+
struct Message
13+
{
14+
public UInt16 cmd;
15+
public String data;
16+
public Socket client;
17+
}
18+
19+
private delegate bool MessageCallback(Socket client, UInt16 cmd,String data);
20+
private static Dictionary<int, MessageCallback> dict = new Dictionary<int, MessageCallback>();
21+
22+
private static int BUFFER = 65536;
23+
private static byte[] buffer = new byte[BUFFER];
24+
private static int writeIndex = 0;
25+
private static ConcurrentQueue<Message> messageQueue = new ConcurrentQueue<Message>();
26+
27+
private static void ConnectCallback(Socket clientSocket)
28+
{
29+
Console.WriteLine("server connect ");
30+
}
31+
private static void DisconnectCallback(Socket clientSocket)
32+
{
33+
Console.WriteLine("server close");
34+
clientSocket.Shutdown(SocketShutdown.Both);
35+
clientSocket.Close();
36+
}
37+
private static void ReceiveCallback(Socket clientSocket,int bytesRead)
38+
{
39+
try
40+
{
41+
int readIndex = 0;
42+
writeIndex += bytesRead;
43+
while (bytesRead - readIndex >= sizeof(UInt16) * 3)
44+
{
45+
UInt16 len = BitConverter.ToUInt16(buffer, readIndex);
46+
if (bytesRead - readIndex - (sizeof(UInt16) * 3) < len)
47+
{
48+
Array.Copy(buffer, readIndex, buffer, 0,writeIndex);
49+
break;
50+
}
51+
52+
UInt16 flag = BitConverter.ToUInt16(buffer, readIndex + sizeof(UInt16));
53+
UInt16 cmd = BitConverter.ToUInt16(buffer, readIndex + sizeof(UInt16) * 2);
54+
55+
string data = System.Text.Encoding.Default.GetString(buffer, readIndex + sizeof(UInt16) * 3, len);
56+
Message msg;
57+
msg.client = clientSocket;
58+
msg.cmd = cmd;
59+
msg.data = data;
60+
messageQueue.Enqueue(msg);
61+
62+
writeIndex -= sizeof(UInt16) * 3 + len;
63+
readIndex += sizeof(UInt16) * 3 + len;
64+
}
65+
}
66+
catch (Exception e)
67+
{
68+
Console.WriteLine(e.ToString());
69+
}
70+
}
71+
72+
private static void EncodeTo(Socket client, UInt16 id, String data)
73+
{
74+
int writeIndex = 0;
75+
UInt16 len = (UInt16)(data.Length + sizeof(UInt16) * 3);
76+
UInt16 flag = 0;
77+
UInt16 cmd = id;
78+
79+
byte[] data1 = BitConverter.GetBytes(data.Length);
80+
byte[] data2 = BitConverter.GetBytes(flag);
81+
byte[] data3 = BitConverter.GetBytes(cmd);
82+
byte[] data4 = System.Text.Encoding.UTF8.GetBytes(data);
83+
84+
byte[] buffer = new byte[len];
85+
86+
data1.CopyTo(buffer, writeIndex);
87+
data2.CopyTo(buffer, writeIndex + sizeof(UInt16));
88+
data3.CopyTo(buffer, writeIndex + sizeof(UInt16) * 2);
89+
data4.CopyTo(buffer, writeIndex + sizeof(UInt16) * 3);
90+
client.Send(buffer, buffer.Length, 0);
91+
}
92+
93+
private static bool OnLogin(Socket client, UInt16 id, String data)
94+
{
95+
return true;
96+
}
97+
98+
private static void RegisterMsg()
99+
{
100+
dict.Add(1, OnLogin);
101+
}
102+
103+
public static void DoLogicThread()
104+
{
105+
while(true)
106+
{
107+
Message msg;
108+
while (messageQueue.TryDequeue(out msg))
109+
{
110+
MessageCallback callback;
111+
dict.TryGetValue(msg.cmd,out callback);
112+
callback(msg.client,msg.cmd,msg.data);
113+
}
114+
Thread.Sleep(10);
115+
}
116+
}
117+
118+
static void Main(string[] args)
119+
{
120+
IPAddress ip = IPAddress.Parse("10.128.2.117");
121+
Socket clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
122+
try
123+
{
124+
clientSocket.Connect(new IPEndPoint(ip, 8010));
125+
ConnectCallback(clientSocket);
126+
}
127+
catch (Exception e)
128+
{
129+
Console.WriteLine(e.ToString());
130+
return;
131+
}
132+
133+
RegisterMsg();
134+
135+
Thread t1 = new Thread(new ThreadStart(DoLogicThread));
136+
UInt16 cmd = 2;
137+
EncodeTo(clientSocket, cmd, "test");
138+
139+
while (true)
140+
{
141+
try
142+
{
143+
int len = clientSocket.Receive(buffer, writeIndex, BUFFER - writeIndex, 0);
144+
if (len <=0)
145+
{
146+
DisconnectCallback(clientSocket);
147+
break;
148+
}
149+
ReceiveCallback(clientSocket, len);
150+
}
151+
catch (Exception e)
152+
{
153+
Console.WriteLine(e.ToString());
154+
DisconnectCallback(clientSocket);
155+
break;
156+
}
157+
}
158+
}
159+
}
160+
}
161+

example/c#/Server.cs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Server
6+
{
7+
class Program
8+
{
9+
private static async Task onCron(UInt16 id, TcpSession session, byte[] buffer)
10+
{
11+
String buf = System.Text.Encoding.Default.GetString(buffer);
12+
await session.send(buf, id);
13+
}
14+
private static void disConnectCallback(TcpSession session)
15+
{
16+
System.Console.WriteLine("my close : {0}", session.ID);
17+
}
18+
19+
private static void connectCallback(TcpSession session)
20+
{
21+
System.Console.WriteLine("connect close : {0}", session.ID);
22+
}
23+
24+
static void Main(string[] args)
25+
{
26+
var service = new TcpService();
27+
service.registerMsg(1, onCron);
28+
service.startListen("127.0.0.1", 9999, connectCallback, disConnectCallback);
29+
30+
while (true)
31+
{
32+
Thread.Sleep(1000);
33+
}
34+
}
35+
}
36+
}

example/c#/TcpService.cs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Net.Sockets;
5+
using System.Net;
6+
using System.Threading.Tasks;
7+
using System.Threading;
8+
9+
namespace Server
10+
{
11+
class TcpService
12+
{
13+
public delegate Task MsgCallback(UInt16 id, TcpSession client, byte[] buffer);
14+
private Dictionary<UInt16, MsgCallback> handlers = new Dictionary<UInt16, MsgCallback>();
15+
private Dictionary<long, TcpSession> sessions = new Dictionary<long, TcpSession>();
16+
private ReaderWriterLockSlim sessionLocks = new ReaderWriterLockSlim();
17+
private int nextId = 0;
18+
private List<TcpListener> listeners = new List<TcpListener>();
19+
private List<Task> listenerTasks = new List<Task>();
20+
private ReaderWriterLockSlim listenerLocks = new ReaderWriterLockSlim();
21+
public void registerMsg(UInt16 id, MsgCallback callback)
22+
{
23+
handlers.Add(id, callback);
24+
}
25+
public TcpSession findSession(long id)
26+
{
27+
TcpSession ret = null;
28+
sessionLocks.EnterReadLock();
29+
sessions.TryGetValue(id, out ret);
30+
sessionLocks.ExitReadLock();
31+
return ret;
32+
}
33+
34+
public void removeSession(TcpSession session)
35+
{
36+
bool success = false;
37+
var disConnnectCallback = session.disCallback;
38+
39+
sessionLocks.EnterWriteLock();
40+
success = sessions.Remove(session.ID);
41+
sessionLocks.ExitWriteLock();
42+
43+
if (success && disConnnectCallback != null)
44+
{
45+
disConnnectCallback(session);
46+
}
47+
}
48+
49+
public void waitCloseAll()
50+
{
51+
sessionLocks.EnterWriteLock();
52+
53+
foreach (var listener in listeners)
54+
{
55+
listener.Stop();
56+
}
57+
58+
foreach (var listenerTask in listenerTasks)
59+
{
60+
try
61+
{
62+
listenerTask.Wait();
63+
}
64+
catch (AggregateException)
65+
{
66+
67+
}
68+
}
69+
70+
foreach (var session in sessions)
71+
{
72+
session.Value.close();
73+
session.Value.wait();
74+
}
75+
76+
sessions.Clear();
77+
sessionLocks.ExitWriteLock();
78+
}
79+
public async Task processPacket(TcpSession session, byte[] buffer, UInt16 id)
80+
{
81+
MsgCallback callback;
82+
if (handlers.TryGetValue(id, out callback))
83+
{
84+
await callback(id, session, buffer);
85+
}
86+
}
87+
public void startListen(string ip, int port, Action<TcpSession> enterCallback, Action<TcpSession> disconnectCallback)
88+
{
89+
Console.WriteLine("listen {0}:{1}", ip, port);
90+
var server = new TcpListener(IPAddress.Parse(ip), port);
91+
server.Start();
92+
93+
listenerLocks.EnterWriteLock();
94+
listeners.Add(server);
95+
listenerTasks.Add(Task.Run(async () =>
96+
{
97+
while (true)
98+
{
99+
var client = await server.AcceptTcpClientAsync();
100+
if (client != null)
101+
{
102+
newSessionTask(client, enterCallback, disconnectCallback);
103+
}
104+
else
105+
{
106+
break;
107+
}
108+
}
109+
}));
110+
listenerLocks.ExitWriteLock();
111+
}
112+
private void newSessionTask(TcpClient client, Action<TcpSession> enterCallback, Action<TcpSession> disconnectCallback)
113+
{
114+
TcpSession session = null;
115+
sessionLocks.EnterWriteLock();
116+
nextId++;
117+
long id = (long)nextId << 32;
118+
id |= (long)dateTimeToUnixTimestamp(DateTime.Now);
119+
session = new TcpSession(this, client, id);
120+
sessions[id] = session;
121+
sessionLocks.ExitWriteLock();
122+
123+
session.disCallback = disconnectCallback;
124+
session.run();
125+
126+
if (enterCallback != null)
127+
{
128+
enterCallback(session);
129+
}
130+
}
131+
public static int dateTimeToUnixTimestamp(DateTime dateTime)
132+
{
133+
var start = new DateTime(1970, 1, 1, 0, 0, 0, dateTime.Kind);
134+
return Convert.ToInt32((dateTime - start).TotalSeconds);
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)