import http from "http";
import path from "path";
import express from "express";
import {Namespace, Server, Socket} from "socket.io";
import type mongodb from "mongodb";
import {MongoClient} from "mongodb";
import axios from "axios";
import log, {LogLevelDesc} from "loglevel";
import {AppUser, Comment, CommentThread, CurrentEditorsEvent, MousePointerEvent, Policy} from "./models";
import {VERSION as buildVersion} from "./version"; // release version of the api

const RTS_BASE_PATH = "/rts";

const APP_ROOM_PREFIX = "app:";
const PAGE_ROOM_PREFIX = "page:";
const EMAIL_ROOM_PREFIX = "email:";

const ROOT_NAMESPACE = "/";
const PAGE_EDIT_NAMESPACE = "/page/edit";

const EDITORS_EVENT_NAME = "collab:online_editors";
const START_EDIT_EVENT_NAME = "collab:start_edit";
const LEAVE_EDIT_EVENT_NAME = "collab:leave_edit";
const MOUSE_POINTER_EVENT_NAME = "collab:mouse_pointer";
const RELEASE_VERSION_EVENT_NAME = "info:release_version";

// Setting the logLevel for all log messages
const logLevel: LogLevelDesc = (process.env.APPSMITH_LOG_LEVEL || "debug") as LogLevelDesc;
log.setLevel(logLevel);

// Both values are validated once at startup; the process exits when either is missing or malformed.
const MONGODB_URI = requireEnv("APPSMITH_MONGODB_URI", (value) => value.startsWith("mongodb"));
const API_BASE_URL = requireEnv("APPSMITH_API_BASE_URL");

main();

/**
 * Reads a mandatory environment variable.
 *
 * Logs an error and terminates the process with exit code 1 when the variable is unset,
 * empty, or rejected by `isValid`.
 *
 * @param name Name of the environment variable.
 * @param isValid Extra validation applied to a non-empty value; accepts everything by default.
 * @returns The validated value.
 */
function requireEnv(name: string, isValid: (value: string) => boolean = () => true): string {
  const value = process.env[name];
  if (value == null || value === "" || !isValid(value)) {
    log.error("Please provide a valid value for `" + name + "`.");
    process.exit(1);
  }
  return value;
}

/**
 * Wires up the Express app, the Socket.IO server (root and page-edit namespaces),
 * the room join/leave notifications and the MongoDB watchers, then starts listening.
 */
function main() {
  const app = express();
  //Disable x-powered-by header to prevent information disclosure
  app.disable("x-powered-by");

  const server = new http.Server(app);
  const io = new Server(server, {
    path: RTS_BASE_PATH,
  });
  const port = 8091;

  app.get("/", (req, res) => {
    res.redirect("/index.html");
  });

  io.on("connection", (socket: Socket) => {
    socket.emit(RELEASE_VERSION_EVENT_NAME, buildVersion);
    subscribeToEditEvents(socket, APP_ROOM_PREFIX);
    onAppSocketConnected(socket)
      .catch((error) => log.error("Error in socket connected handler", error));
  });

  const pageEditNamespace = io.of(PAGE_EDIT_NAMESPACE);

  pageEditNamespace.on("connection", (socket: Socket) => {
    subscribeToEditEvents(socket, PAGE_ROOM_PREFIX);
    onPageSocketConnected(socket, io)
      .catch((error) => log.error("Error in socket connected handler", error));
  });

  io.of(ROOT_NAMESPACE).adapter.on("leave-room", (room, id) => {
    if (room.startsWith(APP_ROOM_PREFIX)) {
      log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} left the room ${room}`);
    }
    sendCurrentUsers(io, room, APP_ROOM_PREFIX);
  });

  io.of(ROOT_NAMESPACE).adapter.on("join-room", (room, id) => {
    if (room.startsWith(APP_ROOM_PREFIX)) {
      log.debug(`ns:${ROOT_NAMESPACE}# socket ${id} joined the room ${room}`);
    }
    sendCurrentUsers(io, room, APP_ROOM_PREFIX);
  });

  pageEditNamespace.adapter.on("leave-room", (room, id) => {
    if (room.startsWith(PAGE_ROOM_PREFIX)) {
      // someone left the page edit, notify others
      log.debug(`ns:${PAGE_EDIT_NAMESPACE} # socket ${id} left the room ${room}`);
      pageEditNamespace.to(room).emit(LEAVE_EDIT_EVENT_NAME, id);
    }
    sendCurrentUsers(pageEditNamespace, room, PAGE_ROOM_PREFIX);
  });

  pageEditNamespace.adapter.on("join-room", (room, id) => {
    if (room.startsWith(PAGE_ROOM_PREFIX)) {
      log.debug(`ns:${PAGE_EDIT_NAMESPACE}# socket ${id} joined the room ${room}`);
    }
    sendCurrentUsers(pageEditNamespace, room, PAGE_ROOM_PREFIX);
  });

  app.use(express.static(path.join(__dirname, "static")));

  watchMongoDB(io).catch((error) => log.error("Error watching MongoDB", error));

  // run the server
  server.listen(port, () => {
    log.info(`RTS version ${buildVersion} running at http://localhost:${port}`);
  });
}

