You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

572 lines
11 KiB

  1. /**
  2. * Module dependencies.
  3. */
  4. var Emitter = require('events').EventEmitter;
  5. var parser = require('socket.io-parser');
  6. var hasBin = require('has-binary2');
  7. var url = require('url');
  8. var debug = require('debug')('socket.io:socket');
  9. /**
  10. * Module exports.
  11. */
  12. module.exports = exports = Socket;
  13. /**
  14. * Blacklisted events.
  15. *
  16. * @api public
  17. */
  18. exports.events = [
  19. 'error',
  20. 'connect',
  21. 'disconnect',
  22. 'disconnecting',
  23. 'newListener',
  24. 'removeListener'
  25. ];
  26. /**
  27. * Flags.
  28. *
  29. * @api private
  30. */
  31. var flags = [
  32. 'json',
  33. 'volatile',
  34. 'broadcast',
  35. 'local'
  36. ];
  37. /**
  38. * `EventEmitter#emit` reference.
  39. */
  40. var emit = Emitter.prototype.emit;
  /**
   * Interface to a `Client` for a given `Namespace`.
   *
   * @param {Namespace} nsp namespace the socket is attached to
   * @param {Client} client client the socket talks through
   * @param {Object} query forwarded to `buildHandshake`
   * @api public
   */

  function Socket(nsp, client, query){
    // wiring to the namespace
    this.nsp = nsp;
    this.server = nsp.server;
    this.adapter = nsp.adapter;

    // outside of the main namespace the id is prefixed with the namespace name
    this.id = nsp.name === '/' ? client.id : nsp.name + '#' + client.id;

    // wiring to the client and its connection
    this.client = client;
    this.conn = client.conn;

    // per-socket state
    this.rooms = {};
    this.acks = {};
    this.connected = true;
    this.disconnected = false;

    // built after `conn` is set
    this.handshake = this.buildHandshake(query);

    this.fns = [];    // middleware registered with `use`
    this.flags = {};  // flags for the next emit
    this._rooms = []; // rooms targeted by the next emit
  }
  /**
   * Inherits from `EventEmitter`.
   *
   * Uses `Object.setPrototypeOf` rather than assigning the deprecated
   * `__proto__` accessor; the resulting prototype chain is the same.
   */

  Object.setPrototypeOf(Socket.prototype, Emitter.prototype);
  /**
   * Installs one getter per flag name on `Socket.prototype`. Reading the
   * getter switches that flag on in `this.flags` and returns the socket,
   * so flags can be chained in front of a call.
   */

  for (const flagName of flags) {
    Object.defineProperty(Socket.prototype, flagName, {
      get() {
        this.flags[flagName] = true;
        return this;
      }
    });
  }
  /**
   * Read-only shortcut to the `request` object of the underlying
   * connection (`this.conn.request`).
   *
   * @api public
   */

  Object.defineProperty(Socket.prototype, 'request', {
    get() {
      const conn = this.conn;
      return conn.request;
    }
  });
  /**
   * Builds the `handshake` BC object from the underlying request.
   *
   * @param {Object} query socket-specific query values
   * @return {Object} handshake with `headers`, `time`, `address`, `xdomain`,
   *   `secure`, `issued`, `url` and `query`
   * @api private
   */

  Socket.prototype.buildHandshake = function(query){
    const socket = this;

    // `Object.assign` lets later sources win, so keys parsed from the request
    // URL override same-named keys of the socket-specific `query`.
    const mergeQuery = () => {
      const fromUrl = url.parse(socket.request.url, true).query;
      return Object.assign({}, query, fromUrl);
    };

    const handshake = {};
    handshake.headers = this.request.headers;
    handshake.time = String(new Date());
    handshake.address = this.conn.remoteAddress;
    handshake.xdomain = Boolean(this.request.headers.origin);
    handshake.secure = Boolean(this.request.connection.encrypted);
    handshake.issued = new Date().getTime();
    handshake.url = this.request.url;
    handshake.query = mergeQuery();
    return handshake;
  };
  /**
   * Emits to this client.
   *
   * Event names listed in `exports.events` are emitted locally through the
   * saved `EventEmitter#emit`. Anything else becomes a packet: it is handed
   * to `adapter.broadcast` (excluding this socket's id) when rooms were
   * targeted or the `broadcast` flag is set, and sent with `this.packet`
   * otherwise. A trailing function argument is stored as the ack callback.
   *
   * @param {String} ev event name, followed by the event arguments
   * @return {Socket} self
   * @throws {Error} when an ack callback is combined with broadcasting
   * @api public
   */

  Socket.prototype.emit = function(ev){
    // reserved events stay local
    if (exports.events.indexOf(ev) !== -1) {
      emit.apply(this, arguments);
      return this;
    }

    const args = Array.prototype.slice.call(arguments);

    // an explicit `binary` flag takes precedence over content inspection
    let isBinary;
    if (this.flags.binary !== undefined) {
      isBinary = this.flags.binary;
    } else {
      isBinary = hasBin(args);
    }

    const packet = {
      type: isBinary ? parser.BINARY_EVENT : parser.EVENT,
      data: args
    };

    // a function in last position is an ACK callback
    const lastArg = args[args.length - 1];
    if (typeof lastArg === 'function') {
      if (this._rooms.length || this.flags.broadcast) {
        throw new Error('Callbacks are not supported when broadcasting');
      }

      debug('emitting packet with ack id %d', this.nsp.ids);
      this.acks[this.nsp.ids] = args.pop();
      packet.id = this.nsp.ids++;
    }

    // snapshot the targeting state, then reset it for the next emit
    const targetRooms = this._rooms.slice(0);
    const sendFlags = Object.assign({}, this.flags);
    this._rooms = [];
    this.flags = {};

    const isBroadcast = targetRooms.length || sendFlags.broadcast;
    if (!isBroadcast) {
      // dispatch packet to this client only
      this.packet(packet, sendFlags);
      return this;
    }

    this.adapter.broadcast(packet, {
      except: [this.id],
      rooms: targetRooms,
      flags: sendFlags
    });
    return this;
  };
  /**
   * Targets a room when broadcasting. A room that is already targeted is
   * not added a second time.
   *
   * @param {String} name
   * @return {Socket} self
   * @api public
   */

  Socket.prototype.to =
  Socket.prototype.in = function(name){
    const alreadyTargeted = this._rooms.indexOf(name) !== -1;
    if (!alreadyTargeted) {
      this._rooms.push(name);
    }
    return this;
  };
  /**
   * Sends a `message` event: calls `this.emit` with `'message'` followed by
   * the arguments given.
   *
   * @return {Socket} self
   * @api public
   */

  Socket.prototype.send =
  Socket.prototype.write = function(){
    const payload = ['message'];
    payload.push.apply(payload, arguments);
    this.emit.apply(this, payload);
    return this;
  };
  /**
   * Writes a packet through the client, stamping it with the namespace name.
   * `opts.compress` is normalised to a boolean and is `true` unless it was
   * explicitly `false`; the options object passed in is updated in place.
   *
   * @param {Object} packet object
   * @param {Object} opts options
   * @api private
   */

  Socket.prototype.packet = function(packet, opts){
    packet.nsp = this.nsp.name;
    const options = opts || {};
    options.compress = options.compress !== false;
    this.client.packet(packet, options);
  };
  /**
   * Joins a room.
   *
   * Rooms the socket already belongs to are skipped; the rest are handed to
   * `adapter.addAll` and recorded in `this.rooms` once the adapter reports
   * success.
   *
   * @param {String|Array} room or array of rooms
   * @param {Function} fn optional, callback called with an error or `null`
   * @return {Socket} self
   * @api private
   */

  Socket.prototype.join = function(rooms, fn){
    debug('joining room %s', rooms);
    const self = this;
    const requested = Array.isArray(rooms) ? rooms : [rooms];

    // Room names are arbitrary strings stored as keys of `this.rooms`, so a
    // room called "hasOwnProperty" would shadow the method. Borrow it from
    // `Object.prototype` instead of calling it on the dictionary.
    const hasOwn = Object.prototype.hasOwnProperty;
    const pending = requested.filter(function(room){
      return !hasOwn.call(self.rooms, room);
    });

    if (!pending.length) {
      fn && fn(null);
      return this;
    }

    this.adapter.addAll(this.id, pending, function(err){
      if (err) return fn && fn(err);
      debug('joined room %s', pending);
      pending.forEach(function(room){
        self.rooms[room] = room;
      });
      fn && fn(null);
    });
    return this;
  };
  /**
   * Leaves a room. The room is removed from `this.rooms` only after
   * `adapter.del` reports success.
   *
   * @param {String} room
   * @param {Function} fn optional, callback called with an error or `null`
   * @return {Socket} self
   * @api private
   */

  Socket.prototype.leave = function(room, fn){
    debug('leave room %s', room);
    this.adapter.del(this.id, room, (err) => {
      if (err) {
        return fn && fn(err);
      }
      debug('left room %s', room);
      delete this.rooms[room];
      fn && fn(null);
    });
    return this;
  };
  /**
   * Leave all rooms: asks the adapter to drop every membership of this
   * socket id and replaces `this.rooms` with an empty object.
   *
   * @api private
   */

  Socket.prototype.leaveAll = function(){
    const socketId = this.id;
    this.adapter.delAll(socketId);
    this.rooms = {};
  };
  /**
   * Called by `Namespace` upon successful
   * middleware execution (ie: authorization).
   * The socket is registered in `nsp.connected` before
   * the call to join, so adapters can access it.
   *
   * A CONNECT packet is written unless this is the main namespace (`/`)
   * with no namespace middleware registered.
   *
   * @api private
   */

  Socket.prototype.onconnect = function(){
    debug('socket connected - writing packet');
    this.nsp.connected[this.id] = this;
    this.join(this.id);

    const needsConnectPacket = this.nsp.name !== '/' || this.nsp.fns.length !== 0;
    if (needsConnectPacket) {
      this.packet({ type: parser.CONNECT });
      return;
    }
    debug('packet already sent in initial handshake');
  };
  /**
   * Called with each packet. Called by `Client`.
   *
   * Routes the packet by type: event packets to `onevent`, ack packets to
   * `onack`, a disconnect packet to `ondisconnect` and an error packet to
   * `onerror` (wrapped in an `Error`). Other types are ignored.
   *
   * @param {Object} packet
   * @api private
   */

  Socket.prototype.onpacket = function(packet){
    debug('got packet %j', packet);
    const type = packet.type;

    if (type === parser.EVENT || type === parser.BINARY_EVENT) {
      this.onevent(packet);
    } else if (type === parser.ACK || type === parser.BINARY_ACK) {
      this.onack(packet);
    } else if (type === parser.DISCONNECT) {
      this.ondisconnect();
    } else if (type === parser.ERROR) {
      this.onerror(new Error(packet.data));
    }
  };
  /**
   * Called upon event packet. When the packet carries an id, an ack
   * callback for that id is appended to the event arguments before they
   * are dispatched.
   *
   * @param {Object} packet object
   * @api private
   */

  Socket.prototype.onevent = function(packet){
    const args = packet.data || [];
    debug('emitting event %j', args);

    const expectsAck = packet.id !== null && packet.id !== undefined;
    if (expectsAck) {
      debug('attaching ack callback to event');
      args.push(this.ack(packet.id));
    }

    this.dispatch(args);
  };
  /**
   * Produces an ack callback to emit with an event. The callback sends its
   * arguments back in an ack packet carrying `id`; calls made after a
   * successful send are ignored.
   *
   * @param {Number} id packet id
   * @return {Function} ack callback
   * @api private
   */

  Socket.prototype.ack = function(id){
    let alreadySent = false;

    return (...args) => {
      // prevent double callbacks
      if (alreadySent) return;

      debug('sending ack %j', args);
      const type = hasBin(args) ? parser.BINARY_ACK : parser.ACK;
      this.packet({ id: id, type: type, data: args });

      alreadySent = true;
    };
  };
  /**
   * Called upon ack packet. Invokes the callback registered under
   * `packet.id` with `packet.data` as arguments and the socket as `this`.
   *
   * @param {Object} packet ack packet with `id` and `data`
   * @api private
   */

  Socket.prototype.onack = function(packet){
    const id = packet.id;

    // `this.acks` is a plain object: only accept callbacks stored on it, so
    // an id such as "toString" cannot resolve to an inherited function.
    const registered = Object.prototype.hasOwnProperty.call(this.acks, id);
    const ack = registered ? this.acks[id] : undefined;

    if (typeof ack !== 'function') {
      debug('bad ack %s', id);
      return;
    }

    debug('calling ack %s with %j', id, packet.data);
    // Unregister before invoking so the entry is released even when the
    // callback throws.
    delete this.acks[id];
    ack.apply(this, packet.data);
  };
  /**
   * Called upon client disconnect packet; closes the socket with the
   * reason `'client namespace disconnect'`.
   *
   * @api private
   */

  Socket.prototype.ondisconnect = function(){
    const reason = 'client namespace disconnect';
    debug('got disconnect packet');
    this.onclose(reason);
  };
  /**
   * Handles a client error. The error is emitted as an `error` event when
   * at least one listener is registered; otherwise a notice and the stack
   * are written with `console.error`.
   *
   * @param {Error} err
   * @api private
   */

  Socket.prototype.onerror = function(err){
    const handlerCount = this.listeners('error').length;
    if (!handlerCount) {
      console.error('Missing error handler on `socket`.');
      console.error(err.stack);
      return;
    }
    this.emit('error', err);
  };
  /**
   * Called upon closing. Called by `Client`.
   *
   * Emits `disconnecting` while the socket is still flagged as connected,
   * tears down room and registry membership, flips the connection flags and
   * finally emits `disconnect`. Does nothing when already disconnected.
   *
   * @param {String} reason
   * @return {Socket|undefined} self when already disconnected, otherwise nothing
   * @api private
   */

  Socket.prototype.onclose = function(reason){
    if (!this.connected) {
      return this;
    }
    debug('closing socket - reason %s', reason);

    // listeners of `disconnecting` still see the rooms and `connected === true`
    this.emit('disconnecting', reason);

    // detach from rooms, namespace and client
    this.leaveAll();
    this.nsp.remove(this);
    this.client.remove(this);

    // flip state before announcing the disconnection
    this.connected = false;
    this.disconnected = true;
    delete this.nsp.connected[this.id];

    this.emit('disconnect', reason);
  };
  /**
   * Produces an `error` packet carrying `err` as its data and writes it
   * with `this.packet`.
   *
   * @param {Object} err error object
   * @api private
   */

  Socket.prototype.error = function(err){
    const errorPacket = { type: parser.ERROR, data: err };
    this.packet(errorPacket);
  };
  /**
   * Disconnects this client. Without `close`, a DISCONNECT packet is sent
   * and the socket is closed with the reason `'server namespace disconnect'`;
   * with `close`, the client itself is disconnected instead. Does nothing
   * when the socket is not connected.
   *
   * @param {Boolean} close if `true`, closes the underlying connection
   * @return {Socket} self
   * @api public
   */

  Socket.prototype.disconnect = function(close){
    if (this.connected) {
      if (!close) {
        this.packet({ type: parser.DISCONNECT });
        this.onclose('server namespace disconnect');
      } else {
        this.client.disconnect();
      }
    }
    return this;
  };
  /**
   * Sets the compress flag for the next emit.
   *
   * @param {Boolean} compress if `true`, compresses the sending data
   * @return {Socket} self
   * @api public
   */

  Socket.prototype.compress = function(compress){
    const pendingFlags = this.flags;
    pendingFlags.compress = compress;
    return this;
  };
  /**
   * Sets the binary flag for the next emit. When the flag is set, `emit`
   * uses it instead of inspecting the arguments for binary data.
   *
   * @param {Boolean} binary encode as if it has binary data if `true`, encode as if it doesn't have binary data if `false`
   * @return {Socket} self
   * @api public
   */

  Socket.prototype.binary = function (binary) {
    const pendingFlags = this.flags;
    pendingFlags.binary = binary;
    return this;
  };
  /**
   * Dispatch incoming event to socket listeners. The event first goes
   * through the socket middleware (`run`); on the next tick it is either
   * emitted locally or, when the middleware reported an error, answered
   * with an error packet built from `err.data` or `err.message`.
   *
   * @param {Array} event that will get emitted
   * @api private
   */

  Socket.prototype.dispatch = function(event){
    debug('dispatching an event %j', event);

    this.run(event, (err) => {
      process.nextTick(() => {
        if (err) {
          return this.error(err.data || err.message);
        }
        emit.apply(this, event);
      });
    });
  };
  /**
   * Sets up socket middleware by appending `fn` to the list that `run`
   * executes for every incoming event.
   *
   * @param {Function} middleware function (event, next)
   * @return {Socket} self
   * @api public
   */

  Socket.prototype.use = function(fn){
    const middleware = this.fns;
    middleware.push(fn);
    return this;
  };
  /**
   * Executes the middleware for an incoming event, one function after the
   * other. The chain stops at the first middleware that reports an error;
   * `fn` receives that error, or `null` once every middleware has passed.
   *
   * @param {Array} event that will get emitted
   * @param {Function} last fn call in the middleware
   * @api private
   */

  Socket.prototype.run = function(event, fn){
    // work on a snapshot of the registered middleware
    const chain = this.fns.slice(0);
    if (chain.length === 0) return fn(null);

    const step = (index) => {
      chain[index](event, (err) => {
        // upon error, short-circuit
        if (err) return fn(err);

        // go on to next, or summon the callback when nothing is left
        if (chain[index + 1]) return step(index + 1);
        return fn(null);
      });
    };

    step(0);
  };