在浏览器和nodejs中使用eventsource实现长连接

志晨 韩志晨 韩
2 min read

Table of contents

EventSource 接口是 web 内容与服务器发送事件通信的接口。

一个 EventSource 实例会对 HTTP 服务器开启一个持久化的连接,以 text/event-stream 格式发送事件,此连接会一直保持开启直到通过调用 EventSource.close() 关闭。

在我的本地示例中:

前端我使用vite启动了一个网页服务器,地址是http://localhost:5173

服务端我使用express启动了一个后端服务器,地址是http://localhost:8888

在浏览器端使用EventSouce

const sse = new EventSource('http://localhost:8888')

// 实例属性
console.log(sse.readyState, sse.url, sse.withCredentials)

// 实例方法
// sse.close() // 在系统退出回收资源时调用

// 事件
// sse.addEventListener('open', funtion(event) {})
sse.onopen = (event) => {
    console.log('sse open', event.data)
} 
sse.onmessage = (event) => {
    console.log('sse msg:', event.data)
}
sse.onerror = (err) => {
    console.log('sse err:', err)
}

在服务端使用:

服务端安装express、ssestream。由于客户端和服务端的端口不同,调用http请求会导致跨域问题,所以还需要安装cors模块。

// # index.js
"use strict";
const express = require("express");
const SseServer = require("./SseServer");
const cors = require("cors");
const app = express();

// cors要在请求处理中间件之前调用
app.use(cors());

// sse
const mySseServer = new SseServer({
  maxConnections: 3, // 设置最大链接数量
});
app.use("/sse", mySseServer.middleWare());

// 启动服务器,监听端口
const port = 8888;
app.listen(port, () => {
  console.log(`App is listening to port :${port}`);
});

// 模拟向客户端推送消息
setInterval(() => {
  // 当前链接的数量
  console.log("current connect number:", mySseServer.sseConnections.size);
  mySseServer.announce(`It is ${new Date()} now!`);
}, 2000);
const SseStream = require("ssestream").default;

class SseServer {
  constructor(options) {
    // 用来缓存当前所有的链接用来之后发送消息
    this.sseConnections = new Set();
    // 设置最大链接数
    this.maxConnections = options.maxConnections || Infinity;
    this.middleWare = this.middleWare.bind(this);
    this.announce = this.announce.bind(this);
  }
  middleWare() {
    return (req, res) => {
      const sseConnections = this.sseConnections;
      // 超过最大链接数的时候需要拒绝客户端请求
      if (sseConnections.size >= this.maxConnections) {
        return res.status(429).send();
      }
      const sse = new SseStream(req);
      // 详见 ssestream 的 api
      sse.pipe(res);
      const metaData = [sse, req, res];
      // 写入链接缓存
      sseConnections.add(metaData);
      // 与客户端链接断开时需要清除链接缓存
      req.on("close", function () {
        console.log("CONNECTION CLOSED!!!");
        sseConnections.delete(metaData);
      });
    };
  }
  // 向客户端广播消息
  announce(data) {
    this.sseConnections.forEach((meta) => {
      const [sse, req, res] = meta;
      const message = {
        data,
      };
      meta[0].write(message);
    });
  }
}

module.exports = SseServer;

ssestream 设置了 SSE 需要的 HTTP 响应 Header :

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
0
Subscribe to my newsletter

Read articles from 志晨 韩 directly inside your inbox. Subscribe to the newsletter, and don't miss out.

Written by

志晨 韩
志晨 韩