fix: revamped rts server architecture (#15870)

## Description

RTS node server was being used for Availability feature. Now we will be enhancing the server to utilise client side logic on the server side side like construction of AST, Evaluations logic etc. So, since our codebase was not structured for implementing such use case, so we had to revamp the structure of the codebase.
#### Architecture

- **Routes :** To define different paths to consume apis
- **Controllers :** To capture the routing and formulate the logic to provide response to a particular request
- **Middleware :** action functions that happen to be stitched in between the routes and controllers, or can be used as a part of logic that needs to be injected in between.
- **Constants :** To keep static variables as per the usage without declaring it in other parts of the codebase
- **Sockets :** Any socket apis exists inside this folder to properly structure every socket implementations
- **Utils :** Helper functions which can be used at any stage of the application

Fixes #15645

## Type of change

- New feature (non-breaking change which adds functionality)

## How Has This Been Tested?

> Please describe the tests that you ran to verify your changes. Provide instructions, so we can reproduce.
> Please also list any relevant details for your test configuration.

- Test A
- Test B

## Checklist:

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my own code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes

Co-authored-by: Aishwarya UR <aishwarya@appsmith.com>
This commit is contained in:
Aman Agarwal 2022-08-11 15:06:02 +05:30 committed by GitHub
parent 50597d5d6f
commit fb752eaa9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 482 additions and 374 deletions

View File

@ -1,2 +1,3 @@
APPSMITH_MONGODB_URI=mongodb://localhost:27017/appsmith
APPSMITH_API_BASE_URL=http://localhost:8080/api/v1
APPSMITH_API_BASE_URL=http://localhost:8080/api/v1
PORT=8091

View File

@ -0,0 +1,10 @@
export const APP_ROOM_PREFIX = "app:";
export const PAGE_ROOM_PREFIX = "page:";
export const ROOT_NAMESPACE = "/";
export const PAGE_EDIT_NAMESPACE = "/page/edit";
export const EDITORS_EVENT_NAME = "collab:online_editors";
export const START_EDIT_EVENT_NAME = "collab:start_edit";
export const LEAVE_EDIT_EVENT_NAME = "collab:leave_edit";
export const MOUSE_POINTER_EVENT_NAME = "collab:mouse_pointer";
export const RELEASE_VERSION_EVENT_NAME = "info:release_version";

View File

@ -0,0 +1 @@
export const VERSION = "SNAPSHOT";

View File

@ -0,0 +1,128 @@
import { Server, Socket } from "socket.io";
import { tryAuth } from "../middlewares/socket-auth";
import {
START_EDIT_EVENT_NAME,
LEAVE_EDIT_EVENT_NAME,
MOUSE_POINTER_EVENT_NAME,
PAGE_EDIT_NAMESPACE,
PAGE_ROOM_PREFIX,
EDITORS_EVENT_NAME,
} from "../constants/socket";
import {
AppUser,
Policy,
CurrentEditorsEvent,
MousePointerEvent,
} from "../utils/models";
function subscribeToEditEvents(socket: Socket, appRoomPrefix: string) {
socket.on(START_EDIT_EVENT_NAME, (resourceId) => {
if (socket.data.email) {
// user is authenticated, join the room now
joinEditRoom(socket, resourceId, appRoomPrefix);
} else {
// user not authenticated yet, save the resource id and room prefix to join later after auth
socket.data.pendingRoomId = resourceId;
socket.data.pendingRoomPrefix = appRoomPrefix;
}
});
socket.on(LEAVE_EDIT_EVENT_NAME, (resourceId) => {
let roomName = appRoomPrefix + resourceId;
socket.leave(roomName); // remove this socket from room
});
}
async function onAppSocketConnected(socket: Socket) {
let isAuthenticated = await tryAuthAndJoinPendingRoom(socket);
if (isAuthenticated) {
socket.join("email:" + socket.data.email);
}
}
async function onPageSocketConnected(socket: Socket, socketIo: Server) {
let isAuthenticated = await tryAuthAndJoinPendingRoom(socket);
if (isAuthenticated) {
socket.on(MOUSE_POINTER_EVENT_NAME, (event: MousePointerEvent) => {
event.user = new AppUser(socket.data.name, socket.data.email);
event.socketId = socket.id;
socketIo
.of(PAGE_EDIT_NAMESPACE)
.to(PAGE_ROOM_PREFIX + event.pageId)
.emit(MOUSE_POINTER_EVENT_NAME, event);
});
}
}
async function tryAuthAndJoinPendingRoom(socket: Socket) {
const isAuthenticated = await tryAuth(socket);
if (socket.data.pendingRoomId) {
// an appId or pageId is pending for this socket, join now
joinEditRoom(
socket,
socket.data.pendingRoomId,
socket.data.pendingRoomPrefix
);
}
return isAuthenticated;
}
function joinEditRoom(socket: Socket, roomId: string, roomPrefix: string) {
// remove this socket from any other rooms with roomPrefix
if (socket.rooms) {
socket.rooms.forEach((roomName) => {
if (roomName.startsWith(roomPrefix)) {
socket.leave(roomName);
}
});
}
// add this socket to room with application id
let roomName = roomPrefix + roomId;
socket.join(roomName);
}
function findPolicyEmails(policies: Policy[], permission: string): string[] {
const emails: string[] = [];
for (const policy of policies) {
if (policy.permission === permission) {
for (const email of policy.users) {
emails.push(email);
}
break;
}
}
return emails;
}
function sendCurrentUsers(socketIo, roomName: string, roomPrefix: string) {
if (roomName.startsWith(roomPrefix)) {
socketIo
.in(roomName)
.fetchSockets()
.then((sockets) => {
let onlineUsernames = new Set<string>();
let onlineUsers = new Array<AppUser>();
if (sockets) {
sockets.forEach((s) => {
if (!onlineUsernames.has(s.data.email)) {
onlineUsers.push(new AppUser(s.data.name, s.data.email));
}
onlineUsernames.add(s.data.email);
});
}
let resourceId = roomName.replace(roomPrefix, ""); // get resourceId from room name by removing the prefix
let response = new CurrentEditorsEvent(resourceId, onlineUsers);
socketIo.to(roomName).emit(EDITORS_EVENT_NAME, response);
});
}
}
export {
subscribeToEditEvents,
onAppSocketConnected,
onPageSocketConnected,
sendCurrentUsers,
findPolicyEmails,
};

View File

@ -0,0 +1,69 @@
import { Socket } from "socket.io";
import log from "loglevel";
import axios from "axios";
const API_BASE_URL = process.env.APPSMITH_API_BASE_URL;
export async function tryAuth(socket: Socket) {
/* ********************************************************* */
// TODO: This change is not being used at the moment. Instead of using the environment variable API_BASE_URL
// we should be able to derive the API_BASE_URL from the host header. This will make configuration simpler
// for the user. The problem with this implementation is that Axios doesn't work for https endpoints currently.
// This needs to be debugged.
/* ********************************************************* */
// const host = socket.handshake.headers.host;
const connectionCookie = socket?.handshake?.headers?.cookie;
if (
connectionCookie === undefined ||
connectionCookie === null ||
connectionCookie === ""
) {
return false;
}
const matchedCookie = connectionCookie.match(/\bSESSION=\S+/);
if (!matchedCookie) {
return false;
}
const sessionCookie = matchedCookie[0];
let response;
try {
response = await axios.request({
method: "GET",
url: API_BASE_URL + "/users/me",
headers: {
Cookie: sessionCookie,
},
});
} catch (error) {
if (error.response?.status === 401) {
console.info(
"401 received when authenticating user with cookie: " + sessionCookie
);
} else if (error.response) {
log.error(
"Error response received while authentication: ",
error.response
);
} else {
log.error("Error authenticating", error);
}
return false;
}
const email = response?.data?.data?.email;
const name = response?.data?.data?.name ?? email;
// If the session check API succeeds & the email/name is anonymousUser, then the user is not authenticated
// and we should not allow them to join any rooms
if (email == null || email === "anonymousUser" || name === "anonymousUser") {
return false;
}
socket.data.email = email;
socket.data.name = name;
return true;
}

View File

@ -1,394 +1,59 @@
import http from "http";
import path from "path";
import express from "express";
import {Server, Socket} from "socket.io";
import type mongodb from "mongodb";
import {MongoClient} from "mongodb";
import axios from "axios";
import log, {LogLevelDesc} from "loglevel";
import {AppUser, Comment, CommentThread, CurrentEditorsEvent, MousePointerEvent, Policy} from "./models";
import {VERSION as buildVersion} from "./version"; // release version of the api
import { Server } from "socket.io";
import log, { LogLevelDesc } from "loglevel";
import { VERSION as buildVersion } from "./constants/version"; // release version of the api
import { initializeSockets } from "./sockets";
const RTS_BASE_PATH = "/rts";
const APP_ROOM_PREFIX = "app:";
const PAGE_ROOM_PREFIX = "page:";
const ROOT_NAMESPACE = "/";
const PAGE_EDIT_NAMESPACE = "/page/edit";
const EDITORS_EVENT_NAME = "collab:online_editors";
const START_EDIT_EVENT_NAME = "collab:start_edit";
const LEAVE_EDIT_EVENT_NAME = "collab:leave_edit";
const MOUSE_POINTER_EVENT_NAME = "collab:mouse_pointer";
const RELEASE_VERSION_EVENT_NAME = "info:release_version";
// Setting the logLevel for all log messages
const logLevel: LogLevelDesc = (process.env.APPSMITH_LOG_LEVEL || "debug") as LogLevelDesc;
const logLevel: LogLevelDesc = (process.env.APPSMITH_LOG_LEVEL ||
"debug") as LogLevelDesc;
log.setLevel(logLevel);
// Verifing Environment Variables
const MONGODB_URI = process.env.APPSMITH_MONGODB_URI;
if (MONGODB_URI == null || MONGODB_URI === "" || !MONGODB_URI.startsWith("mongodb")) {
log.error("Please provide a valid value for `APPSMITH_MONGODB_URI`.");
process.exit(1);
if (
MONGODB_URI == null ||
MONGODB_URI === "" ||
!MONGODB_URI.startsWith("mongodb")
) {
log.error("Please provide a valid value for `APPSMITH_MONGODB_URI`.");
process.exit(1);
}
const API_BASE_URL = process.env.APPSMITH_API_BASE_URL;
if (API_BASE_URL == null || API_BASE_URL === "") {
log.error("Please provide a valid value for `APPSMITH_API_BASE_URL`.");
process.exit(1);
log.error("Please provide a valid value for `APPSMITH_API_BASE_URL`.");
process.exit(1);
}
const PORT = process.env.PORT || 8091;
main();
function main() {
const app = express();
//Disable x-powered-by header to prevent information disclosure
app.disable("x-powered-by");
const server = new http.Server(app);
const io = new Server(server, {
path: RTS_BASE_PATH,
});
const app = express();
//Disable x-powered-by header to prevent information disclosure
app.disable("x-powered-by");
const server = new http.Server(app);
const io = new Server(server, {
path: RTS_BASE_PATH,
});
const port = 8091;
// Initializing Sockets
initializeSockets(io);
app.get("/", (req, res) => {
res.redirect("/index.html");
});
io.on("connection", (socket: Socket) => {
socket.emit(RELEASE_VERSION_EVENT_NAME, buildVersion);
subscribeToEditEvents(socket, APP_ROOM_PREFIX);
onAppSocketConnected(socket)
.catch((error) => log.error("Error in socket connected handler", error));
});
io.of(PAGE_EDIT_NAMESPACE).on("connection", (socket: Socket) => {
subscribeToEditEvents(socket, PAGE_ROOM_PREFIX);
onPageSocketConnected(socket, io)
.catch((error) => log.error("Error in socket connected handler", error));
});
io.of(ROOT_NAMESPACE).adapter.on("leave-room", (room, id) => {
if (room.startsWith(APP_ROOM_PREFIX)) {
log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} left the room ${room}`);
}
sendCurrentUsers(io, room, APP_ROOM_PREFIX);
});
io.of(ROOT_NAMESPACE).adapter.on("join-room", (room, id) => {
if (room.startsWith(APP_ROOM_PREFIX)) {
log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} joined the room ${room}`);
}
sendCurrentUsers(io, room, APP_ROOM_PREFIX);
});
io.of(PAGE_EDIT_NAMESPACE).adapter.on("leave-room", (room, id) => {
if (room.startsWith(PAGE_ROOM_PREFIX)) { // someone left the page edit, notify others
log.debug(`ns:${PAGE_EDIT_NAMESPACE} # socket ${id} left the room ${room}`);
io.of(PAGE_EDIT_NAMESPACE).to(room).emit(LEAVE_EDIT_EVENT_NAME, id);
}
sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX);
});
io.of(PAGE_EDIT_NAMESPACE).adapter.on("join-room", (room, id) => {
if (room.startsWith(PAGE_ROOM_PREFIX)) {
log.debug(`ns:${PAGE_EDIT_NAMESPACE}# socket ${id} joined the room ${room}`);
}
sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX);
});
app.use(express.static(path.join(__dirname, "static")));
watchMongoDB(io).catch((error) => log.error("Error watching MongoDB", error));
// run the server
server.listen(port, () => {
log.info(`RTS version ${buildVersion} running at http://localhost:${port}`);
});
}
function joinEditRoom(socket: Socket, roomId: string, roomPrefix: string) {
// remove this socket from any other rooms with roomPrefix
if (socket.rooms) {
socket.rooms.forEach(roomName => {
if (roomName.startsWith(roomPrefix)) {
socket.leave(roomName);
}
});
}
// add this socket to room with application id
let roomName = roomPrefix + roomId;
socket.join(roomName);
}
function subscribeToEditEvents(socket: Socket, appRoomPrefix: string) {
socket.on(START_EDIT_EVENT_NAME, (resourceId) => {
if (socket.data.email) { // user is authenticated, join the room now
joinEditRoom(socket, resourceId, appRoomPrefix);
} else { // user not authenticated yet, save the resource id and room prefix to join later after auth
socket.data.pendingRoomId = resourceId;
socket.data.pendingRoomPrefix = appRoomPrefix;
}
});
socket.on(LEAVE_EDIT_EVENT_NAME, (resourceId) => {
let roomName = appRoomPrefix + resourceId;
socket.leave(roomName); // remove this socket from room
});
}
async function onAppSocketConnected(socket: Socket) {
let isAuthenticated = await tryAuth(socket);
if (isAuthenticated) {
socket.join("email:" + socket.data.email);
}
}
async function onPageSocketConnected(socket: Socket, socketIo: Server) {
let isAuthenticated = await tryAuth(socket);
if (isAuthenticated) {
socket.on(MOUSE_POINTER_EVENT_NAME, (event: MousePointerEvent) => {
event.user = new AppUser(socket.data.name, socket.data.email);
event.socketId = socket.id;
socketIo.of(PAGE_EDIT_NAMESPACE).to(PAGE_ROOM_PREFIX + event.pageId).emit(MOUSE_POINTER_EVENT_NAME, event);
});
}
}
async function tryAuth(socket: Socket) {
/* ********************************************************* */
// TODO: This change is not being used at the moment. Instead of using the environment variable API_BASE_URL
// we should be able to derive the API_BASE_URL from the host header. This will make configuration simpler
// for the user. The problem with this implementation is that Axios doesn't work for https endpoints currently.
// This needs to be debugged.
/* ********************************************************* */
// const host = socket.handshake.headers.host;
const connectionCookie = socket?.handshake?.headers?.cookie;
if (connectionCookie === undefined || connectionCookie === null || connectionCookie === "") {
return false;
}
const matchedCookie = connectionCookie.match(/\bSESSION=\S+/);
if (!matchedCookie) {
return false;
}
const sessionCookie = matchedCookie[0];
let response;
try {
response = await axios.request({
method: "GET",
url: API_BASE_URL + "/users/me",
headers: {
Cookie: sessionCookie,
},
});
} catch (error) {
if (error.response?.status === 401) {
console.info("401 received when authenticating user with cookie: " + sessionCookie);
} else if (error.response) {
log.error("Error response received while authentication: ", error.response);
} else {
log.error("Error authenticating", error);
}
return false;
}
const email = response?.data?.data?.email;
const name = response?.data?.data?.name ?? email;
// If the session check API succeeds & the email/name is anonymousUser, then the user is not authenticated
// and we should not allow them to join any rooms
if (email == null || email === "anonymousUser" || name === "anonymousUser") {
return false;
}
socket.data.email = email;
socket.data.name = name;
if (socket.data.pendingRoomId) { // an appId or pageId is pending for this socket, join now
joinEditRoom(socket, socket.data.pendingRoomId, socket.data.pendingRoomPrefix);
}
return true;
}
async function watchMongoDB(io) {
const client = await MongoClient.connect(MONGODB_URI, {useUnifiedTopology: true});
const db = client.db();
const threadCollection: mongodb.Collection<CommentThread> = db.collection("commentThread");
const commentChangeStream = db.collection("comment").watch(
[
// Prevent server-internal fields from being sent to the client.
{
$unset: [
"deletedAt",
"_class",
].map(f => "fullDocument." + f)
},
],
{fullDocument: "updateLookup"}
);
commentChangeStream.on("change", async (event: mongodb.ChangeEventCR<Comment>) => {
let eventName = event.operationType + ":" + event.ns.coll;
const comment: Comment = event.fullDocument;
if (comment.deleted) {
eventName = 'delete' + ":" + event.ns.coll; // emit delete event if deleted=true
}
comment.creationTime = comment.createdAt;
comment.updationTime = comment.updatedAt;
delete comment.createdAt;
delete comment.updatedAt;
delete comment.deleted;
let target = io;
let shouldEmit = false;
for (const email of findPolicyEmails(comment.policies, "read:comments")) {
shouldEmit = true;
target = target.to("email:" + email);
}
if (shouldEmit) {
target.emit(eventName, {comment});
}
});
const threadChangeStream = threadCollection.watch(
[
// Prevent server-internal fields from being sent to the client.
{
$unset: [
"deletedAt",
"_class",
].map(f => "fullDocument." + f)
},
],
{fullDocument: "updateLookup"}
);
threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => {
let eventName = event.operationType + ":" + event.ns.coll;
const thread = event.fullDocument;
if (thread.deleted) {
eventName = 'delete' + ":" + event.ns.coll; // emit delete event if deleted=true
}
if (thread === null) {
// This happens when `event.operationType === "drop"`, when a comment is deleted.
log.error("Null document recieved for comment change event", event);
return;
}
thread.creationTime = thread.createdAt;
thread.updationTime = thread.updatedAt;
delete thread.createdAt;
delete thread.updatedAt;
delete thread.deleted;
thread.isViewed = false;
let target = io;
let shouldEmit = false;
for (const email of findPolicyEmails(thread.policies, "read:commentThreads")) {
shouldEmit = true;
target = target.to("email:" + email);
}
if (shouldEmit) {
target.emit(eventName, {thread});
}
});
const notificationsStream = db.collection("notification").watch(
[
// Prevent server-internal fields from being sent to the client.
{
$unset: [
"deletedAt",
"deleted",
].map(f => "fullDocument." + f)
},
],
{fullDocument: "updateLookup"}
);
notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => {
const notification = event.fullDocument;
if (notification === null) {
// This happens when `event.operationType === "drop"`, when a notification is deleted.
log.error("Null document recieved for notification change event", event);
return;
}
// set the type from _class attribute
notification.type = notification._class.slice(notification._class.lastIndexOf(".") + 1);
delete notification._class;
const eventName = event.operationType + ":" + event.ns.coll;
io.to("email:" + notification.forUsername).emit(eventName, {notification});
});
process.on('uncaughtExceptionMonitor', (err, origin) => {
log.error(`Caught exception: ${err}\n` + `Exception origin: ${origin}`);
});
process.on('unhandledRejection', (reason, promise) => {
log.debug('Unhandled Rejection at:', promise, 'reason:', reason);
});
process.on("exit", () => {
(commentChangeStream != null ? commentChangeStream.close() : Promise.bind(client).resolve())
.then(client.close.bind(client))
.finally("Fin");
});
log.debug("Watching MongoDB");
}
function findPolicyEmails(policies: Policy[], permission: string): string[] {
const emails: string[] = [];
for (const policy of policies) {
if (policy.permission === permission) {
for (const email of policy.users) {
emails.push(email);
}
break;
}
}
return emails;
}
function sendCurrentUsers(socketIo, roomName: string, roomPrefix: string) {
if (roomName.startsWith(roomPrefix)) {
socketIo.in(roomName).fetchSockets().then(sockets => {
let onlineUsernames = new Set<string>();
let onlineUsers = new Array<AppUser>();
if (sockets) {
sockets.forEach(s => {
if (!onlineUsernames.has(s.data.email)) {
onlineUsers.push(new AppUser(s.data.name, s.data.email));
}
onlineUsernames.add(s.data.email);
});
}
let resourceId = roomName.replace(roomPrefix, ""); // get resourceId from room name by removing the prefix
let response = new CurrentEditorsEvent(resourceId, onlineUsers);
socketIo.to(roomName).emit(EDITORS_EVENT_NAME, response);
});
}
// Initializing Routes
app.use(express.static(path.join(__dirname, "static")));
app.get("/", (_, res) => {
res.redirect("/index.html");
});
// Run the server
server.listen(PORT, () => {
log.info(`RTS version ${buildVersion} running at http://localhost:${PORT}`);
});
}