/**
 * Moves a socket into the edit room `roomPrefix + roomId`, first removing it from every
 * other room that carries the same prefix.
 *
 * @param socket Socket to move.
 * @param roomId Identifier of the resource being edited.
 * @param roomPrefix Prefix identifying the kind of room (application or page).
 */
function joinEditRoom(socket: Socket, roomId: string, roomPrefix: string) {
  // remove this socket from any other rooms with roomPrefix
  if (socket.rooms) {
    socket.rooms.forEach((existingRoom) => {
      if (existingRoom.startsWith(roomPrefix)) {
        socket.leave(existingRoom);
      }
    });
  }

  // add this socket to room with application id
  socket.join(roomPrefix + roomId);
}

/**
 * Registers the start-edit and leave-edit listeners on a socket.
 *
 * A start-edit request received before `socket.data.email` is set is parked on
 * `socket.data` so that `tryAuth` can join the room once authentication succeeds.
 *
 * @param socket Socket to listen on.
 * @param appRoomPrefix Prefix used to build room names for this socket's namespace.
 */
function subscribeToEditEvents(socket: Socket, appRoomPrefix: string) {
  socket.on(START_EDIT_EVENT_NAME, (resourceId) => {
    if (socket.data.email) {
      // user is authenticated, join the room now
      joinEditRoom(socket, resourceId, appRoomPrefix);
    } else {
      // user not authenticated yet, save the resource id and room prefix to join later after auth
      socket.data.pendingRoomId = resourceId;
      socket.data.pendingRoomPrefix = appRoomPrefix;
    }
  });

  socket.on(LEAVE_EDIT_EVENT_NAME, (resourceId) => {
    socket.leave(appRoomPrefix + resourceId); // remove this socket from room
  });
}

/**
 * Authenticates a root-namespace socket and, on success, adds it to the room named
 * after the user's email so that per-user events can be delivered to it.
 */
async function onAppSocketConnected(socket: Socket) {
  const isAuthenticated = await tryAuth(socket);
  if (isAuthenticated) {
    socket.join(EMAIL_ROOM_PREFIX + socket.data.email);
  }
}

/**
 * Authenticates a page-edit socket and, on success, starts relaying its mouse pointer
 * events to the room of the page named in each event.
 *
 * @param socket Newly connected socket of the page-edit namespace.
 * @param socketIo Server used to broadcast the pointer events.
 */
async function onPageSocketConnected(socket: Socket, socketIo: Server) {
  const isAuthenticated = await tryAuth(socket);
  if (!isAuthenticated) {
    return;
  }

  socket.on(MOUSE_POINTER_EVENT_NAME, (event: MousePointerEvent) => {
    // The payload comes straight from the client; without this guard a missing or
    // non-object payload would throw inside the listener.
    if (event == null || typeof event !== "object") {
      return;
    }
    event.user = new AppUser(socket.data.name, socket.data.email);
    event.socketId = socket.id;
    socketIo.of(PAGE_EDIT_NAMESPACE).to(PAGE_ROOM_PREFIX + event.pageId).emit(MOUSE_POINTER_EVENT_NAME, event);
  });
}

/**
 * Extracts the `response` member of a thrown value, if it has one.
 *
 * @param error Value caught from a failed request.
 * @returns The attached response, or `undefined` when the value carries none.
 */
function getErrorResponse(error: unknown): { status?: number; data?: unknown } | undefined {
  if (typeof error === "object" && error !== null && "response" in error) {
    return (error as { response?: { status?: number; data?: unknown } }).response;
  }
  return undefined;
}

/**
 * Authenticates a socket by forwarding its `SESSION` cookie to `${API_BASE_URL}/users/me`.
 *
 * On success the user's email and name are stored on `socket.data`, and a room request
 * parked by `subscribeToEditEvents` is joined.
 *
 * @param socket Socket whose handshake cookie is checked.
 * @returns `true` when the API returned a non-anonymous user, `false` otherwise.
 */
