Skip to content
This repository has been archived by the owner on Aug 17, 2023. It is now read-only.

Commit

Permalink
WIP: Server, Client, Connection and Stream Class.
Browse files Browse the repository at this point in the history
  • Loading branch information
zensh committed Jun 18, 2017
1 parent fc23bf0 commit 7d28bb7
Show file tree
Hide file tree
Showing 6 changed files with 241 additions and 37 deletions.
65 changes: 44 additions & 21 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,54 @@
//
// **License:** MIT

const dgram = require('dgram')
const EventEmitter = require('events')
// const connection = require('./connection')
const { createSocket } = require('dgram')
const { Connection } = require('./connection')
const { ConnectionID } = require('./protocol')
const { lookup } = require('./common')

class QuicClient extends EventEmitter {
constructor (options) {
super()
this.socket = dgram.createSocket(options)
}

address () {}

connect (options) {}

get connecting () {}
// Event: 'connect'

destroy () {}

end () {}
class Client extends Connection {
constructor () {
super()

ref () {}
this.id = ConnectionID.random()
this.isServer = false
}

send (data) {}
async connect (port, address) {
if (this._udp) throw new Error('Client connected')

let addr = await lookup(address || 'localhost')
this.remotePort = port
this.remoteAddress = addr.address
this.remoteFamily = 'IPv' + addr.family

this._udp = createSocket(this.remoteFamily === 'IPv6' ? 'udp6' : 'udp4')
this._udp
.on('error', (err) => this.emit('error', err))
.on('close', () => this._onclose())
.on('message', (msg, rinfo) => this._onmessage(msg, rinfo))

let res = new Promise((resolve, reject) => {
this._udp.once('listening', () => {
this._udp.removeListener('error', reject)

let addr = this._udp.address()
this.localFamily = addr.family
this.localAddress = addr.address
this.localPort = addr.port
this.emit('connect')
resolve()
})
this._udp.once('error', reject)
})
this._udp.bind({exclusive: true})
return res
}

unref () {}
_onmessage (msg, rinfo) {}
_onclose () {}
}

module.exports = QuicClient
exports.Client = Client
76 changes: 76 additions & 0 deletions lib/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
//
// **License:** MIT

const { promisify } = require('util')

exports.lookup = promisify(require('dns').lookup)

class Visitor {
constructor (start, end) {
this.start = start
Expand Down Expand Up @@ -59,3 +63,75 @@ exports.writeUFloat16 = function (value) {
buf.writeUInt16BE(res, 0)
return buf
}

class Queue {
constructor () {
this.tail = []
this.head = []
this.offset = 0
this.hLength = 0
}

get length () {
return this.hLength + this.tail.length - this.offset
}

first () {
return this.hLength === this.offset ? this.tail[0] : this.head[this.offset]
}

push (item) {
this.tail.push(item)
}

pop () {
if (this.tail.length) return this.tail.pop()
if (!this.hLength) return
this.hLength--
return this.head.pop()
}

unshift (item) {
if (!this.offset) {
this.hLength++
this.head.unshift(item)
} else {
this.offset--
this.head[this.offset] = item
}
}

shift () {
if (this.offset === this.hLength) {
if (!this.tail.length) return

let tmp = this.head
tmp.length = 0
this.head = this.tail
this.tail = tmp
this.offset = 0
this.hLength = this.head.length
}
return this.head[this.offset++]
}

reset () {
this.offset = 0
this.hLength = 0
this.tail.length = 0
this.head.length = 0
}

migrateTo (queue) {
let i = this.offset
let len = this.tail.length
while (i < this.hLength) queue.push(this.head[i++])

i = 0
while (i < len) queue.push(this.tail[i++])
this.offset = this.hLength = this.head.length = this.tail.length = 0
return queue
}
}

exports.Queue = Queue
33 changes: 30 additions & 3 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,37 @@

const EventEmitter = require('events')

class QuicConnection extends EventEmitter {
constructor (options) {
// Event: 'timeout'
// Event: 'close'
// Event: 'error'
// Event: 'stream'

class Connection extends EventEmitter {
constructor () {
super()

this.id = null
this._udp = null
this.bytesRead = 0
this.bytesWritten = 0
this.localFamily = ''
this.localAddress = ''
this.localPort = 0
this.remoteFamily = ''
this.remoteAddress = ''
this.remotePort = 0
this.isServer = false
this.streams = new Map()
}

_writePacket (packet) {}
close () {}
destroy () {}
address () {
return {port: this.localPort, family: this.localFamily, address: this.localAddress}
}
ref () {}
unref () {}
}

module.exports = QuicConnection
exports.Connection = Connection
65 changes: 53 additions & 12 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,70 @@
//
// **License:** MIT

const dgram = require('dgram')
const { createSocket } = require('dgram')
const EventEmitter = require('events')
// const connection = require('./connection')
const { lookup } = require('./common')

class QuicServer extends EventEmitter {
constructor (options) {
// Event: 'listening'
// Event: 'connection'

class Server extends EventEmitter {
constructor () {
super()
this.socket = dgram.createSocket(options)
this._udp = null
this.localFamily = ''
this.localAddress = ''
this.localPort = 0
this.listening = false
this.conns = new Map()
}

address () {}
address () {
return {port: this.localPort, family: this.localFamily, address: this.localAddress}
}

close () {}
async listen (port, address) {
if (this._udp) throw new Error('Server listening')

getConnections () {}
let type = 'upd4'
if (address) {
let addr = await lookup(address || 'localhost')
if (addr.family === 6) type = 'upd6'
}

listen (options) {}
this._udp = createSocket(type)
this._udp
.on('error', (err) => this.emit('error', err))
.on('close', () => this._onclose())
.on('message', (msg, rinfo) => this._onmessage(msg, rinfo))

get listening () {}
let res = new Promise((resolve, reject) => {
this._udp.once('listening', () => {
this._udp.removeListener('error', reject)

ref () {}
let addr = this._udp.address()
this.localFamily = addr.family
this.localAddress = addr.address
this.localPort = addr.port
this.emit('listening')
resolve()
})
this._udp.once('error', reject)
})
// Can't support cluster
this._udp.bind({port: port, address: address, exclusive: true})
return res
}

_onmessage (msg, rinfo) {}
_onclose () {}

close () {}
getConnections () {
return Promise.resolve(this.conns.size)
}
ref () {}
unref () {}
}

module.exports = QuicServer
exports.Server = Server
37 changes: 37 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,40 @@
// **Github:** https://github.com/toajs/quic
//
// **License:** MIT

const { Duplex } = require('stream')
const { Offset } = require('./protocol')

class Stream extends Duplex {
/**
* Returns a QUIC stream.
*
* @param {protocol.StreamID} stream id
* @param {connection.Connection} QUIC connection
* @return {Stream}
*/
constructor (id, conn) {
super({allowHalfOpen: true, readableObjectMode: false, writableObjectMode: false})

this.id = id
this.conn = conn
this.writeOffset = new Offset()
this.readOffset = new Offset()
}

_write (chunk, encoding, callback) {
// The underlying source only deals with strings
// if (Buffer.isBuffer(chunk))
// chunk = chunk.toString()
// this[kSource].writeSomeData(chunk)
// callback()
}

_read (size) {
// this[kSource].fetchSomeData(size, (data, encoding) => {
// this.push(Buffer.from(data, encoding))
// })
}
}

exports.Stream = Stream
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"authors": [
"Yan Qing <[email protected]>"
],
"version": "0.0.1",
"version": "0.0.2",
"main": "lib/index.js",
"license": "MIT",
"repository": {
Expand Down

0 comments on commit 7d28bb7

Please sign in to comment.