Skip to content

Commit

Permalink
Merge pull request cloudwu#212 from cloudwu/dev
Browse files Browse the repository at this point in the history
release 0.9.2
  • Loading branch information
cloudwu committed Dec 8, 2014
2 parents 463a789 + 24a2df9 commit f15cd1d
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 68 deletions.
8 changes: 7 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
v0.9.0 (2014-11-17)
v0.9.2 (2014-12-8)
-----------
* Simplify the message queue
* Add create_index in mongo driver
* Fix a bug in big-endian architecture (sproto)

v0.9.0 / v0.9.1 (2014-11-17)
-----------
* Add UDP support
* Add IPv6 support
Expand Down
1 change: 1 addition & 0 deletions lualib-src/lua-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <lua.h>
#include <lauxlib.h>

#include <sys/socket.h>
#include <arpa/inet.h>

#include "skynet_socket.h"
Expand Down
10 changes: 5 additions & 5 deletions lualib-src/sproto/lsproto.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct encode_ud {
int deep;
};

static int
static int
encode(void *ud, const char *tagname, int type, int index, struct sproto_type *st, void *value, int length) {
struct encode_ud *self = ud;
lua_State *L = self->L;
Expand Down Expand Up @@ -189,7 +189,7 @@ expand_buffer(lua_State *L, int osz, int nsz) {
lightuserdata sproto_type
table source
return string
return string
*/
static int
lencode(lua_State *L) {
Expand Down Expand Up @@ -229,7 +229,7 @@ struct decode_ud {
int deep;
};

static int
static int
decode(void *ud, const char *tagname, int type, int index, struct sproto_type *st, void *value, int length) {
struct decode_ud * self = ud;
lua_State *L = self->L;
Expand All @@ -252,12 +252,12 @@ decode(void *ud, const char *tagname, int type, int index, struct sproto_type *s
switch (type) {
case SPROTO_TINTEGER: {
// notice: in lua 5.2, 52bit integer support (not 64)
lua_Integer v = *(lua_Integer *)value;
lua_Integer v = *(uint64_t*)value;
lua_pushinteger(L, v);
break;
}
case SPROTO_TBOOLEAN: {
int v = *(lua_Integer*)value;
int v = *(uint64_t*)value;
lua_pushboolean(L,v);
break;
}
Expand Down
20 changes: 10 additions & 10 deletions lualib-src/sproto/sproto.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ sproto_dump(struct sproto *s) {
}

// query
int
int
sproto_prototag(struct sproto *sp, const char * name) {
int i;
for (i=0;i<sp->protocol_n;i++) {
Expand Down Expand Up @@ -529,7 +529,7 @@ query_proto(struct sproto *sp, int tag) {
return NULL;
}

struct sproto_type *
struct sproto_type *
sproto_protoquery(struct sproto *sp, int proto, int what) {
struct protocol * p;
if (what <0 || what >1) {
Expand All @@ -542,7 +542,7 @@ sproto_protoquery(struct sproto *sp, int proto, int what) {
return NULL;
}

const char *
const char *
sproto_protoname(struct sproto *sp, int proto) {
struct protocol * p = query_proto(sp, proto);
if (p) {
Expand All @@ -551,7 +551,7 @@ sproto_protoname(struct sproto *sp, int proto) {
return NULL;
}

struct sproto_type *
struct sproto_type *
sproto_type(struct sproto *sp, const char * type_name) {
int i;
for (i=0;i<sp->type_n;i++) {
Expand Down Expand Up @@ -806,7 +806,7 @@ encode_array(sproto_callback cb, void *ud, struct field *f, uint8_t *data, int s
return fill_size(data, sz);
}

int
int
sproto_encode(struct sproto_type *st, void * buffer, int size, sproto_callback cb, void *ud) {
uint8_t * header = buffer;
uint8_t * data;
Expand All @@ -830,7 +830,7 @@ sproto_encode(struct sproto_type *st, void * buffer, int size, sproto_callback c
sz = encode_array(cb,ud, f, data, size);
} else {
switch(type) {
case SPROTO_TINTEGER:
case SPROTO_TINTEGER:
case SPROTO_TBOOLEAN: {
union {
uint64_t u64;
Expand Down Expand Up @@ -971,7 +971,7 @@ decode_array(sproto_callback cb, void *ud, struct field *f, uint8_t * stream) {
}
case SPROTO_TBOOLEAN:
for (i=0;i<sz;i++) {
int value = stream[i];
uint64_t value = stream[i];
cb(ud, f->name, SPROTO_TBOOLEAN, i+1, NULL, &value, sizeof(value));
}
break;
Expand Down Expand Up @@ -1050,7 +1050,7 @@ sproto_decode(struct sproto_type *st, const void * data, int size, sproto_callba
}
break;
}
case SPROTO_TSTRING:
case SPROTO_TSTRING:
case SPROTO_TSTRUCT: {
uint32_t sz = todword(currentdata);
if (cb(ud, f->name, f->type, 0, f->st, currentdata+SIZEOF_LENGTH, sz))
Expand Down Expand Up @@ -1124,7 +1124,7 @@ write_ff(const uint8_t * src, uint8_t * des, int n) {
}
}

int
int
sproto_pack(const void * srcv, int srcsz, void * bufferv, int bufsz) {
uint8_t tmp[8];
int i;
Expand Down Expand Up @@ -1181,7 +1181,7 @@ sproto_pack(const void * srcv, int srcsz, void * bufferv, int bufsz) {
return size;
}

int
int
sproto_unpack(const void * srcv, int srcsz, void * bufferv, int bufsz) {
const uint8_t * src = srcv;
uint8_t * buffer = bufferv;
Expand Down
48 changes: 47 additions & 1 deletion lualib/mongo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ function mongo_collection:insert(doc)
sock:request(pack)
end

function mongo_collection:batch_insert(docs)
function mongo_collection:safe_insert(doc)
return self.database:runCommand("insert", self.name, "documents", {bson_encode(doc)})
end

function mongo_collection:batch_insert(docs)
for i=1,#docs do
if docs[i]._id == nil then
docs[i]._id = bson.objectid()
Expand Down Expand Up @@ -278,6 +282,48 @@ function mongo_collection:find(query, selector)
} , cursor_meta)
end

-- collection:createIndex({username = 1}, {unique = true})
function mongo_collection:createIndex(keys, option)
local name
for k, v in pairs(keys) do
assert(v == 1)
name = (name == nil) and k or (name .. "_" .. k)
end

local doc = {};
doc.name = name
doc.key = keys
for k, v in pairs(option) do
if v then
doc[k] = true
end
end
return self.database:runCommand("createIndexes", self.name, "indexes", {doc})
end

mongo_collection.ensureIndex = mongo_collection.createIndex;

-- collection:findAndModify({query = {name = "userid"}, update = {["$inc"] = {nextid = 1}}, })
-- keys, value type
-- query, table
-- sort, table
-- remove, bool
-- update, table
-- new, bool
-- fields, bool
-- upsert, boolean
function mongo_collection:findAndModify(doc)
assert(doc.query)
assert(doc.update or doc.remove)

local cmd = {"findAndModify", self.name};
for k, v in pairs(doc) do
table.insert(cmd, k)
table.insert(cmd, v)
end
return self.database:runCommand(unpack(cmd))
end

function mongo_cursor:hasNext()
if self.__ptr == nil then
if self.__document == nil then
Expand Down
70 changes: 20 additions & 50 deletions skynet-src/skynet_mq.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,9 @@ struct message_queue {
};

struct global_queue {
uint32_t head;
uint32_t tail;
struct message_queue ** queue;
// We use a separated flag array to ensure the mq is pushed.
// See the comments below.
struct message_queue *list;
struct message_queue *head;
struct message_queue *tail;
int lock;
};

static struct global_queue *Q = NULL;
Expand All @@ -51,57 +48,32 @@ void
skynet_globalmq_push(struct message_queue * queue) {
struct global_queue *q= Q;

uint32_t tail = GP(__sync_fetch_and_add(&q->tail,1));

// only one thread can set the slot (change q->queue[tail] from NULL to queue)
if (!__sync_bool_compare_and_swap(&q->queue[tail], NULL, queue)) {
// The queue may full seldom, save queue in list
assert(queue->next == NULL);
struct message_queue * last;
do {
last = q->list;
queue->next = last;
} while(!__sync_bool_compare_and_swap(&q->list, last, queue));

return;
LOCK(q)
assert(queue->next == NULL);
if(q->tail) {
q->tail->next = queue;
q->tail = queue;
} else {
q->head = q->tail = queue;
}
UNLOCK(q)
}

struct message_queue *
skynet_globalmq_pop() {
struct global_queue *q = Q;
uint32_t head = q->head;

if (head == q->tail) {
// The queue is empty.
return NULL;
}

uint32_t head_ptr = GP(head);

struct message_queue * list = q->list;
if (list) {
// If q->list is not empty, try to load it back to the queue
struct message_queue *newhead = list->next;
if (__sync_bool_compare_and_swap(&q->list, list, newhead)) {
// try load list only once, if success , push it back to the queue.
list->next = NULL;
skynet_globalmq_push(list);
LOCK(q)
struct message_queue *mq = q->head;
if(mq) {
q->head = mq->next;
if(q->head == NULL) {
assert(mq == q->tail);
q->tail = NULL;
}
mq->next = NULL;
}

struct message_queue * mq = q->queue[head_ptr];
if (mq == NULL) {
// globalmq push not complete
return NULL;
}
if (!__sync_bool_compare_and_swap(&q->head, head, head+1)) {
return NULL;
}
// only one thread can get the slot (change q->queue[head_ptr] to NULL)
if (!__sync_bool_compare_and_swap(&q->queue[head_ptr], mq, NULL)) {
return NULL;
}
UNLOCK(q)

return mq;
}
Expand Down Expand Up @@ -243,8 +215,6 @@ void
skynet_mq_init() {
struct global_queue *q = skynet_malloc(sizeof(*q));
memset(q,0,sizeof(*q));
q->queue = skynet_malloc(MAX_GLOBAL_MQ * sizeof(struct message_queue *));
memset(q->queue, 0, sizeof(struct message_queue *) * MAX_GLOBAL_MQ);
Q=q;
}

Expand Down
2 changes: 1 addition & 1 deletion skynet-src/socket_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ append_sendbuffer_(struct socket_server *ss, struct wb_list *s, struct request_s
struct write_buffer * buf = MALLOC(size);
struct send_object so;
buf->userobject = send_object_init(ss, &so, request->buffer, request->sz);
buf->ptr = so.buffer+n;
buf->ptr = (char*)so.buffer+n;
buf->sz = so.sz - n;
buf->buffer = request->buffer;
buf->next = NULL;
Expand Down
10 changes: 10 additions & 0 deletions test/testdeadloop.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
local skynet = require "skynet"
local function dead_loop()
while true do
skynet.sleep(0)
end
end

skynet.start(function()
skynet.fork(dead_loop)
end)

0 comments on commit f15cd1d

Please sign in to comment.