async function tryAuth(socket: Socket): Promise<boolean> {
  /* ********************************************************* */
  // TODO: This change is not being used at the moment. Instead of using the environment variable API_BASE_URL
  //  we should be able to derive the API_BASE_URL from the host header. This will make configuration simpler
  //  for the user. The problem with this implementation is that Axios doesn't work for https endpoints currently.
  //  This needs to be debugged.
  /* ********************************************************* */
  // const host = socket.handshake.headers.host;
  const connectionCookie = socket?.handshake?.headers?.cookie;
  if (!connectionCookie) {
    return false;
  }

  // Stop at `;` or whitespace so only the SESSION pair is forwarded, without the
  // separator or any cookie that follows it.
  const matchedCookie = connectionCookie.match(/\bSESSION=[^;\s]+/);
  if (!matchedCookie) {
    return false;
  }
  const sessionCookie = matchedCookie[0];

  let user: { email?: string; name?: string } | null | undefined;
  try {
    const response = await axios.request({
      method: "GET",
      url: API_BASE_URL + "/users/me",
      headers: {
        Cookie: sessionCookie,
      },
    });
    user = response?.data?.data;
  } catch (error) {
    // The session cookie is a credential. It is deliberately kept out of the logs, and so
    // are the raw error/response objects, which carry the request headers.
    const errorResponse = getErrorResponse(error);
    if (errorResponse?.status === 401) {
      log.info("401 received when authenticating user with session cookie");
    } else if (errorResponse) {
      log.error("Error response received while authentication: ", {
        status: errorResponse.status,
        data: errorResponse.data,
      });
    } else {
      log.error("Error authenticating", error instanceof Error ? error.message : error);
    }
    return false;
  }

  const email = user?.email;
  const name = user?.name ?? email;

  // If the session check API succeeds & the email/name is anonymousUser, then the user is not authenticated
  // and we should not allow them to join any rooms
  if (email == null || email === "anonymousUser" || name === "anonymousUser") {
    return false;
  }

  socket.data.email = email;
  socket.data.name = name;

  if (socket.data.pendingRoomId) {
    // an appId or pageId is pending for this socket, join now
    joinEditRoom(socket, socket.data.pendingRoomId, socket.data.pendingRoomPrefix);
  }

  return true;
}

/**
 * Builds a change stream pipeline that strips the given fields from `fullDocument`.
 *
 * @param fields Field names, relative to `fullDocument`, to remove.
 */
function unsetFullDocumentFields(fields: string[]): object[] {
  return [
    {
      $unset: fields.map((field) => "fullDocument." + field),
    },
  ];
}

/**
 * Emits an event to the email rooms of every user listed in the policy that grants
 * `permission`. Nothing is emitted when no user is listed.
 */
function emitToPolicyUsers(io: Server, policies: Policy[], permission: string, eventName: string, payload: object) {
  const rooms = findPolicyEmails(policies, permission).map((email) => EMAIL_ROOM_PREFIX + email);
  if (rooms.length > 0) {
    io.to(rooms).emit(eventName, payload);
  }
}

/**
 * Opens change streams on the `comment`, `commentThread` and `notification` collections
 * and forwards every change to the email rooms of the users concerned.
 *
 * Also installs process-level handlers for exception/rejection logging and for closing
 * the MongoDB resources on exit.
 *
 * @param io Socket.IO server used to emit the events.
 */