View File

@ -0,0 +1,68 @@
import { Server, Socket } from "socket.io";
import log from "loglevel";
import {
APP_ROOM_PREFIX,
RELEASE_VERSION_EVENT_NAME,
LEAVE_EDIT_EVENT_NAME,
PAGE_EDIT_NAMESPACE,
PAGE_ROOM_PREFIX,
ROOT_NAMESPACE,
} from "../constants/socket";
import { VERSION as buildVersion } from "../constants/version";
import {
subscribeToEditEvents,
onAppSocketConnected,
onPageSocketConnected,
sendCurrentUsers,
} from "../controllers/socket";
export function watchEvents(io: Server) {
io.on("connection", (socket: Socket) => {
socket.emit(RELEASE_VERSION_EVENT_NAME, buildVersion);
subscribeToEditEvents(socket, APP_ROOM_PREFIX);
onAppSocketConnected(socket).catch((error) =>
log.error("Error in socket connected handler", error)
);
});
io.of(PAGE_EDIT_NAMESPACE).on("connection", (socket: Socket) => {
subscribeToEditEvents(socket, PAGE_ROOM_PREFIX);
onPageSocketConnected(socket, io).catch((error) =>
log.error("Error in socket connected handler", error)
);
});
io.of(ROOT_NAMESPACE).adapter.on("leave-room", (room, id) => {
if (room.startsWith(APP_ROOM_PREFIX)) {
log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} left the room ${room}`);
}
sendCurrentUsers(io, room, APP_ROOM_PREFIX);
});
io.of(ROOT_NAMESPACE).adapter.on("join-room", (room, id) => {
if (room.startsWith(APP_ROOM_PREFIX)) {
log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} joined the room ${room}`);
}
sendCurrentUsers(io, room, APP_ROOM_PREFIX);
});
io.of(PAGE_EDIT_NAMESPACE).adapter.on("leave-room", (room, id) => {
if (room.startsWith(PAGE_ROOM_PREFIX)) {
// someone left the page edit, notify others
log.debug(
`ns:${PAGE_EDIT_NAMESPACE} # socket ${id} left the room ${room}`
);
io.of(PAGE_EDIT_NAMESPACE).to(room).emit(LEAVE_EDIT_EVENT_NAME, id);
}
sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX);
});
io.of(PAGE_EDIT_NAMESPACE).adapter.on("join-room", (room, id) => {
if (room.startsWith(PAGE_ROOM_PREFIX)) {
log.debug(
`ns:${PAGE_EDIT_NAMESPACE}# socket ${id} joined the room ${room}`
);
}
sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX);
});
}

