diff --git a/app/rts/src/models.ts b/app/rts/src/models.ts new file mode 100644 index 0000000000..18b6629e47 --- /dev/null +++ b/app/rts/src/models.ts @@ -0,0 +1,38 @@ +export class AppUser { + name: string; + email: string; + + constructor(name: string, email: string) { + this.name = name; + this.email = email; + } +} + +export class CurrentAppEditorEvent { + appId: string; + users: AppUser []; + + constructor(appId: string, users: AppUser []) { + this.appId = appId; + this.users = users; + } +} + +export interface Policy { + permission: string + users: string[] + groups: string[] +} + +export interface CommentThread { + applicationId: string +} + +export interface Comment { + threadId: string + policies: Policy[] + createdAt: string + updatedAt: string + creationTime: string + updationTime: string +} \ No newline at end of file diff --git a/app/rts/src/server.ts b/app/rts/src/server.ts index 7cd1ed3ecf..20be6daef4 100644 --- a/app/rts/src/server.ts +++ b/app/rts/src/server.ts @@ -5,6 +5,12 @@ import SocketIO from "socket.io" import { MongoClient, ObjectId } from "mongodb" import type mongodb from "mongodb" import axios from "axios" +import { AppUser, CurrentAppEditorEvent, Policy, Comment, CommentThread } from "./models" + +const APP_ROOM_PREFIX : string = "app:" +const APP_EDITORS_EVENT_NAME : string = "collab:online_app_editors" +const START_APP_EDIT_EVENT_NAME : string = "collab:start_edit_app" +const LEAVE_APP_EDIT_EVENT_NAME : string = "collab:leave_edit_app" const MONGODB_URI = process.env.APPSMITH_MONGODB_URI if (MONGODB_URI == null || MONGODB_URI === "" || !MONGODB_URI.startsWith("mongodb")) { @@ -18,18 +24,8 @@ if (API_BASE_URL == null || API_BASE_URL === "") { process.exit(1) } -console.log("Connecting to MongoDB at", MONGODB_URI) - -const ROOMS = {} - main() -interface Policy { - permission: string - users: string[] - groups: string[] -} - function main() { const app = express() const server = new http.Server(app) @@ -46,16 +42,19 @@ function main() { res.redirect("/index.html") }) - app.get("/info", (req, res) => { - return res.json({ rooms: ROOMS }) - }); - io.on("connection", (socket) => { - socket.join("default_room") onSocketConnected(socket) .catch((error) => console.error("Error in socket connected handler", error)) }) + io.of("/").adapter.on("leave-room", (room, id) => { + sendCurrentUsers(io, room); + }); + + io.of("/").adapter.on("join-room", (room, id) => { + sendCurrentUsers(io, room); + }); + watchMongoDB(io) .catch((error) => console.error("Error watching MongoDB", error)) @@ -65,14 +64,35 @@ function main() { }) } +function joinAppEditRoom(socket, appId) { + // remove this socket from any other app rooms + socket.rooms.forEach(roomName => { + if(roomName.startsWith(APP_ROOM_PREFIX)) { + socket.leave(roomName); + } + }); + + // add this socket to room with application id + let roomName = APP_ROOM_PREFIX + appId; + socket.join(roomName); +} + async function onSocketConnected(socket) { + socket.on(START_APP_EDIT_EVENT_NAME, (appId) => { + if(socket.data.email) { // user is authenticated, join the room now + joinAppEditRoom(socket, appId) + } else { // user not authenticated yet, save the appId to join later + socket.data.pendingAppId = appId + } + }); + + socket.on(LEAVE_APP_EDIT_EVENT_NAME, (appId) => { + let roomName = APP_ROOM_PREFIX + appId; + // remove this socket from app room + socket.leave(roomName); + }); + const connectionCookie = socket.handshake.headers.cookie - console.log("new user connected with cookie", connectionCookie) - - socket.on("disconnect", () => { - console.log("user disconnected", connectionCookie) - }) - let isAuthenticated = true if (connectionCookie != null && connectionCookie !== "") { @@ -99,7 +119,7 @@ async function tryAuth(socket, cookie) { }) } catch (error) { if (error.response?.status === 401) { - console.info("Couldn't authenticate user with cookie:", sessionCookie) + console.info("Couldn't authenticate user with cookie:") } else { console.error("Error authenticating", error) } @@ -107,22 +127,15 @@ async function tryAuth(socket, cookie) { } const email = response.data.data.user.email - ROOMS[email] = [] + const name = response.data.data.user.name ? response.data.data.user.name : email; + + socket.data.email = email + socket.data.name = name + socket.join("email:" + email) - console.log("A socket joined email:" + email) - - /*for (const org of response.data.data.organizationApplications) { - for (const app of org.applications) { - ROOMS[email].push(app.id) - console.log("Joining", app.id) - socket.join("application:" + app.id) - } - }//*/ - - socket.on("disconnect", (reason) => { - delete ROOMS[email] - }); - + if(socket.data.pendingAppId) { // an appid is pending for this socket, join now + joinAppEditRoom(socket, socket.data.pendingAppId); + } return true } @@ -130,19 +143,6 @@ async function watchMongoDB(io) { const client = await MongoClient.connect(MONGODB_URI, { useUnifiedTopology: true }); const db = client.db() - interface CommentThread { - applicationId: string - } - - interface Comment { - threadId: string - policies: Policy[] - createdAt: string - updatedAt: string - creationTime: string - updationTime: string - } - const threadCollection: mongodb.Collection = db.collection("commentThread") const commentChangeStream = db.collection("comment").watch( @@ -160,7 +160,7 @@ async function watchMongoDB(io) { ); commentChangeStream.on("change", async (event: mongodb.ChangeEventCR) => { - console.log("comment event", event) + // console.log("comment event", event) const comment: Comment = event.fullDocument const { applicationId }: CommentThread = await threadCollection.findOne( { _id: new ObjectId(comment.threadId) }, @@ -177,13 +177,13 @@ async function watchMongoDB(io) { for (const email of findPolicyEmails(comment.policies, "read:comments")) { shouldEmit = true - console.log("Emitting comment to email", email) + // console.log("Emitting comment to email", email) target = target.to("email:" + email) } if (shouldEmit) { const eventName = event.operationType + ":" + event.ns.coll - console.log("Emitting", eventName) + // console.log("Emitting", eventName) target.emit(eventName, { comment }) } }) @@ -203,7 +203,7 @@ async function watchMongoDB(io) { ); threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => { - console.log("thread event", event) + // console.log("thread event", event) const thread = event.fullDocument if (thread == null) { // This happens when `event.operationType === "drop"`, when a comment is deleted. @@ -222,13 +222,13 @@ async function watchMongoDB(io) { for (const email of findPolicyEmails(thread.policies, "read:commentThreads")) { shouldEmit = true - console.log("Emitting thread to email", email) + // console.log("Emitting thread to email", email) target = target.to("email:" + email) } if (shouldEmit) { const eventName = event.operationType + ":" + event.ns.coll - console.log("Emitting", eventName) + // console.log("Emitting", eventName) target.emit(eventName, { thread }) } }) @@ -247,7 +247,7 @@ async function watchMongoDB(io) { ); notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => { - console.log("notification event", event) + // console.log("notification event", event) const notification = event.fullDocument if (notification == null) { @@ -278,7 +278,7 @@ function findPolicyEmails(policies: Policy[], permission: string): string[] { for (const policy of policies) { if (policy.permission === permission) { for (const email of policy.users) { - console.log("Emitting comment to email", email) + // console.log("Emitting comment to email", email) emails.push(email) } break @@ -286,3 +286,23 @@ function findPolicyEmails(policies: Policy[], permission: string): string[] { } return emails } + +function sendCurrentUsers(socketIo, roomName:string) { + if(roomName.startsWith(APP_ROOM_PREFIX)) { + socketIo.in(roomName).fetchSockets().then(sockets => { + let onlineUsernames = new Set(); + let onlineUsers = new Array(); + if(sockets) { + sockets.forEach(s => { + if(!onlineUsernames.has(s.data.email)) { + onlineUsers.push(new AppUser(s.data.name, s.data.email)); + } + onlineUsernames.add(s.data.email); + }); + } + let appId = roomName.replace(APP_ROOM_PREFIX, "") // get app id from room name by removing the prefix + let response = new CurrentAppEditorEvent(appId, onlineUsers); + socketIo.to(roomName).emit(APP_EDITORS_EVENT_NAME, response); + }); + } +} \ No newline at end of file