async function watchMongoDB(io: Server) {
  const client = await MongoClient.connect(MONGODB_URI, {useUnifiedTopology: true});
  const db = client.db();

  const threadCollection: mongodb.Collection = db.collection("commentThread");

  const commentChangeStream = db.collection("comment").watch(
    // Prevent server-internal fields from being sent to the client.
    unsetFullDocumentFields(["deletedAt", "_class"]),
    {fullDocument: "updateLookup"}
  );

  commentChangeStream.on("change", async (event: mongodb.ChangeEventCR) => {
    const comment: Comment = event.fullDocument;
    if (comment == null) {
      // Events without a document cannot be forwarded.
      log.error("Missing fullDocument for comment change event", event);
      return;
    }

    // emit delete event if deleted=true
    const operation = comment.deleted ? "delete" : event.operationType;
    const eventName = operation + ":" + event.ns.coll;

    comment.creationTime = comment.createdAt;
    comment.updationTime = comment.updatedAt;

    delete comment.createdAt;
    delete comment.updatedAt;
    delete comment.deleted;

    emitToPolicyUsers(io, comment.policies, "read:comments", eventName, {comment});
  });

  const threadChangeStream = threadCollection.watch(
    // Prevent server-internal fields from being sent to the client.
    unsetFullDocumentFields(["deletedAt", "_class"]),
    {fullDocument: "updateLookup"}
  );

  threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => {
    const thread = event.fullDocument;
    // This check has to come before any property access; `== null` also covers events
    // that carry no `fullDocument` field at all.
    if (thread == null) {
      // This happens when `event.operationType === "drop"`, when a comment is deleted.
      log.error("Null document recieved for comment change event", event);
      return;
    }

    // emit delete event if deleted=true
    const operation = thread.deleted ? "delete" : event.operationType;
    const eventName = operation + ":" + event.ns.coll;

    thread.creationTime = thread.createdAt;
    thread.updationTime = thread.updatedAt;

    delete thread.createdAt;
    delete thread.updatedAt;
    delete thread.deleted;

    thread.isViewed = false;

    emitToPolicyUsers(io, thread.policies, "read:commentThreads", eventName, {thread});
  });

  const notificationsStream = db.collection("notification").watch(
    // Prevent server-internal fields from being sent to the client.
    unsetFullDocumentFields(["deletedAt", "deleted"]),
    {fullDocument: "updateLookup"}
  );

  notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => {
    const notification = event.fullDocument;
    if (notification == null) {
      // This happens when `event.operationType === "drop"`, when a notification is deleted.
      log.error("Null document recieved for notification change event", event);
      return;
    }

    // set the type from _class attribute
    notification.type = notification._class.slice(notification._class.lastIndexOf(".") + 1);
    delete notification._class;

    const eventName = event.operationType + ":" + event.ns.coll;
    io.to(EMAIL_ROOM_PREFIX + notification.forUsername).emit(eventName, {notification});
  });

  process.on('uncaughtExceptionMonitor', (err, origin) => {
    log.error(`Caught exception: ${err}\n` + `Exception origin: ${origin}`);
  });

  process.on('unhandledRejection', (reason, promise) => {
    log.debug('Unhandled Rejection at:', promise, 'reason:', reason);
  });

  process.on("exit", () => {
    // NOTE(review): Node documents that "exit" listeners must be synchronous, so the
    // continuations below may never run; the close calls are a best-effort attempt.
    Promise.all([
      commentChangeStream.close(),
      threadChangeStream.close(),
      notificationsStream.close(),
    ])
      .then(() => client.close())
      .catch((error) => log.error("Error closing MongoDB resources", error))
      .finally(() => log.debug("Fin"));
  });

  log.debug("Watching MongoDB");
}

/**
 * Collects the users of the first policy that grants `permission`.
 *
 * @param policies Policies of a document; may be missing on documents without policies.
 * @param permission Permission name to look for.
 * @returns A new array with the users of the matching policy, or an empty array when
 *     there is no matching policy or it lists no users.
 */
function findPolicyEmails(policies: Policy[] | null | undefined, permission: string): string[] {
  const matchingPolicy = policies?.find((policy) => policy.permission === permission);
  return matchingPolicy?.users ? Array.from(matchingPolicy.users) : [];
}

/**
 * Broadcasts the distinct users currently present in an edit room to that room.
 *
 * Rooms whose name does not start with `roomPrefix` are ignored.
 *
 * @param socketIo Server or namespace that owns the room.
 * @param roomName Full room name, including the prefix.
 * @param roomPrefix Prefix identifying the kind of room (application or page).
 */
function sendCurrentUsers(socketIo: Server | Namespace, roomName: string, roomPrefix: string) {
  if (!roomName.startsWith(roomPrefix)) {
    return;
  }

  socketIo.in(roomName).fetchSockets()
    .then((sockets) => {
      const seenEmails = new Set<string>();
      const onlineUsers: AppUser[] = [];

      // A user may have several sockets in the room; report each email once.
      for (const roomSocket of sockets ?? []) {
        const email = roomSocket.data.email;
        if (!seenEmails.has(email)) {
          seenEmails.add(email);
          onlineUsers.push(new AppUser(roomSocket.data.name, email));
        }
      }

      // get resourceId from room name by removing the prefix
      const resourceId = roomName.slice(roomPrefix.length);
      socketIo.to(roomName).emit(EDITORS_EVENT_NAME, new CurrentEditorsEvent(resourceId, onlineUsers));
    })
    .catch((error) => log.error("Error sending current users of room " + roomName, error));
}