Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,29 @@ primus.on('connection', function (spark) {
})
```

### Emitting events from client to server and vice versa

It is also possible to emit events from client to server and the other way round.
The functionality is the same offered by [`primus-emit`][primus-emit], but in
this case you have to use the `trigger` method.

```js
primus.on('connection', function (spark) {
var foo = spark.substream('foo');

foo.on('custom-event', function (data) {
console.log('foo received:', data);
});
})
```

```js
var primus = new Primus(url);
, foo = primus.substream('foo');

foo.trigger('custom-event', 'bar');
```

### FYI's

- When the spark/connection closes all SubStreams will automatically close, this
Expand All @@ -153,3 +176,4 @@ primus.on('connection', function (spark) {
MIT

[Primus]: http://github.com/primus/primus
[primus-emit]: https://github.com/primus/primus-emit
44 changes: 42 additions & 2 deletions substream.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@
function factory(Stream) {
'use strict';

var toString = Object.prototype.toString
, slice = Array.prototype.slice;

/**
* Check if the given `value` is an `Array`.
*
* @param {Mixed} value The value to check.
* @return {Boolean}
* @api private
*/
var isArray = Array.isArray || function isArray(value) {
return '[object Array]' === toString.call(value);
};

/**
* Streams provides a streaming, namespaced interface on top of a regular
* stream.
Expand Down Expand Up @@ -86,6 +100,20 @@ function factory(Stream) {
};
};

/**
* Emit an event from client to server and vice versa.
*
* @param {String} event Name of the event to emit.
* @param {...Mixed} [args] The arguments to pass to the event listeners.
* @returns {Boolean}
* @api public
*/
SubStream.prototype.trigger = function trigger() {
if (!arguments.length || !this.stream) return false;

return this._write({ emit: slice.call(arguments) });
};

/**
* Write a new message to the streams.
*
Expand Down Expand Up @@ -161,9 +189,21 @@ function factory(Stream) {
*/
SubStream.prototype.mine = function mine(packet) {
if ('object' !== typeof packet || packet.substream !== this.name) return false;
if ('substream::end' === packet.args) return this.end(null, true), true;

this.stream.transforms(this.primus, this, 'incoming', packet.args);
packet = packet.args;

if ('object' === typeof packet && isArray(packet.emit)) {
//
// We've received an emit packet. If the event is not reserved, emit it.
//
if (!this.stream.reserved(packet.emit[0])) {
this.emit.apply(this, packet.emit);
}
} else if ('substream::end' === packet) {
this.end(null, true);
} else {
this.stream.transforms(this.primus, this, 'incoming', packet);
}

return true;
};
Expand Down
77 changes: 64 additions & 13 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
describe('multi-stream', function test() {
describe('SubStream', function test() {
'use strict';

var Primus = require('primus')
Expand All @@ -11,7 +11,9 @@ describe('multi-stream', function test() {

beforeEach(function (done) {
var server = http.createServer();
primus = new Primus(server, { transformer: 'websockets' });
primus = new Primus(server, {
plugin: { 'substream': plugin }
});

port++;
server.listen(port, done);
Expand All @@ -22,19 +24,14 @@ describe('multi-stream', function test() {
});

it('is compatible as a server-side plugin', function () {
primus.use('multi-stream', plugin);
primus.save(__dirname+ '/primus.js');
});

it('is compatible as a client-side plugin', function () {
primus.use('multi-stream', plugin);

var Socket = primus.Socket;
primus.Socket;
});

it('exposes the substream function', function (done) {
primus.use('substream', plugin);

primus.on('connection', function (spark) {
assume(spark.substream).to.be.a('function');
spark.end();
Expand All @@ -48,10 +45,6 @@ describe('multi-stream', function test() {
});

describe('communication', function () {
beforeEach(function () {
primus.use('substream', plugin);
});

it('doesnt complain about leaking events', function (done) {
this.timeout(30000);

Expand Down Expand Up @@ -363,9 +356,67 @@ describe('multi-stream', function test() {
});
});

describe('trigger', function () {
it('emits custom events from client to server', function (done) {
primus.on('connection', function (spark) {
var foo = spark.substream('foo');

foo.on('custom', function (arg) {
assume(arg).to.equal('bar');
done();
});

foo.on('end', spark.end.bind(spark));
});

var socket = new primus.Socket('http://localhost:'+ port)
, foo = socket.substream('foo');

foo.trigger('custom', 'bar');
});

it('emits custom events from server to client', function (done) {
primus.on('connection', function (spark) {
var foo = spark.substream('foo');

foo.trigger('custom', 'bar');
});

var socket = new primus.Socket('http://localhost:'+ port)
, foo = socket.substream('foo');

foo.on('custom', function (arg) {
assume(arg).to.equal('bar');
done();
});
});

it('prevents reserved events from being emitted', function (done) {
primus.on('connection', function (spark) {
var foo = spark.substream('foo');

foo.trigger('readyStateChange', 'bar');
foo.end();
});

var socket = new primus.Socket('http://localhost:'+ port)
, foo = socket.substream('foo')
, count = 0;

foo.on('readyStateChange', function (arg) {
assume(arg).to.not.equal('bar');
count++;
});

foo.on('end', function () {
assume(count).is.gt(0);
done();
});
});
});

describe('transform', function () {
it('runs message transformers', function (done) {
primus.use('substream', plugin);
primus.transform('incoming', function (packet) {
var data = packet.data;

Expand Down