diff --git a/app/rts/.env.example b/app/rts/.env.example index 9477484fb2..6fb9ad4e9f 100644 --- a/app/rts/.env.example +++ b/app/rts/.env.example @@ -1,2 +1,3 @@ APPSMITH_MONGODB_URI=mongodb://localhost:27017/appsmith -APPSMITH_API_BASE_URL=http://localhost:8080/api/v1 \ No newline at end of file +APPSMITH_API_BASE_URL=http://localhost:8080/api/v1 +PORT=8091 \ No newline at end of file diff --git a/app/rts/src/constants/socket.ts b/app/rts/src/constants/socket.ts new file mode 100644 index 0000000000..66601713b2 --- /dev/null +++ b/app/rts/src/constants/socket.ts @@ -0,0 +1,10 @@ +export const APP_ROOM_PREFIX = "app:"; +export const PAGE_ROOM_PREFIX = "page:"; +export const ROOT_NAMESPACE = "/"; +export const PAGE_EDIT_NAMESPACE = "/page/edit"; + +export const EDITORS_EVENT_NAME = "collab:online_editors"; +export const START_EDIT_EVENT_NAME = "collab:start_edit"; +export const LEAVE_EDIT_EVENT_NAME = "collab:leave_edit"; +export const MOUSE_POINTER_EVENT_NAME = "collab:mouse_pointer"; +export const RELEASE_VERSION_EVENT_NAME = "info:release_version"; \ No newline at end of file diff --git a/app/rts/src/constants/version.ts b/app/rts/src/constants/version.ts new file mode 100644 index 0000000000..4a88868350 --- /dev/null +++ b/app/rts/src/constants/version.ts @@ -0,0 +1 @@ +export const VERSION = "SNAPSHOT"; \ No newline at end of file diff --git a/app/rts/src/controllers/socket.ts b/app/rts/src/controllers/socket.ts new file mode 100644 index 0000000000..6fc51e2916 --- /dev/null +++ b/app/rts/src/controllers/socket.ts @@ -0,0 +1,128 @@ +import { Server, Socket } from "socket.io"; +import { tryAuth } from "../middlewares/socket-auth"; +import { + START_EDIT_EVENT_NAME, + LEAVE_EDIT_EVENT_NAME, + MOUSE_POINTER_EVENT_NAME, + PAGE_EDIT_NAMESPACE, + PAGE_ROOM_PREFIX, + EDITORS_EVENT_NAME, +} from "../constants/socket"; +import { + AppUser, + Policy, + CurrentEditorsEvent, + MousePointerEvent, +} from "../utils/models"; + +function subscribeToEditEvents(socket: Socket, appRoomPrefix: string) { + socket.on(START_EDIT_EVENT_NAME, (resourceId) => { + if (socket.data.email) { + // user is authenticated, join the room now + joinEditRoom(socket, resourceId, appRoomPrefix); + } else { + // user not authenticated yet, save the resource id and room prefix to join later after auth + socket.data.pendingRoomId = resourceId; + socket.data.pendingRoomPrefix = appRoomPrefix; + } + }); + + socket.on(LEAVE_EDIT_EVENT_NAME, (resourceId) => { + let roomName = appRoomPrefix + resourceId; + socket.leave(roomName); // remove this socket from room + }); +} + +async function onAppSocketConnected(socket: Socket) { + let isAuthenticated = await tryAuthAndJoinPendingRoom(socket); + if (isAuthenticated) { + socket.join("email:" + socket.data.email); + } +} + +async function onPageSocketConnected(socket: Socket, socketIo: Server) { + let isAuthenticated = await tryAuthAndJoinPendingRoom(socket); + if (isAuthenticated) { + socket.on(MOUSE_POINTER_EVENT_NAME, (event: MousePointerEvent) => { + event.user = new AppUser(socket.data.name, socket.data.email); + event.socketId = socket.id; + socketIo + .of(PAGE_EDIT_NAMESPACE) + .to(PAGE_ROOM_PREFIX + event.pageId) + .emit(MOUSE_POINTER_EVENT_NAME, event); + }); + } +} + +async function tryAuthAndJoinPendingRoom(socket: Socket) { + const isAuthenticated = await tryAuth(socket); + if (socket.data.pendingRoomId) { + // an appId or pageId is pending for this socket, join now + joinEditRoom( + socket, + socket.data.pendingRoomId, + socket.data.pendingRoomPrefix + ); + } + + return isAuthenticated; +} + +function joinEditRoom(socket: Socket, roomId: string, roomPrefix: string) { + // remove this socket from any other rooms with roomPrefix + if (socket.rooms) { + socket.rooms.forEach((roomName) => { + if (roomName.startsWith(roomPrefix)) { + socket.leave(roomName); + } + }); + } + + // add this socket to room with application id + let roomName = roomPrefix + roomId; + socket.join(roomName); +} + +function findPolicyEmails(policies: Policy[], permission: string): string[] { + const emails: string[] = []; + for (const policy of policies) { + if (policy.permission === permission) { + for (const email of policy.users) { + emails.push(email); + } + break; + } + } + return emails; +} + +function sendCurrentUsers(socketIo, roomName: string, roomPrefix: string) { + if (roomName.startsWith(roomPrefix)) { + socketIo + .in(roomName) + .fetchSockets() + .then((sockets) => { + let onlineUsernames = new Set(); + let onlineUsers = new Array(); + if (sockets) { + sockets.forEach((s) => { + if (!onlineUsernames.has(s.data.email)) { + onlineUsers.push(new AppUser(s.data.name, s.data.email)); + } + onlineUsernames.add(s.data.email); + }); + } + let resourceId = roomName.replace(roomPrefix, ""); // get resourceId from room name by removing the prefix + let response = new CurrentEditorsEvent(resourceId, onlineUsers); + socketIo.to(roomName).emit(EDITORS_EVENT_NAME, response); + }); + } +} + +export { + subscribeToEditEvents, + onAppSocketConnected, + onPageSocketConnected, + sendCurrentUsers, + findPolicyEmails, +}; diff --git a/app/rts/src/middlewares/socket-auth.ts b/app/rts/src/middlewares/socket-auth.ts new file mode 100644 index 0000000000..3e9754f31e --- /dev/null +++ b/app/rts/src/middlewares/socket-auth.ts @@ -0,0 +1,69 @@ +import { Socket } from "socket.io"; +import log from "loglevel"; +import axios from "axios"; + +const API_BASE_URL = process.env.APPSMITH_API_BASE_URL; + +export async function tryAuth(socket: Socket) { + /* ********************************************************* */ + // TODO: This change is not being used at the moment. Instead of using the environment variable API_BASE_URL + // we should be able to derive the API_BASE_URL from the host header. This will make configuration simpler + // for the user. The problem with this implementation is that Axios doesn't work for https endpoints currently. + // This needs to be debugged. + /* ********************************************************* */ + + // const host = socket.handshake.headers.host; + const connectionCookie = socket?.handshake?.headers?.cookie; + if ( + connectionCookie === undefined || + connectionCookie === null || + connectionCookie === "" + ) { + return false; + } + + const matchedCookie = connectionCookie.match(/\bSESSION=\S+/); + if (!matchedCookie) { + return false; + } + + const sessionCookie = matchedCookie[0]; + let response; + try { + response = await axios.request({ + method: "GET", + url: API_BASE_URL + "/users/me", + headers: { + Cookie: sessionCookie, + }, + }); + } catch (error) { + if (error.response?.status === 401) { + console.info( + "401 received when authenticating user with cookie: " + sessionCookie + ); + } else if (error.response) { + log.error( + "Error response received while authentication: ", + error.response + ); + } else { + log.error("Error authenticating", error); + } + return false; + } + + const email = response?.data?.data?.email; + const name = response?.data?.data?.name ?? email; + + // If the session check API succeeds & the email/name is anonymousUser, then the user is not authenticated + // and we should not allow them to join any rooms + if (email == null || email === "anonymousUser" || name === "anonymousUser") { + return false; + } + + socket.data.email = email; + socket.data.name = name; + + return true; +} \ No newline at end of file diff --git a/app/rts/src/server.ts b/app/rts/src/server.ts index f5ba172f68..90a1cb36db 100644 --- a/app/rts/src/server.ts +++ b/app/rts/src/server.ts @@ -1,394 +1,59 @@ import http from "http"; import path from "path"; import express from "express"; -import {Server, Socket} from "socket.io"; -import type mongodb from "mongodb"; -import {MongoClient} from "mongodb"; -import axios from "axios"; -import log, {LogLevelDesc} from "loglevel"; - -import {AppUser, Comment, CommentThread, CurrentEditorsEvent, MousePointerEvent, Policy} from "./models"; -import {VERSION as buildVersion} from "./version"; // release version of the api +import { Server } from "socket.io"; +import log, { LogLevelDesc } from "loglevel"; +import { VERSION as buildVersion } from "./constants/version"; // release version of the api +import { initializeSockets } from "./sockets"; const RTS_BASE_PATH = "/rts"; -const APP_ROOM_PREFIX = "app:"; -const PAGE_ROOM_PREFIX = "page:"; -const ROOT_NAMESPACE = "/"; -const PAGE_EDIT_NAMESPACE = "/page/edit"; - -const EDITORS_EVENT_NAME = "collab:online_editors"; -const START_EDIT_EVENT_NAME = "collab:start_edit"; -const LEAVE_EDIT_EVENT_NAME = "collab:leave_edit"; -const MOUSE_POINTER_EVENT_NAME = "collab:mouse_pointer"; -const RELEASE_VERSION_EVENT_NAME = "info:release_version"; - // Setting the logLevel for all log messages -const logLevel: LogLevelDesc = (process.env.APPSMITH_LOG_LEVEL || "debug") as LogLevelDesc; +const logLevel: LogLevelDesc = (process.env.APPSMITH_LOG_LEVEL || + "debug") as LogLevelDesc; log.setLevel(logLevel); +// Verifing Environment Variables const MONGODB_URI = process.env.APPSMITH_MONGODB_URI; -if (MONGODB_URI == null || MONGODB_URI === "" || !MONGODB_URI.startsWith("mongodb")) { - log.error("Please provide a valid value for `APPSMITH_MONGODB_URI`."); - process.exit(1); +if ( + MONGODB_URI == null || + MONGODB_URI === "" || + !MONGODB_URI.startsWith("mongodb") +) { + log.error("Please provide a valid value for `APPSMITH_MONGODB_URI`."); + process.exit(1); } const API_BASE_URL = process.env.APPSMITH_API_BASE_URL; if (API_BASE_URL == null || API_BASE_URL === "") { - log.error("Please provide a valid value for `APPSMITH_API_BASE_URL`."); - process.exit(1); + log.error("Please provide a valid value for `APPSMITH_API_BASE_URL`."); + process.exit(1); } +const PORT = process.env.PORT || 8091; + main(); function main() { - const app = express(); - //Disable x-powered-by header to prevent information disclosure - app.disable("x-powered-by"); - const server = new http.Server(app); - const io = new Server(server, { - path: RTS_BASE_PATH, - }); + const app = express(); + //Disable x-powered-by header to prevent information disclosure + app.disable("x-powered-by"); + const server = new http.Server(app); + const io = new Server(server, { + path: RTS_BASE_PATH, + }); - const port = 8091; + // Initializing Sockets + initializeSockets(io); - app.get("/", (req, res) => { - res.redirect("/index.html"); - }); - - io.on("connection", (socket: Socket) => { - socket.emit(RELEASE_VERSION_EVENT_NAME, buildVersion); - subscribeToEditEvents(socket, APP_ROOM_PREFIX); - onAppSocketConnected(socket) - .catch((error) => log.error("Error in socket connected handler", error)); - }); - - io.of(PAGE_EDIT_NAMESPACE).on("connection", (socket: Socket) => { - subscribeToEditEvents(socket, PAGE_ROOM_PREFIX); - onPageSocketConnected(socket, io) - .catch((error) => log.error("Error in socket connected handler", error)); - }); - - io.of(ROOT_NAMESPACE).adapter.on("leave-room", (room, id) => { - if (room.startsWith(APP_ROOM_PREFIX)) { - log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} left the room ${room}`); - } - sendCurrentUsers(io, room, APP_ROOM_PREFIX); - }); - - io.of(ROOT_NAMESPACE).adapter.on("join-room", (room, id) => { - if (room.startsWith(APP_ROOM_PREFIX)) { - log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} joined the room ${room}`); - } - sendCurrentUsers(io, room, APP_ROOM_PREFIX); - }); - - io.of(PAGE_EDIT_NAMESPACE).adapter.on("leave-room", (room, id) => { - if (room.startsWith(PAGE_ROOM_PREFIX)) { // someone left the page edit, notify others - log.debug(`ns:${PAGE_EDIT_NAMESPACE} # socket ${id} left the room ${room}`); - io.of(PAGE_EDIT_NAMESPACE).to(room).emit(LEAVE_EDIT_EVENT_NAME, id); - } - sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX); - }); - - io.of(PAGE_EDIT_NAMESPACE).adapter.on("join-room", (room, id) => { - if (room.startsWith(PAGE_ROOM_PREFIX)) { - log.debug(`ns:${PAGE_EDIT_NAMESPACE}# socket ${id} joined the room ${room}`); - } - sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX); - }); - - app.use(express.static(path.join(__dirname, "static"))); - - watchMongoDB(io).catch((error) => log.error("Error watching MongoDB", error)); - - // run the server - server.listen(port, () => { - log.info(`RTS version ${buildVersion} running at http://localhost:${port}`); - }); -} - -function joinEditRoom(socket: Socket, roomId: string, roomPrefix: string) { - // remove this socket from any other rooms with roomPrefix - if (socket.rooms) { - socket.rooms.forEach(roomName => { - if (roomName.startsWith(roomPrefix)) { - socket.leave(roomName); - } - }); - } - - // add this socket to room with application id - let roomName = roomPrefix + roomId; - socket.join(roomName); -} - -function subscribeToEditEvents(socket: Socket, appRoomPrefix: string) { - socket.on(START_EDIT_EVENT_NAME, (resourceId) => { - if (socket.data.email) { // user is authenticated, join the room now - joinEditRoom(socket, resourceId, appRoomPrefix); - } else { // user not authenticated yet, save the resource id and room prefix to join later after auth - socket.data.pendingRoomId = resourceId; - socket.data.pendingRoomPrefix = appRoomPrefix; - } - }); - - socket.on(LEAVE_EDIT_EVENT_NAME, (resourceId) => { - let roomName = appRoomPrefix + resourceId; - socket.leave(roomName); // remove this socket from room - }); -} - -async function onAppSocketConnected(socket: Socket) { - let isAuthenticated = await tryAuth(socket); - if (isAuthenticated) { - socket.join("email:" + socket.data.email); - } -} - -async function onPageSocketConnected(socket: Socket, socketIo: Server) { - let isAuthenticated = await tryAuth(socket); - if (isAuthenticated) { - socket.on(MOUSE_POINTER_EVENT_NAME, (event: MousePointerEvent) => { - event.user = new AppUser(socket.data.name, socket.data.email); - event.socketId = socket.id; - socketIo.of(PAGE_EDIT_NAMESPACE).to(PAGE_ROOM_PREFIX + event.pageId).emit(MOUSE_POINTER_EVENT_NAME, event); - }); - } -} - -async function tryAuth(socket: Socket) { - - /* ********************************************************* */ - // TODO: This change is not being used at the moment. Instead of using the environment variable API_BASE_URL - // we should be able to derive the API_BASE_URL from the host header. This will make configuration simpler - // for the user. The problem with this implementation is that Axios doesn't work for https endpoints currently. - // This needs to be debugged. - /* ********************************************************* */ - - // const host = socket.handshake.headers.host; - const connectionCookie = socket?.handshake?.headers?.cookie; - if (connectionCookie === undefined || connectionCookie === null || connectionCookie === "") { - return false; - } - - const matchedCookie = connectionCookie.match(/\bSESSION=\S+/); - if (!matchedCookie) { - return false; - } - - const sessionCookie = matchedCookie[0]; - let response; - try { - response = await axios.request({ - method: "GET", - url: API_BASE_URL + "/users/me", - headers: { - Cookie: sessionCookie, - }, - }); - } catch (error) { - if (error.response?.status === 401) { - console.info("401 received when authenticating user with cookie: " + sessionCookie); - } else if (error.response) { - log.error("Error response received while authentication: ", error.response); - } else { - log.error("Error authenticating", error); - } - return false; - } - - const email = response?.data?.data?.email; - const name = response?.data?.data?.name ?? email; - - // If the session check API succeeds & the email/name is anonymousUser, then the user is not authenticated - // and we should not allow them to join any rooms - if (email == null || email === "anonymousUser" || name === "anonymousUser") { - return false; - } - - socket.data.email = email; - socket.data.name = name; - - if (socket.data.pendingRoomId) { // an appId or pageId is pending for this socket, join now - joinEditRoom(socket, socket.data.pendingRoomId, socket.data.pendingRoomPrefix); - } - - return true; -} - -async function watchMongoDB(io) { - const client = await MongoClient.connect(MONGODB_URI, {useUnifiedTopology: true}); - const db = client.db(); - - const threadCollection: mongodb.Collection = db.collection("commentThread"); - - const commentChangeStream = db.collection("comment").watch( - [ - // Prevent server-internal fields from being sent to the client. - { - $unset: [ - "deletedAt", - "_class", - ].map(f => "fullDocument." + f) - }, - ], - {fullDocument: "updateLookup"} - ); - - commentChangeStream.on("change", async (event: mongodb.ChangeEventCR) => { - let eventName = event.operationType + ":" + event.ns.coll; - - const comment: Comment = event.fullDocument; - if (comment.deleted) { - eventName = 'delete' + ":" + event.ns.coll; // emit delete event if deleted=true - } - - comment.creationTime = comment.createdAt; - comment.updationTime = comment.updatedAt; - - delete comment.createdAt; - delete comment.updatedAt; - delete comment.deleted; - - let target = io; - let shouldEmit = false; - - for (const email of findPolicyEmails(comment.policies, "read:comments")) { - shouldEmit = true; - target = target.to("email:" + email); - } - - if (shouldEmit) { - target.emit(eventName, {comment}); - } - }); - - const threadChangeStream = threadCollection.watch( - [ - // Prevent server-internal fields from being sent to the client. - { - $unset: [ - "deletedAt", - "_class", - ].map(f => "fullDocument." + f) - }, - ], - {fullDocument: "updateLookup"} - ); - - threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => { - let eventName = event.operationType + ":" + event.ns.coll; - - const thread = event.fullDocument; - if (thread.deleted) { - eventName = 'delete' + ":" + event.ns.coll; // emit delete event if deleted=true - } - - if (thread === null) { - // This happens when `event.operationType === "drop"`, when a comment is deleted. - log.error("Null document recieved for comment change event", event); - return; - } - - thread.creationTime = thread.createdAt; - thread.updationTime = thread.updatedAt; - - delete thread.createdAt; - delete thread.updatedAt; - delete thread.deleted; - - thread.isViewed = false; - - let target = io; - let shouldEmit = false; - - for (const email of findPolicyEmails(thread.policies, "read:commentThreads")) { - shouldEmit = true; - target = target.to("email:" + email); - } - - if (shouldEmit) { - target.emit(eventName, {thread}); - } - }); - - const notificationsStream = db.collection("notification").watch( - [ - // Prevent server-internal fields from being sent to the client. - { - $unset: [ - "deletedAt", - "deleted", - ].map(f => "fullDocument." + f) - }, - ], - {fullDocument: "updateLookup"} - ); - - notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => { - const notification = event.fullDocument; - - if (notification === null) { - // This happens when `event.operationType === "drop"`, when a notification is deleted. - log.error("Null document recieved for notification change event", event); - return; - } - - // set the type from _class attribute - notification.type = notification._class.slice(notification._class.lastIndexOf(".") + 1); - delete notification._class; - - const eventName = event.operationType + ":" + event.ns.coll; - io.to("email:" + notification.forUsername).emit(eventName, {notification}); - }); - - process.on('uncaughtExceptionMonitor', (err, origin) => { - log.error(`Caught exception: ${err}\n` + `Exception origin: ${origin}`); - }); - - process.on('unhandledRejection', (reason, promise) => { - log.debug('Unhandled Rejection at:', promise, 'reason:', reason); - }); - - process.on("exit", () => { - (commentChangeStream != null ? commentChangeStream.close() : Promise.bind(client).resolve()) - .then(client.close.bind(client)) - .finally("Fin"); - }); - - log.debug("Watching MongoDB"); -} - -function findPolicyEmails(policies: Policy[], permission: string): string[] { - const emails: string[] = []; - for (const policy of policies) { - if (policy.permission === permission) { - for (const email of policy.users) { - emails.push(email); - } - break; - } - } - return emails; -} - -function sendCurrentUsers(socketIo, roomName: string, roomPrefix: string) { - if (roomName.startsWith(roomPrefix)) { - socketIo.in(roomName).fetchSockets().then(sockets => { - let onlineUsernames = new Set(); - let onlineUsers = new Array(); - if (sockets) { - sockets.forEach(s => { - if (!onlineUsernames.has(s.data.email)) { - onlineUsers.push(new AppUser(s.data.name, s.data.email)); - } - onlineUsernames.add(s.data.email); - }); - } - let resourceId = roomName.replace(roomPrefix, ""); // get resourceId from room name by removing the prefix - let response = new CurrentEditorsEvent(resourceId, onlineUsers); - socketIo.to(roomName).emit(EDITORS_EVENT_NAME, response); - }); - } + // Initializing Routes + app.use(express.static(path.join(__dirname, "static"))); + app.get("/", (_, res) => { + res.redirect("/index.html"); + }); + + // Run the server + server.listen(PORT, () => { + log.info(`RTS version ${buildVersion} running at http://localhost:${PORT}`); + }); } diff --git a/app/rts/src/sockets/events.ts b/app/rts/src/sockets/events.ts new file mode 100644 index 0000000000..61a24a31fa --- /dev/null +++ b/app/rts/src/sockets/events.ts @@ -0,0 +1,68 @@ +import { Server, Socket } from "socket.io"; +import log from "loglevel"; +import { + APP_ROOM_PREFIX, + RELEASE_VERSION_EVENT_NAME, + LEAVE_EDIT_EVENT_NAME, + PAGE_EDIT_NAMESPACE, + PAGE_ROOM_PREFIX, + ROOT_NAMESPACE, +} from "../constants/socket"; +import { VERSION as buildVersion } from "../constants/version"; +import { + subscribeToEditEvents, + onAppSocketConnected, + onPageSocketConnected, + sendCurrentUsers, +} from "../controllers/socket"; + +export function watchEvents(io: Server) { + io.on("connection", (socket: Socket) => { + socket.emit(RELEASE_VERSION_EVENT_NAME, buildVersion); + subscribeToEditEvents(socket, APP_ROOM_PREFIX); + onAppSocketConnected(socket).catch((error) => + log.error("Error in socket connected handler", error) + ); + }); + + io.of(PAGE_EDIT_NAMESPACE).on("connection", (socket: Socket) => { + subscribeToEditEvents(socket, PAGE_ROOM_PREFIX); + onPageSocketConnected(socket, io).catch((error) => + log.error("Error in socket connected handler", error) + ); + }); + + io.of(ROOT_NAMESPACE).adapter.on("leave-room", (room, id) => { + if (room.startsWith(APP_ROOM_PREFIX)) { + log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} left the room ${room}`); + } + sendCurrentUsers(io, room, APP_ROOM_PREFIX); + }); + + io.of(ROOT_NAMESPACE).adapter.on("join-room", (room, id) => { + if (room.startsWith(APP_ROOM_PREFIX)) { + log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} joined the room ${room}`); + } + sendCurrentUsers(io, room, APP_ROOM_PREFIX); + }); + + io.of(PAGE_EDIT_NAMESPACE).adapter.on("leave-room", (room, id) => { + if (room.startsWith(PAGE_ROOM_PREFIX)) { + // someone left the page edit, notify others + log.debug( + `ns:${PAGE_EDIT_NAMESPACE} # socket ${id} left the room ${room}` + ); + io.of(PAGE_EDIT_NAMESPACE).to(room).emit(LEAVE_EDIT_EVENT_NAME, id); + } + sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX); + }); + + io.of(PAGE_EDIT_NAMESPACE).adapter.on("join-room", (room, id) => { + if (room.startsWith(PAGE_ROOM_PREFIX)) { + log.debug( + `ns:${PAGE_EDIT_NAMESPACE}# socket ${id} joined the room ${room}` + ); + } + sendCurrentUsers(io.of(PAGE_EDIT_NAMESPACE), room, PAGE_ROOM_PREFIX); + }); +} \ No newline at end of file diff --git a/app/rts/src/sockets/index.ts b/app/rts/src/sockets/index.ts new file mode 100644 index 0000000000..18d9ce9e40 --- /dev/null +++ b/app/rts/src/sockets/index.ts @@ -0,0 +1,10 @@ +import { watchMongoDB } from "./mongo"; +import { watchEvents } from "./events"; +import { Server } from "socket.io"; +import log from "loglevel"; + +// Initializing Multiple Sockets +export function initializeSockets(io: Server) { + watchMongoDB(io).catch((error) => log.error("Error watching MongoDB", error)); + watchEvents(io); +} \ No newline at end of file diff --git a/app/rts/src/sockets/mongo.ts b/app/rts/src/sockets/mongo.ts new file mode 100644 index 0000000000..781a446c26 --- /dev/null +++ b/app/rts/src/sockets/mongo.ts @@ -0,0 +1,157 @@ +import type mongodb from "mongodb"; +import log from "loglevel"; +import { MongoClient } from "mongodb"; +import { CommentThread, Comment } from "../utils/models"; +import { findPolicyEmails } from "../controllers/socket"; + +const MONGODB_URI = process.env.APPSMITH_MONGODB_URI; + +export async function watchMongoDB(io) { + const client = await MongoClient.connect(MONGODB_URI, { + useUnifiedTopology: true, + }); + const db = client.db(); + + const threadCollection: mongodb.Collection = + db.collection("commentThread"); + + const commentChangeStream = db.collection("comment").watch( + [ + // Prevent server-internal fields from being sent to the client. + { + $unset: ["deletedAt", "_class"].map((f) => "fullDocument." + f), + }, + ], + { fullDocument: "updateLookup" } + ); + + commentChangeStream.on( + "change", + async (event: mongodb.ChangeEventCR) => { + let eventName = event.operationType + ":" + event.ns.coll; + + const comment: Comment = event.fullDocument; + if (comment.deleted) { + eventName = "delete" + ":" + event.ns.coll; // emit delete event if deleted=true + } + + comment.creationTime = comment.createdAt; + comment.updationTime = comment.updatedAt; + + delete comment.createdAt; + delete comment.updatedAt; + delete comment.deleted; + + let target = io; + let shouldEmit = false; + + for (const email of findPolicyEmails(comment.policies, "read:comments")) { + shouldEmit = true; + target = target.to("email:" + email); + } + + if (shouldEmit) { + target.emit(eventName, { comment }); + } + } + ); + + const threadChangeStream = threadCollection.watch( + [ + // Prevent server-internal fields from being sent to the client. + { + $unset: ["deletedAt", "_class"].map((f) => "fullDocument." + f), + }, + ], + { fullDocument: "updateLookup" } + ); + + threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => { + let eventName = event.operationType + ":" + event.ns.coll; + + const thread = event.fullDocument; + if (thread.deleted) { + eventName = "delete" + ":" + event.ns.coll; // emit delete event if deleted=true + } + + if (thread === null) { + // This happens when `event.operationType === "drop"`, when a comment is deleted. + log.error("Null document recieved for comment change event", event); + return; + } + + thread.creationTime = thread.createdAt; + thread.updationTime = thread.updatedAt; + + delete thread.createdAt; + delete thread.updatedAt; + delete thread.deleted; + + thread.isViewed = false; + + let target = io; + let shouldEmit = false; + + for (const email of findPolicyEmails( + thread.policies, + "read:commentThreads" + )) { + shouldEmit = true; + target = target.to("email:" + email); + } + + if (shouldEmit) { + target.emit(eventName, { thread }); + } + }); + + const notificationsStream = db.collection("notification").watch( + [ + // Prevent server-internal fields from being sent to the client. + { + $unset: ["deletedAt", "deleted"].map((f) => "fullDocument." + f), + }, + ], + { fullDocument: "updateLookup" } + ); + + notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => { + const notification = event.fullDocument; + + if (notification === null) { + // This happens when `event.operationType === "drop"`, when a notification is deleted. + log.error("Null document recieved for notification change event", event); + return; + } + + // set the type from _class attribute + notification.type = notification._class.slice( + notification._class.lastIndexOf(".") + 1 + ); + delete notification._class; + + const eventName = event.operationType + ":" + event.ns.coll; + io.to("email:" + notification.forUsername).emit(eventName, { + notification, + }); + }); + + process.on("uncaughtExceptionMonitor", (err, origin) => { + log.error(`Caught exception: ${err}\n` + `Exception origin: ${origin}`); + }); + + process.on("unhandledRejection", (reason, promise) => { + log.debug("Unhandled Rejection at:", promise, "reason:", reason); + }); + + process.on("exit", () => { + (commentChangeStream != null + ? commentChangeStream.close() + : Promise.bind(client).resolve() + ) + .then(client.close.bind(client)) + .finally("Fin"); + }); + + log.debug("Watching MongoDB"); +} diff --git a/app/rts/src/models.ts b/app/rts/src/utils/models.ts similarity index 100% rename from app/rts/src/models.ts rename to app/rts/src/utils/models.ts diff --git a/app/rts/src/version.js b/app/rts/src/version.js deleted file mode 100644 index 009d8341d5..0000000000 --- a/app/rts/src/version.js +++ /dev/null @@ -1 +0,0 @@ -export const VERSION = "SNAPSHOT";