Feature: Add API to set and get concurrent app editors (#6292)
* -add an API in RTS to store and retrieve currently online users of an application * -handled the case when edit app event received before user is authenticated * -updated authentication API in rts
This commit is contained in:
parent
60c4887432
commit
5d8b7a961f
38
app/rts/src/models.ts
Normal file
38
app/rts/src/models.ts
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
export class AppUser {
|
||||
name: string;
|
||||
email: string;
|
||||
|
||||
constructor(name: string, email: string) {
|
||||
this.name = name;
|
||||
this.email = email;
|
||||
}
|
||||
}
|
||||
|
||||
export class CurrentAppEditorEvent {
|
||||
appId: string;
|
||||
users: AppUser [];
|
||||
|
||||
constructor(appId: string, users: AppUser []) {
|
||||
this.appId = appId;
|
||||
this.users = users;
|
||||
}
|
||||
}
|
||||
|
||||
export interface Policy {
|
||||
permission: string
|
||||
users: string[]
|
||||
groups: string[]
|
||||
}
|
||||
|
||||
export interface CommentThread {
|
||||
applicationId: string
|
||||
}
|
||||
|
||||
export interface Comment {
|
||||
threadId: string
|
||||
policies: Policy[]
|
||||
createdAt: string
|
||||
updatedAt: string
|
||||
creationTime: string
|
||||
updationTime: string
|
||||
}
|
||||
|
|
@ -5,6 +5,12 @@ import SocketIO from "socket.io"
|
|||
import { MongoClient, ObjectId } from "mongodb"
|
||||
import type mongodb from "mongodb"
|
||||
import axios from "axios"
|
||||
import { AppUser, CurrentAppEditorEvent, Policy, Comment, CommentThread } from "./models"
|
||||
|
||||
const APP_ROOM_PREFIX : string = "app:"
|
||||
const APP_EDITORS_EVENT_NAME : string = "collab:online_app_editors"
|
||||
const START_APP_EDIT_EVENT_NAME : string = "collab:start_edit_app"
|
||||
const LEAVE_APP_EDIT_EVENT_NAME : string = "collab:leave_edit_app"
|
||||
|
||||
const MONGODB_URI = process.env.APPSMITH_MONGODB_URI
|
||||
if (MONGODB_URI == null || MONGODB_URI === "" || !MONGODB_URI.startsWith("mongodb")) {
|
||||
|
|
@ -18,18 +24,8 @@ if (API_BASE_URL == null || API_BASE_URL === "") {
|
|||
process.exit(1)
|
||||
}
|
||||
|
||||
console.log("Connecting to MongoDB at", MONGODB_URI)
|
||||
|
||||
const ROOMS = {}
|
||||
|
||||
main()
|
||||
|
||||
interface Policy {
|
||||
permission: string
|
||||
users: string[]
|
||||
groups: string[]
|
||||
}
|
||||
|
||||
function main() {
|
||||
const app = express()
|
||||
const server = new http.Server(app)
|
||||
|
|
@ -46,16 +42,19 @@ function main() {
|
|||
res.redirect("/index.html")
|
||||
})
|
||||
|
||||
app.get("/info", (req, res) => {
|
||||
return res.json({ rooms: ROOMS })
|
||||
});
|
||||
|
||||
io.on("connection", (socket) => {
|
||||
socket.join("default_room")
|
||||
onSocketConnected(socket)
|
||||
.catch((error) => console.error("Error in socket connected handler", error))
|
||||
})
|
||||
|
||||
io.of("/").adapter.on("leave-room", (room, id) => {
|
||||
sendCurrentUsers(io, room);
|
||||
});
|
||||
|
||||
io.of("/").adapter.on("join-room", (room, id) => {
|
||||
sendCurrentUsers(io, room);
|
||||
});
|
||||
|
||||
watchMongoDB(io)
|
||||
.catch((error) => console.error("Error watching MongoDB", error))
|
||||
|
||||
|
|
@ -65,14 +64,35 @@ function main() {
|
|||
})
|
||||
}
|
||||
|
||||
function joinAppEditRoom(socket, appId) {
|
||||
// remove this socket from any other app rooms
|
||||
socket.rooms.forEach(roomName => {
|
||||
if(roomName.startsWith(APP_ROOM_PREFIX)) {
|
||||
socket.leave(roomName);
|
||||
}
|
||||
});
|
||||
|
||||
// add this socket to room with application id
|
||||
let roomName = APP_ROOM_PREFIX + appId;
|
||||
socket.join(roomName);
|
||||
}
|
||||
|
||||
async function onSocketConnected(socket) {
|
||||
socket.on(START_APP_EDIT_EVENT_NAME, (appId) => {
|
||||
if(socket.data.email) { // user is authenticated, join the room now
|
||||
joinAppEditRoom(socket, appId)
|
||||
} else { // user not authenticated yet, save the appId to join later
|
||||
socket.data.pendingAppId = appId
|
||||
}
|
||||
});
|
||||
|
||||
socket.on(LEAVE_APP_EDIT_EVENT_NAME, (appId) => {
|
||||
let roomName = APP_ROOM_PREFIX + appId;
|
||||
// remove this socket from app room
|
||||
socket.leave(roomName);
|
||||
});
|
||||
|
||||
const connectionCookie = socket.handshake.headers.cookie
|
||||
console.log("new user connected with cookie", connectionCookie)
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
console.log("user disconnected", connectionCookie)
|
||||
})
|
||||
|
||||
let isAuthenticated = true
|
||||
|
||||
if (connectionCookie != null && connectionCookie !== "") {
|
||||
|
|
@ -99,7 +119,7 @@ async function tryAuth(socket, cookie) {
|
|||
})
|
||||
} catch (error) {
|
||||
if (error.response?.status === 401) {
|
||||
console.info("Couldn't authenticate user with cookie:", sessionCookie)
|
||||
console.info("Couldn't authenticate user with cookie:")
|
||||
} else {
|
||||
console.error("Error authenticating", error)
|
||||
}
|
||||
|
|
@ -107,22 +127,15 @@ async function tryAuth(socket, cookie) {
|
|||
}
|
||||
|
||||
const email = response.data.data.user.email
|
||||
ROOMS[email] = []
|
||||
const name = response.data.data.user.name ? response.data.data.user.name : email;
|
||||
|
||||
socket.data.email = email
|
||||
socket.data.name = name
|
||||
|
||||
socket.join("email:" + email)
|
||||
console.log("A socket joined email:" + email)
|
||||
|
||||
/*for (const org of response.data.data.organizationApplications) {
|
||||
for (const app of org.applications) {
|
||||
ROOMS[email].push(app.id)
|
||||
console.log("Joining", app.id)
|
||||
socket.join("application:" + app.id)
|
||||
}
|
||||
}//*/
|
||||
|
||||
socket.on("disconnect", (reason) => {
|
||||
delete ROOMS[email]
|
||||
});
|
||||
|
||||
if(socket.data.pendingAppId) { // an appid is pending for this socket, join now
|
||||
joinAppEditRoom(socket, socket.data.pendingAppId);
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
@ -130,19 +143,6 @@ async function watchMongoDB(io) {
|
|||
const client = await MongoClient.connect(MONGODB_URI, { useUnifiedTopology: true });
|
||||
const db = client.db()
|
||||
|
||||
interface CommentThread {
|
||||
applicationId: string
|
||||
}
|
||||
|
||||
interface Comment {
|
||||
threadId: string
|
||||
policies: Policy[]
|
||||
createdAt: string
|
||||
updatedAt: string
|
||||
creationTime: string
|
||||
updationTime: string
|
||||
}
|
||||
|
||||
const threadCollection: mongodb.Collection<CommentThread> = db.collection("commentThread")
|
||||
|
||||
const commentChangeStream = db.collection("comment").watch(
|
||||
|
|
@ -160,7 +160,7 @@ async function watchMongoDB(io) {
|
|||
);
|
||||
|
||||
commentChangeStream.on("change", async (event: mongodb.ChangeEventCR<Comment>) => {
|
||||
console.log("comment event", event)
|
||||
// console.log("comment event", event)
|
||||
const comment: Comment = event.fullDocument
|
||||
const { applicationId }: CommentThread = await threadCollection.findOne(
|
||||
{ _id: new ObjectId(comment.threadId) },
|
||||
|
|
@ -177,13 +177,13 @@ async function watchMongoDB(io) {
|
|||
|
||||
for (const email of findPolicyEmails(comment.policies, "read:comments")) {
|
||||
shouldEmit = true
|
||||
console.log("Emitting comment to email", email)
|
||||
// console.log("Emitting comment to email", email)
|
||||
target = target.to("email:" + email)
|
||||
}
|
||||
|
||||
if (shouldEmit) {
|
||||
const eventName = event.operationType + ":" + event.ns.coll
|
||||
console.log("Emitting", eventName)
|
||||
// console.log("Emitting", eventName)
|
||||
target.emit(eventName, { comment })
|
||||
}
|
||||
})
|
||||
|
|
@ -203,7 +203,7 @@ async function watchMongoDB(io) {
|
|||
);
|
||||
|
||||
threadChangeStream.on("change", async (event: mongodb.ChangeEventCR) => {
|
||||
console.log("thread event", event)
|
||||
// console.log("thread event", event)
|
||||
const thread = event.fullDocument
|
||||
if (thread == null) {
|
||||
// This happens when `event.operationType === "drop"`, when a comment is deleted.
|
||||
|
|
@ -222,13 +222,13 @@ async function watchMongoDB(io) {
|
|||
|
||||
for (const email of findPolicyEmails(thread.policies, "read:commentThreads")) {
|
||||
shouldEmit = true
|
||||
console.log("Emitting thread to email", email)
|
||||
// console.log("Emitting thread to email", email)
|
||||
target = target.to("email:" + email)
|
||||
}
|
||||
|
||||
if (shouldEmit) {
|
||||
const eventName = event.operationType + ":" + event.ns.coll
|
||||
console.log("Emitting", eventName)
|
||||
// console.log("Emitting", eventName)
|
||||
target.emit(eventName, { thread })
|
||||
}
|
||||
})
|
||||
|
|
@ -247,7 +247,7 @@ async function watchMongoDB(io) {
|
|||
);
|
||||
|
||||
notificationsStream.on("change", async (event: mongodb.ChangeEventCR) => {
|
||||
console.log("notification event", event)
|
||||
// console.log("notification event", event)
|
||||
const notification = event.fullDocument
|
||||
|
||||
if (notification == null) {
|
||||
|
|
@ -278,7 +278,7 @@ function findPolicyEmails(policies: Policy[], permission: string): string[] {
|
|||
for (const policy of policies) {
|
||||
if (policy.permission === permission) {
|
||||
for (const email of policy.users) {
|
||||
console.log("Emitting comment to email", email)
|
||||
// console.log("Emitting comment to email", email)
|
||||
emails.push(email)
|
||||
}
|
||||
break
|
||||
|
|
@ -286,3 +286,23 @@ function findPolicyEmails(policies: Policy[], permission: string): string[] {
|
|||
}
|
||||
return emails
|
||||
}
|
||||
|
||||
function sendCurrentUsers(socketIo, roomName:string) {
|
||||
if(roomName.startsWith(APP_ROOM_PREFIX)) {
|
||||
socketIo.in(roomName).fetchSockets().then(sockets => {
|
||||
let onlineUsernames = new Set<string>();
|
||||
let onlineUsers = new Array<AppUser>();
|
||||
if(sockets) {
|
||||
sockets.forEach(s => {
|
||||
if(!onlineUsernames.has(s.data.email)) {
|
||||
onlineUsers.push(new AppUser(s.data.name, s.data.email));
|
||||
}
|
||||
onlineUsernames.add(s.data.email);
|
||||
});
|
||||
}
|
||||
let appId = roomName.replace(APP_ROOM_PREFIX, "") // get app id from room name by removing the prefix
|
||||
let response = new CurrentAppEditorEvent(appId, onlineUsers);
|
||||
socketIo.to(roomName).emit(APP_EDITORS_EVENT_NAME, response);
|
||||
});
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user