Broadcast performance #617

Closed
diegoaguilar opened this issue Nov 12, 2015 · 23 comments

Comments

@diegoaguilar

I got curious about how broadcast works. I see a forEach loop after .clients; is this really iterating over an array of clients? What if there are too many clients?

@Eeems

Eeems commented Nov 12, 2015

Can you explain what you mean by too many clients?
Would you have any suggestions for broadcasting to all clients other than this? I can't think of any other way to send the same information to multiple socket connections.

If you have a large number of connections there will be some slowdown. In that case the developer should look into running multiple ws servers and limiting the number of clients per instance. You could also look into a more distributed design (ws servers connecting to each other for broadcasting), which would add a little lag to broadcasts for some clients but would increase performance overall.

@fitraditya

Socket.io also uses a for loop to handle its broadcast feature. CMIIW.

https://github.com/socketio/socket.io-adapter/blob/master/index.js#L127

@lpinca
Member

lpinca commented Nov 13, 2016

Broadcast performance could be optimized quite a bit.
Calling ws.send() in a loop will frame the message to broadcast n times where n is the number of connected clients.

We could optimize this by framing the message only once and then sending that frame to all connected clients. The only problem is that there isn't an API to do this right now.
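
To make "framing once" concrete, here is a minimal sketch of what framing an unmasked, uncompressed, single-fragment text message involves under RFC 6455. `frameText` is a hypothetical helper written for illustration; it is not part of the ws API and it ignores compression and fragmentation entirely.

```javascript
// Hypothetical helper, not part of ws: builds one unmasked RFC 6455 text frame.
function frameText(message) {
  const payload = Buffer.from(message, 'utf8');
  const length = payload.length;
  let header;

  if (length < 126) {
    // Payload length fits in the 7-bit length field.
    header = Buffer.alloc(2);
    header[1] = length;
  } else if (length < 65536) {
    // 126 signals a 16-bit extended payload length.
    header = Buffer.alloc(4);
    header[1] = 126;
    header.writeUInt16BE(length, 2);
  } else {
    // 127 signals a 64-bit extended payload length.
    header = Buffer.alloc(10);
    header[1] = 127;
    header.writeBigUInt64BE(BigInt(length), 2);
  }

  header[0] = 0x81; // FIN = 1, opcode = 1 (text); the mask bit in header[1] stays 0
  return Buffer.concat([header, payload]);
}

// Frame once; the same buffer can then be written to every client socket.
const frame = frameText('message');
console.log(frame.toString('hex')); // 81076d657373616765
```

The header depends only on the payload length and the flags, so nothing in the frame is specific to a client. That is why the result can be shared across connections as long as compression is off.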

@mafrost

mafrost commented Nov 13, 2016

Such an API would be great. What I would really like to see is:

  • the ability to broadcast to every connected client
  • the ability to broadcast to an array of clients (websocket handles)

Or maybe just provide the ability to prepare a message, plus an alternative to send called sendPrepared.

@lpinca
Member

lpinca commented Nov 13, 2016

@mafrost yeah, the first step is to extract frameAndSend from Sender to have a standalone function to frame a message. A little work in this direction has already been done in recent changes.

Once this is done a very basic "optimized" broadcast could be as simple as this:

const frame = frameMessage(1, 'message', true, true, true, false);
for (const client of wss.clients) client._socket.write(frame);

@mafrost

mafrost commented Nov 13, 2016

Loving it!

@lpinca
Member

lpinca commented Feb 28, 2017

The Sender class now ([email protected]) has a "public" static method to frame data. The method returns an array of buffers.

This method makes it possible to implement a slightly more efficient broadcast.

const data = Buffer.from('message');
const list = WebSocket.Sender.frame(data, {
  readOnly: false,
  mask: false,
  rsv1: false,
  opcode: 1,
  fin: true
});

wss.clients.forEach((ws) => {
  if (ws.readyState === WebSocket.OPEN) {
    list.forEach((buf) => ws._socket.write(buf));
  }
});

The example assumes that permessage-deflate is disabled. When permessage-deflate is enabled, data can be queued, so it is not safe to write to the socket directly: doing so can change the order of messages or even put a spurious frame in the middle of a fragmented message.

Use this only if you know what you are doing.

@lpinca lpinca closed this as completed May 21, 2017
@ssljivic

I know that this is an old thread, but I just wanted to give my 2 cents.

@lpinca, I believe that what @diegoaguilar was trying to ask is whether, with a large number of clients, forEach will block the Node thread and prevent it from doing anything else.

Consider an app with a lot of clients, all listening for messages from the server and only a few of them pushing changes to the server. With a blocking broadcast forEach, Node would be unable to receive updates most of the time. Some kind of async forEach for broadcasting would help here, I guess.

It would be also nice to be able to define priorities for sending and receiving, since in many cases I would be interested in getting the message from the client as fast as possible and broadcasting the message to all registered clients with lower priority.

@lpinca
Member

lpinca commented Aug 25, 2017

@ssljivic this shouldn't be an issue. A loop with 500k clients on a single server is already unrealistic.

@ssljivic

@lpinca I agree that a single server has its capacity and can handle at most N operations per second. That is not the issue.

The issue is that broadcasting with a for loop blocks Node until the loop is done. In a better implementation, the broadcast iteration over clients would be async, allowing Node to process other operations in between.

The overall server capacity would still be the same, but this would allow some other operations to be executed sooner rather than later.

@lpinca
Member

lpinca commented Aug 26, 2017

@ssljivic assume that you have 100k clients. It will take ~2 ms to iterate through them:

const arr = new Array(100000).fill();

const time = process.hrtime();

for (let i = 0; i < arr.length; i++) {
  arr[i] = 0;
}

const diff = process.hrtime(time);

console.log('%d ns', diff[0] * 1e9 + diff[1]);
// => 2440435 ns

My point is that blocking the event loop for such a short time is not an issue.

The library does not force you to use a blocking loop. You can implement an async loop and use it if it's better for your use case.
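
The async loop mentioned above could look roughly like this. It is a minimal sketch, not something ws provides: `broadcastInBatches` and its `batchSize` default are made up for illustration, and the only assumption about clients is that they expose `readyState` and `send()` the way ws sockets do.

```javascript
const OPEN = 1; // same value as WebSocket.OPEN

// Sketch: send to clients in batches, yielding to the event loop between
// batches so that incoming messages and other callbacks can be processed.
function broadcastInBatches(clients, message, batchSize = 1000) {
  const iterator = clients[Symbol.iterator]();

  return new Promise((resolve, reject) => {
    function runBatch() {
      try {
        for (let i = 0; i < batchSize; i++) {
          const { value: client, done } = iterator.next();
          if (done) return resolve();
          if (client.readyState === OPEN) client.send(message);
        }
      } catch (err) {
        return reject(err);
      }
      // Schedule the next batch after pending I/O callbacks have run.
      setImmediate(runBatch);
    }

    runBatch();
  });
}
```

Usage would be something like `broadcastInBatches(wss.clients, 'message')`. The trade-off is that clients may connect or disconnect while a broadcast is in flight, so not every client necessarily sees the message within the same event loop turn.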

@adamkaplan

adamkaplan commented May 31, 2018

The original topic of this issue is great. I wish I had found it in February before doing a pretty deep investigation to reach the same conclusion: the frame does not change and can be precomputed.

Below is the code that I am using to power a realtime financial market data feed with many tens of thousands of clients for Yahoo Finance.

I will try to submit as a PR when I have a chance, but sharing in case anybody else wants to use it or make the PR + Tests...

  /**
   * Send the same message to multiple websockets.
   *
   * This method is more efficient than calling
   *
   * websockets.map(ws => { ws.send(message) });
   *
   * because it avoids repeating the work required to package a
   * message into an RFC 6455 WebSocket frame. Basically it does the
   * same intermediate work that is done in WS.websocket.send and
   * WS.sender.send, but only once.
   *
   * @param websockets An array of websockets
   * @param message A string (text only) message to send
   * @returns An array of promises, one per websocket
   */
  broadcast(websockets, message) {
    // Since the message is the same for all clients, the RFC-compliant
    // frame can be generated just once instead of once per client. The
    // socket writes are still O(N), but the framing work is no longer
    // repeated N times.

    const data = Buffer.from(message); // only text messages are supported

    const frames = WS.Sender.frame(data, {
      fin: true, // sending a single-fragment message
      rsv1: false, // don't set the rsv1 bit (no compression)
      opcode: 1, // opcode for a text frame
      mask: false, // server-to-client frames are not masked
      readOnly: false // the data can be modified as needed
    });

    return websockets.map((ws) => {
      // Bail if the websocket is not marked as open. The socket
      // cannot be assumed to be able to handle messages at this time.
      if (ws.readyState !== WS.OPEN) {
        return Promise.resolve();
      }

      return new Promise((fulfill, reject) => {
        ws._sender.sendFrame(frames, (error) => {
          if (error) { // catch asynchronous socket write errors
            this.disconnectWebsocket(error, ws); // <--- custom error logic for your app here.
            return reject(error);
          }
          fulfill();
        });
      });
    });
  }

I benchmarked (in microseconds) Promise vs. straight callback and did not find that Promise construction added any significant overhead relative to socket writing.

@adamkaplan

Non-portable Linux notes for over-optimizers with scaling problems like myself...

Separately, I have a proof-of-concept implementation of a "copy-once" broadcast. It is similar to the code I shared above, except that it avoids copying the same byte array (the RFC 6455 frame) to the kernel send buffers for each client.

If you're using broadcast(..) to send a 5 KB message to 10,000 clients, you're actually copying that message from user space (Node/libuv) to the kernel buffer every time... so that's actually 50 MB of data transfer. Not huge, but if we're talking about a high-frequency broadcasting system, it adds up quickly.
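
As a rough back-of-the-envelope check of that figure, here is a small sketch. `copiedBytesPerSecond` is a hypothetical helper, 5 KB is taken as 5,000 bytes, frame header overhead is ignored, and the rate of 20 broadcasts per second is an assumption chosen only to illustrate how the cost scales.

```javascript
// Bytes copied from user space to kernel buffers per second when the same
// message is written separately to every client socket.
function copiedBytesPerSecond(messageBytes, clientCount, broadcastsPerSecond) {
  return messageBytes * clientCount * broadcastsPerSecond;
}

// One broadcast of a 5 KB message to 10,000 clients.
const perBroadcast = copiedBytesPerSecond(5000, 10000, 1);
console.log(perBroadcast / 1e6, 'MB per broadcast'); // 50 MB per broadcast

// Assumed rate of 20 broadcasts per second, purely for illustration.
const perSecond = copiedBytesPerSecond(5000, 10000, 20);
console.log(perSecond / 1e9, 'GB per second'); // 1 GB per second
```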

The idea is to copy the data to the kernel buffer one time, and then instruct the kernel to send that kernel-data to each client. This is how Apache sends large files to clients, and how Kafka achieves high I/O. Basically you use sendfile(2) instead of write(2) to transmit to a socket. The flow is as follows:

  1. Open a temp file
  2. Write the frame binary into the file handle
  3. Sendfile, like websockets.forEach(ws => { sendfile(ws, file, length, etc....); });
  4. Close file
  5. Unlink file

Of course all of these are chained together via callback/promise and the file is never really written to disk because it's deleted too quickly. It just lives in the OS page cache for a few milliseconds.

I ran this in production for a week and it worked fine. It is implemented as a small Add-on in C++. I was not able to do a proper benchmark while it was live, though it appeared ~20% faster. It's still on my TODO list to benchmark it. Let me know if anybody is interested in the code...

@lpinca
Member

lpinca commented May 31, 2018

@adamkaplan nice. There is no API for an optimised broadcast because it really depends on how the data should be broadcast (#617 (comment)).

  1. permessage-deflate may be on, and in this case there is one compressor per socket, so you can't simply broadcast the same frame to all sockets. It's also tricky to preserve the order of messages unless a queue (as is done in the Sender class) or sync deflate is used.

  2. The data may need to be changed based on the current socket you are writing to. For example, if you are broadcasting data for an exchange, you may need to add metadata to specify whether any of the fills or the placed/closed orders belong to the owner of the socket. In this case you can't blindly broadcast the same frame to all sockets, but of course you can do it for "anonymous" sockets.
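
The second point can be sketched as follows, assuming an exchange-like feed. Everything here is illustrative: `broadcastFill`, the `userId` property on sockets, and the `ownerId` and `own` fields are hypothetical names. For simplicity it uses `send()` on both branches; only the shared branch would be a candidate for a precomputed frame.

```javascript
const OPEN = 1; // same value as WebSocket.OPEN

// Sketch: serialize the shared payload once for every socket that does not
// own the fill, and build a personalized payload only for the owner.
function broadcastFill(clients, fill) {
  const shared = JSON.stringify({ price: fill.price, size: fill.size });

  for (const ws of clients) {
    if (ws.readyState !== OPEN) continue;

    if (fill.ownerId !== undefined && ws.userId === fill.ownerId) {
      // Per-socket data: cannot be shared with other clients.
      ws.send(JSON.stringify({ price: fill.price, size: fill.size, own: true }));
    } else {
      // Identical for all other sockets, so it is serialized only once.
      ws.send(shared);
    }
  }
}
```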

Thank you for sharing your experience and code.

I think the idea behind #617 (comment) is great and an easy to use module to do that (even if it's not cross platform) would be useful to many people.

@adamkaplan

adamkaplan commented May 31, 2018

Point taken. If it is not destined to become an API feature, then at least a doc/wiki entry explaining this option would help. I understand the concern about handing people a loaded gun to shoot themselves with. Using it requires that a very specific set of requirements be met.

In my case, the idea is certainly that the message is identical per client (i.e. the price of Apple on Nasdaq is the same for everyone). The data is not changed at all. Your point about per message deflate is very good – and I'm going to check this ASAP. I think it's set to off.

@lpinca
Member

lpinca commented May 31, 2018

Agreed, Sender.frame() should be documented (with all caveats), it was added for this exact reason.
Yes, permessage-deflate is disabled by default and I suggest keeping it off; see #1369 if you are interested.

@adamkaplan

adamkaplan commented Jun 1, 2018

Well that was a painful thread.

Yes, per-message deflate is off. My messages, being market data, are extremely compact protobufs anyway (60-100 B). My scaling issues are of the "firehose" sort... compression would just slow it down.

If 50,000 clients connect, probably 40,000 of them want boring stuff like Apple and S&P500 updates (out of the 100k+ available securities). That's why I'm so interested in broadcast functionality.
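
A use case like this usually means broadcasting to the subset of clients interested in a given security rather than to everyone. Here is a minimal sketch of such an index. `SubscriptionIndex` and its methods are hypothetical names, not part of ws, and this is not the production code described in this thread.

```javascript
// Hypothetical subscription index: symbol -> Set of subscribed sockets.
class SubscriptionIndex {
  constructor() {
    this.bySymbol = new Map();
  }

  subscribe(symbol, ws) {
    let subscribers = this.bySymbol.get(symbol);
    if (!subscribers) {
      subscribers = new Set();
      this.bySymbol.set(symbol, subscribers);
    }
    subscribers.add(ws);
  }

  unsubscribe(symbol, ws) {
    const subscribers = this.bySymbol.get(symbol);
    if (!subscribers) return;
    subscribers.delete(ws);
    // Drop empty sets so rarely used symbols do not accumulate.
    if (subscribers.size === 0) this.bySymbol.delete(symbol);
  }

  // Sends `message` to every open subscriber of `symbol`; returns the count sent.
  publish(symbol, message) {
    const subscribers = this.bySymbol.get(symbol);
    if (!subscribers) return 0;

    let sent = 0;
    for (const ws of subscribers) {
      if (ws.readyState === 1) { // 1 === WebSocket.OPEN
        ws.send(message);
        sent++;
      }
    }
    return sent;
  }
}
```

With an index like this, an update for a popular symbol goes only to its subscribers, and the message for that symbol is still identical for all of them.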

@ggazzo

ggazzo commented Jun 7, 2019

I ran this in production for a week and it worked fine. It is implemented as a small Add-on in C++. I was not able to do a proper benchmark while it was live, though it appeared ~20% faster. It's still on my TODO list to benchmark it. Let me know if anybody is interested in the code...
@adamkaplan actually I'm interested, do you still have the code? :)

@adamkaplan

adamkaplan commented Jun 10, 2019

@ggazzo I want to caution that after running in limited production, I don't broadcast with this method anymore. The example code above (#617 (comment)) worked well enough with some minor tweaks since then. It is also a lot less complex and scary.

I am running a fairly massive realtime market data deployment here with over 200,000 active connections on 175 cores per data center, and around 35% CPU utilization.

If you still think that the C++ add-on is worth exploring given the scale above, let me know. It will take some effort/time to decouple the add-on code from the larger proprietary system which cannot be shared (even though it isn't being used, it is integrated).

@ggazzo

ggazzo commented Jun 12, 2019

@adamkaplan I arrived at similar code by myself before finding this thread, so I was wondering how much more performant the C++ add-on was. I could probably develop one following your tips, but I was afraid of wasting time (and sure, it's a little bit scary :p)... thanks for the advice...

Sorry, but I'm not sure I got what you mean by 175 cores per data center. How many data centers do you have? BTW, 200,000 users should probably be enough for me :)

@adamkaplan

The system runs in the cloud, so capacity is measured in CPU (and RAM): 175 instances of one Docker container, each with 1 CPU core.

@adamkaplan

@ggazzo Since you just want to compare, here is the sample code without any of the glue and configuration to make it compile and integrate. There is 1 version using only C++ and another version using V8 directly.

https://gist.github.com/adamkaplan/8a6a35a5a914fdf1890ec29865c5ccb1

@ggazzo

ggazzo commented Jun 14, 2019

thanks @adamkaplan