View File

@ -0,0 +1,10 @@
import { watchMongoDB } from "./mongo";
import { watchEvents } from "./events";
import { Server } from "socket.io";
import log from "loglevel";
// Initializing Multiple Sockets
export function initializeSockets(io: Server) {
watchMongoDB(io).catch((error) => log.error("Error watching MongoDB", error));
watchEvents(io);
}

View File

@ -0,0 +1,157 @@
import type mongodb from "mongodb";
import log from "loglevel";
import { MongoClient } from "mongodb";
import { CommentThread, Comment } from "../utils/models";
import { findPolicyEmails } from "../controllers/socket";
const MONGODB_URI = process.env.APPSMITH_MONGODB_URI;
export async function watchMongoDB(io) {
const client = await MongoClient.connect(MONGODB_URI, {
useUnifiedTopology: true,
});
const db = client.db();
const threadCollection: mongodb.Collection<CommentThread> =
db.collection("commentThread");
const commentChangeStream = db.collection("comment").watch(
[
// Prevent server-internal fields from being sent to the client.
{
$unset: ["deletedAt", "_class"].map((f) => "fullDocument." + f),
},
],
{ fullDocument: "updateLookup" }
);
commentChangeStream.on(
"change",
async (event: mongodb.ChangeEventCR<Comment>) => {
let eventName = event.operationType + ":" + event.ns.coll;
const comment: Comment = event.fullDocument;
if (comment.deleted) {
eventName = "delete" + ":" + event.ns.coll; // emit delete event if deleted=true
}
comment.creationTime = comment.createdAt;
comment.updationTime = comment.updatedAt;
delete comment.createdAt;
delete comment.updatedAt;
delete comment.deleted;
let target = io;
let shouldEmit = false;
for (const email of findPolicyEmails(comment.policies, "read:comments")) {
shouldEmit = true;
target = target.to("email:" + email);
}
if (shouldEmit) {
target.emit(eventName, { comment });
}
}
);
const threadChangeStream = threadCollection.watch(
[
// Prevent server-internal fields from being sent to the client.
{
$unset: ["deletedAt", "_class"].map((f) => "fullDocument." + f),
},
],
{ fullDocument: "updateLookup" }
);
threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => {
let eventName = event.operationType + ":" + event.ns.coll;
const thread = event.fullDocument;
if (thread.deleted) {
eventName = "delete" + ":" + event.ns.coll; // emit delete event if deleted=true
}
if (thread === null) {
// This happens when `event.operationType === "drop"`, when a comment is deleted.
log.error("Null document recieved for comment change event", event);
return;
}
thread.creationTime = thread.createdAt;
thread.updationTime = thread.updatedAt;
delete thread.createdAt;
delete thread.updatedAt;
delete thread.deleted;
thread.isViewed = false;
let target = io;
let shouldEmit = false;
for (const email of findPolicyEmails(
thread.policies,
"read:commentThreads"
)) {
shouldEmit = true;
target = target.to("email:" + email);
}
if (shouldEmit) {
target.emit(eventName, { thread });
}
});
const notificationsStream = db.collection("notification").watch(
[
// Prevent server-internal fields from being sent to the client.
{
$unset: ["deletedAt", "deleted"].map((f) => "fullDocument." + f),
},
],
{ fullDocument: "updateLookup" }
);
notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => {
const notification = event.fullDocument;
if (notification === null) {
// This happens when `event.operationType === "drop"`, when a notification is deleted.
log.error("Null document recieved for notification change event", event);
return;
}
// set the type from _class attribute
notification.type = notification._class.slice(
notification._class.lastIndexOf(".") + 1
);
delete notification._class;
const eventName = event.operationType + ":" + event.ns.coll;
io.to("email:" + notification.forUsername).emit(eventName, {
notification,
});
});
process.on("uncaughtExceptionMonitor", (err, origin) => {
log.error(`Caught exception: ${err}\n` + `Exception origin: ${origin}`);
});
process.on("unhandledRejection", (reason, promise) => {
log.debug("Unhandled Rejection at:", promise, "reason:", reason);
});
process.on("exit", () => {
(commentChangeStream != null
? commentChangeStream.close()
: Promise.bind(client).resolve()
)
.then(client.close.bind(client))
.finally("Fin");
});
log.debug("Watching MongoDB");
}

View File

@ -1 +0,0 @@
export const VERSION = "SNAPSHOT";