From f020be7c96f545db26c02990a4a26c6059ed0988 Mon Sep 17 00:00:00 2001 From: Abhijeet <41686026+abhvsn@users.noreply.github.com> Date: Thu, 3 Apr 2025 14:01:23 +0530 Subject: [PATCH] chore: Insert orgId in context for server internal flows (#40039) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Description /test Sanity ### :mag: Cypress test results > [!TIP] > 🟢 🟢 🟢 All cypress tests have passed! 🎉 🎉 🎉 > Workflow run: > Commit: 81fa3c871d283325f0a1ef50f9d48b8749c652f0 > Cypress dashboard. > Tags: `@tag.Sanity` > Spec: >
Wed, 02 Apr 2025 20:54:27 UTC ## Communication Should the DevRel and Marketing teams inform users about this change? - [ ] Yes - [ ] No ## Summary by CodeRabbit - **New Features** - Enhanced background operations for server telemetry and monitoring to improve performance visibility. - **Refactor** - Updated scheduling mechanisms and context handling to ensure smoother, more reliable operations. - **Chore** - Removed outdated task implementations to simplify system processes and optimize overall performance. --- .../ce/FeatureFlagMigrationHelperCEImpl.java | 1 - .../ce/InstanceConfigHelperCEImpl.java | 6 +- .../server/solutions/PingScheduledTask.java | 5 - .../solutions/PingScheduledTaskImpl.java | 63 ---- .../server/solutions/ScheduledTaskImpl.java | 59 +++- .../server/solutions/ce/EnvManagerCEImpl.java | 3 +- .../solutions/ce/PingScheduledTaskCE.java | 6 - .../solutions/ce/PingScheduledTaskCEImpl.java | 255 ----------------- .../server/solutions/ce/ScheduledTaskCE.java | 3 +- .../solutions/ce/ScheduledTaskCEImpl.java | 268 +++++++++++++++++- 10 files changed, 318 insertions(+), 351 deletions(-) delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTask.java delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCE.java delete mode 100644 app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/FeatureFlagMigrationHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/FeatureFlagMigrationHelperCEImpl.java index df47568176..f2f2990acc 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/FeatureFlagMigrationHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/FeatureFlagMigrationHelperCEImpl.java @@ -7,7 +7,6 @@ import com.appsmith.server.domains.OrganizationConfiguration; import com.appsmith.server.featureflags.CachedFeatures; import com.appsmith.server.helpers.CollectionUtils; import com.appsmith.server.services.CacheableFeatureFlagHelper; -import com.appsmith.server.solutions.ce.ScheduledTaskCEImpl; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono; diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java index c208cfbcec..e136dd42dc 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/helpers/ce/InstanceConfigHelperCEImpl.java @@ -37,6 +37,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static com.appsmith.server.constants.ce.FieldNameCE.ORGANIZATION_ID; + @RequiredArgsConstructor @Slf4j public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE { @@ -240,7 +242,9 @@ public class InstanceConfigHelperCEImpl implements InstanceConfigHelperCE { // startup return organizationService .retrieveAll() - .flatMap(org -> featureFlagService.getOrganizationFeatures(org.getId())) + .flatMap(org -> featureFlagService + .getOrganizationFeatures(org.getId()) + .contextWrite(ctx -> ctx.put(ORGANIZATION_ID, org.getId()))) .onErrorResume(error -> { log.error("Error while updating cache for org feature flags", error); return Mono.empty(); diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTask.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTask.java deleted file mode 100644 index 3d43f742fa..0000000000 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTask.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.appsmith.server.solutions; - -import com.appsmith.server.solutions.ce.PingScheduledTaskCE; - -public interface PingScheduledTask extends PingScheduledTaskCE {} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java deleted file mode 100644 index 2af74d10aa..0000000000 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/PingScheduledTaskImpl.java +++ /dev/null @@ -1,63 +0,0 @@ -package com.appsmith.server.solutions; - -import com.appsmith.server.configurations.CommonConfig; -import com.appsmith.server.configurations.DeploymentProperties; -import com.appsmith.server.configurations.ProjectProperties; -import com.appsmith.server.configurations.SegmentConfig; -import com.appsmith.server.helpers.NetworkUtils; -import com.appsmith.server.repositories.ApplicationRepository; -import com.appsmith.server.repositories.DatasourceRepository; -import com.appsmith.server.repositories.NewActionRepository; -import com.appsmith.server.repositories.NewPageRepository; -import com.appsmith.server.repositories.UserRepository; -import com.appsmith.server.repositories.WorkspaceRepository; -import com.appsmith.server.services.ConfigService; -import com.appsmith.server.services.OrganizationService; -import com.appsmith.server.services.PermissionGroupService; -import com.appsmith.server.solutions.ce.PingScheduledTaskCEImpl; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.stereotype.Component; - -/** - * This class represents a scheduled task that pings a data point indicating that this server installation is live. - * This ping is only invoked if the Appsmith server is NOT running in Appsmith Clouud & the user has given Appsmith - * permissions to collect anonymized data - */ -@ConditionalOnExpression("!${is.cloud-hosting:false}") -@Slf4j -@Component -public class PingScheduledTaskImpl extends PingScheduledTaskCEImpl implements PingScheduledTask { - - public PingScheduledTaskImpl( - ConfigService configService, - SegmentConfig segmentConfig, - CommonConfig commonConfig, - WorkspaceRepository workspaceRepository, - ApplicationRepository applicationRepository, - NewPageRepository newPageRepository, - NewActionRepository newActionRepository, - DatasourceRepository datasourceRepository, - UserRepository userRepository, - ProjectProperties projectProperties, - DeploymentProperties deploymentProperties, - NetworkUtils networkUtils, - PermissionGroupService permissionGroupService, - OrganizationService organizationService) { - super( - configService, - segmentConfig, - commonConfig, - workspaceRepository, - applicationRepository, - newPageRepository, - newActionRepository, - datasourceRepository, - userRepository, - projectProperties, - deploymentProperties, - networkUtils, - permissionGroupService, - organizationService); - } -} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java index 4ea28396a8..45873c5b0f 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ScheduledTaskImpl.java @@ -1,11 +1,66 @@ package com.appsmith.server.solutions; +import com.appsmith.server.configurations.CommonConfig; +import com.appsmith.server.configurations.DeploymentProperties; +import com.appsmith.server.configurations.ProjectProperties; +import com.appsmith.server.configurations.SegmentConfig; +import com.appsmith.server.helpers.NetworkUtils; +import com.appsmith.server.repositories.ApplicationRepository; +import com.appsmith.server.repositories.DatasourceRepository; +import com.appsmith.server.repositories.NewActionRepository; +import com.appsmith.server.repositories.NewPageRepository; +import com.appsmith.server.repositories.UserRepository; +import com.appsmith.server.repositories.WorkspaceRepository; +import com.appsmith.server.services.ConfigService; import com.appsmith.server.services.FeatureFlagService; import com.appsmith.server.services.OrganizationService; +import com.appsmith.server.services.PermissionGroupService; import com.appsmith.server.solutions.ce.ScheduledTaskCEImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Profile; +import org.springframework.stereotype.Component; +/** + * This class represents a scheduled task that pings a data point indicating that this server installation is live. + * This ping is only invoked if the Appsmith server is NOT running in Appsmith Clouud & the user has given Appsmith + * permissions to collect anonymized data + */ +@Slf4j +@Component +@Profile("!test") public class ScheduledTaskImpl extends ScheduledTaskCEImpl implements ScheduledTask { - public ScheduledTaskImpl(FeatureFlagService featureFlagService, OrganizationService organizationService) { - super(featureFlagService, organizationService); + + public ScheduledTaskImpl( + ConfigService configService, + SegmentConfig segmentConfig, + CommonConfig commonConfig, + WorkspaceRepository workspaceRepository, + ApplicationRepository applicationRepository, + NewPageRepository newPageRepository, + NewActionRepository newActionRepository, + DatasourceRepository datasourceRepository, + UserRepository userRepository, + ProjectProperties projectProperties, + DeploymentProperties deploymentProperties, + NetworkUtils networkUtils, + PermissionGroupService permissionGroupService, + OrganizationService organizationService, + FeatureFlagService featureFlagService) { + super( + configService, + segmentConfig, + commonConfig, + workspaceRepository, + applicationRepository, + newPageRepository, + newActionRepository, + datasourceRepository, + userRepository, + projectProperties, + deploymentProperties, + networkUtils, + permissionGroupService, + organizationService, + featureFlagService); } } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/EnvManagerCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/EnvManagerCEImpl.java index f3dd450a51..0cca1f3fb9 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/EnvManagerCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/EnvManagerCEImpl.java @@ -47,7 +47,6 @@ import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -import reactor.util.context.Context; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -381,7 +380,7 @@ public class EnvManagerCEImpl implements EnvManagerCE { .flatMap(user -> validateChanges(user, envChanges).thenReturn(user)) .flatMap(user -> applyChangesToEnvFileWithoutAclCheck(envChanges) // Add the organization id to the context to be able to extract the feature flags - .contextWrite(Context.of(ORGANIZATION_ID, user.getOrganizationId())) + .contextWrite(ctx -> ctx.put(ORGANIZATION_ID, user.getOrganizationId())) // For configuration variables, save the variables to the config collection instead of .env file // We ideally want to migrate all variables from .env file to the config collection for better // scalability diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCE.java deleted file mode 100644 index 2ebc844450..0000000000 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCE.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.appsmith.server.solutions.ce; - -public interface PingScheduledTaskCE { - - void pingSchedule(); -} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java deleted file mode 100644 index 111ebefb7d..0000000000 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/PingScheduledTaskCEImpl.java +++ /dev/null @@ -1,255 +0,0 @@ -package com.appsmith.server.solutions.ce; - -import com.appsmith.caching.annotations.DistributedLock; -import com.appsmith.server.acl.AclPermission; -import com.appsmith.server.configurations.CommonConfig; -import com.appsmith.server.configurations.DeploymentProperties; -import com.appsmith.server.configurations.ProjectProperties; -import com.appsmith.server.configurations.SegmentConfig; -import com.appsmith.server.domains.Organization; -import com.appsmith.server.helpers.NetworkUtils; -import com.appsmith.server.repositories.ApplicationRepository; -import com.appsmith.server.repositories.DatasourceRepository; -import com.appsmith.server.repositories.NewActionRepository; -import com.appsmith.server.repositories.NewPageRepository; -import com.appsmith.server.repositories.UserRepository; -import com.appsmith.server.repositories.WorkspaceRepository; -import com.appsmith.server.services.ConfigService; -import com.appsmith.server.services.OrganizationService; -import com.appsmith.server.services.PermissionGroupService; -import com.appsmith.util.WebClientUtils; -import io.micrometer.observation.annotation.Observed; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.http.MediaType; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.web.reactive.function.BodyInserters; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple7; - -import java.time.Duration; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Map; - -import static com.appsmith.external.constants.AnalyticsConstants.ADMIN_EMAIL_DOMAIN_HASH; -import static com.appsmith.external.constants.AnalyticsConstants.EMAIL_DOMAIN_HASH; -import static java.util.Map.entry; -import static org.apache.commons.lang3.StringUtils.defaultIfEmpty; - -/** - * This class represents a scheduled task that pings a data point indicating that this server installation is live. - * This ping is only invoked if the Appsmith server is NOT running in Appsmith Cloud & the user has given Appsmith - * permissions to collect anonymized data - */ -@Slf4j -@RequiredArgsConstructor -public class PingScheduledTaskCEImpl implements PingScheduledTaskCE { - - private final ConfigService configService; - private final SegmentConfig segmentConfig; - private final CommonConfig commonConfig; - - private final WorkspaceRepository workspaceRepository; - private final ApplicationRepository applicationRepository; - private final NewPageRepository newPageRepository; - private final NewActionRepository newActionRepository; - private final DatasourceRepository datasourceRepository; - private final UserRepository userRepository; - private final ProjectProperties projectProperties; - private final DeploymentProperties deploymentProperties; - private final NetworkUtils networkUtils; - private final PermissionGroupService permissionGroupService; - private final OrganizationService organizationService; - - // Delay to avoid 429 between the analytics call. - private static final Duration DELAY_BETWEEN_PINGS = Duration.ofMillis(200); - - enum UserTrackingType { - DAU, - WAU, - MAU - } - - /** - * Gets the external IP address of this server and pings a data point to indicate that this server instance is live. - * We use an initial delay of two minutes to roughly wait for the application along with the migrations are finished - * and ready. - */ - // Number of milliseconds between the start of each scheduled calls to this method. - @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 6 * 60 * 60 * 1000 /* six hours */) - @DistributedLock( - key = "pingSchedule", - ttl = 5 * 60 * 60, // 5 hours - shouldReleaseLock = false) - @Observed(name = "pingSchedule") - public void pingSchedule() { - if (commonConfig.isTelemetryDisabled()) { - return; - } - - Mono instanceMono = configService.getInstanceId().cache(); - Mono ipMono = networkUtils.getExternalAddress().cache(); - organizationService - .retrieveAll() - .delayElements(DELAY_BETWEEN_PINGS) - .flatMap(organization -> Mono.zip(Mono.just(organization.getId()), instanceMono, ipMono)) - .flatMap(objects -> doPing(objects.getT1(), objects.getT2(), objects.getT3())) - .subscribeOn(Schedulers.single()) - .subscribe(); - } - - /** - * Given a unique ID (called a `userId` here), this method hits the Segment API to save a data point on this server - * instance being live. - * - * @param instanceId A unique identifier for this server instance, usually generated at the server's first start. - * @param ipAddress The external IP address of this instance's machine. - * @return A publisher that yields the string response of recording the data point. - */ - private Mono doPing(String organizationId, String instanceId, String ipAddress) { - // Note: Hard-coding Segment auth header and the event name intentionally. These are not intended to be - // environment specific values, instead, they are common values for all self-hosted environments. As such, they - // are not intended to be configurable. - final String ceKey = segmentConfig.getCeKey(); - if (StringUtils.isEmpty(ceKey)) { - log.error("The segment ce key is null"); - return Mono.empty(); - } - - return WebClientUtils.create("https://api.segment.io") - .post() - .uri("/v1/track") - .headers(headers -> headers.setBasicAuth(ceKey, "")) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(Map.of( - "userId", - instanceId, - "context", - Map.of("ip", ipAddress), - "properties", - Map.of("instanceId", instanceId, "organizationId", organizationId), - "event", - "Instance Active"))) - .retrieve() - .bodyToMono(String.class); - } - - // Number of milliseconds between the start of each scheduled calls to this method. - @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 24 * 60 * 60 * 1000 /* a day */) - @DistributedLock(key = "pingStats", ttl = 12 * 60 * 60, shouldReleaseLock = false) - @Observed(name = "pingStats") - public void pingStats() { - // TODO @CloudBilling remove cloud hosting check and migrate the cron to report organization level stats - if (commonConfig.isTelemetryDisabled() || commonConfig.isCloudHosting()) { - return; - } - - final String ceKey = segmentConfig.getCeKey(); - if (StringUtils.isEmpty(ceKey)) { - log.error("The segment ce key is null"); - return; - } - - Mono publicPermissionGroupIdMono = permissionGroupService.getPublicPermissionGroupId(); - - // Get the non-system generated active user count - Mono userCountMono = userRepository - .countByDeletedAtIsNullAndIsSystemGeneratedIsNot(true) - .defaultIfEmpty(0L); - - Mono>> nonDeletedObjectsCountMono = Mono.zip( - workspaceRepository.countByDeletedAtNull().defaultIfEmpty(0L), - applicationRepository.countByDeletedAtNull().defaultIfEmpty(0L), - newPageRepository.countByDeletedAtNull().defaultIfEmpty(0L), - newActionRepository.countByDeletedAtNull().defaultIfEmpty(0L), - datasourceRepository.countByDeletedAtNull().defaultIfEmpty(0L), - userCountMono, - getUserTrackingDetails()); - - organizationService - .retrieveAll() - .delayElements(DELAY_BETWEEN_PINGS) - .map(Organization::getId) - .zipWith(publicPermissionGroupIdMono) - .flatMap(tuple2 -> { - final String organizationId = tuple2.getT1(); - final String publicPermissionGroupId = tuple2.getT2(); - return Mono.zip( - configService.getInstanceId().defaultIfEmpty("null"), - Mono.just(organizationId), - networkUtils.getExternalAddress(), - nonDeletedObjectsCountMono, - applicationRepository.getAllApplicationsCountAccessibleToARoleWithPermission( - AclPermission.READ_APPLICATIONS, publicPermissionGroupId)); - }) - .flatMap(statsData -> { - Map propertiesMap = new java.util.HashMap<>(Map.ofEntries( - entry("instanceId", statsData.getT1()), - entry("organizationId", statsData.getT2()), - entry("numOrgs", statsData.getT4().getT1()), - entry("numApps", statsData.getT4().getT2()), - entry("numPages", statsData.getT4().getT3()), - entry("numActions", statsData.getT4().getT4()), - entry("numDatasources", statsData.getT4().getT5()), - entry("numUsers", statsData.getT4().getT6()), - entry("numPublicApps", statsData.getT5()), - entry("version", projectProperties.getVersion()), - entry("edition", deploymentProperties.getEdition()), - entry("cloudProvider", defaultIfEmpty(deploymentProperties.getCloudProvider(), "")), - entry("efs", defaultIfEmpty(deploymentProperties.getEfs(), "")), - entry("tool", defaultIfEmpty(deploymentProperties.getTool(), "")), - entry("hostname", defaultIfEmpty(deploymentProperties.getHostname(), "")), - entry("deployedAt", defaultIfEmpty(deploymentProperties.getDeployedAt(), "")), - entry(ADMIN_EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()), - entry(EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()))); - - propertiesMap.putAll(statsData.getT4().getT7()); - - final String ipAddress = statsData.getT3(); - return WebClientUtils.create("https://api.segment.io") - .post() - .uri("/v1/track") - .headers(headers -> headers.setBasicAuth(ceKey, "")) - .contentType(MediaType.APPLICATION_JSON) - .body(BodyInserters.fromValue(Map.of( - "userId", - statsData.getT1(), - "context", - Map.of("ip", ipAddress), - "properties", - propertiesMap, - "event", - "instance_stats"))) - .retrieve() - .bodyToMono(String.class); - }) - .doOnError(error -> log.error("Error sending anonymous counts {0}", error)) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(); - } - - private Mono> getUserTrackingDetails() { - - Mono dauCountMono = userRepository - .countByDeletedAtIsNullAndLastActiveAtGreaterThanAndIsSystemGeneratedIsNot( - Instant.now().minus(1, ChronoUnit.DAYS), true) - .defaultIfEmpty(0L); - Mono wauCountMono = userRepository - .countByDeletedAtIsNullAndLastActiveAtGreaterThanAndIsSystemGeneratedIsNot( - Instant.now().minus(7, ChronoUnit.DAYS), true) - .defaultIfEmpty(0L); - Mono mauCountMono = userRepository - .countByDeletedAtIsNullAndLastActiveAtGreaterThanAndIsSystemGeneratedIsNot( - Instant.now().minus(30, ChronoUnit.DAYS), true) - .defaultIfEmpty(0L); - - return Mono.zip(dauCountMono, wauCountMono, mauCountMono) - .map(tuple -> Map.of( - UserTrackingType.DAU.name(), tuple.getT1(), - UserTrackingType.WAU.name(), tuple.getT2(), - UserTrackingType.MAU.name(), tuple.getT3())); - } -} diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCE.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCE.java index c4c5a1f0e3..a1ac006507 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCE.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCE.java @@ -1,5 +1,6 @@ package com.appsmith.server.solutions.ce; public interface ScheduledTaskCE { - void fetchFeatures(); + + void pingSchedule(); } diff --git a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java index 8276720efe..6cdab9350c 100644 --- a/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java +++ b/app/server/appsmith-server/src/main/java/com/appsmith/server/solutions/ce/ScheduledTaskCEImpl.java @@ -1,25 +1,262 @@ package com.appsmith.server.solutions.ce; import com.appsmith.caching.annotations.DistributedLock; +import com.appsmith.server.acl.AclPermission; +import com.appsmith.server.configurations.CommonConfig; +import com.appsmith.server.configurations.DeploymentProperties; +import com.appsmith.server.configurations.ProjectProperties; +import com.appsmith.server.configurations.SegmentConfig; import com.appsmith.server.domains.Organization; import com.appsmith.server.helpers.LoadShifter; +import com.appsmith.server.helpers.NetworkUtils; +import com.appsmith.server.repositories.ApplicationRepository; +import com.appsmith.server.repositories.DatasourceRepository; +import com.appsmith.server.repositories.NewActionRepository; +import com.appsmith.server.repositories.NewPageRepository; +import com.appsmith.server.repositories.UserRepository; +import com.appsmith.server.repositories.WorkspaceRepository; +import com.appsmith.server.services.ConfigService; import com.appsmith.server.services.FeatureFlagService; import com.appsmith.server.services.OrganizationService; +import com.appsmith.server.services.PermissionGroupService; +import com.appsmith.util.WebClientUtils; import io.micrometer.observation.annotation.Observed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.http.MediaType; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Flux; +import org.springframework.web.reactive.function.BodyInserters; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.context.Context; +import reactor.util.function.Tuple7; -@RequiredArgsConstructor +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Map; + +import static com.appsmith.external.constants.AnalyticsConstants.ADMIN_EMAIL_DOMAIN_HASH; +import static com.appsmith.external.constants.AnalyticsConstants.EMAIL_DOMAIN_HASH; +import static com.appsmith.server.constants.ce.FieldNameCE.ORGANIZATION_ID; +import static java.util.Map.entry; +import static org.apache.commons.lang3.StringUtils.defaultIfEmpty; + +/** + * This class represents a scheduled task that pings a data point indicating that this server installation is live. + * This ping is only invoked if the Appsmith server is NOT running in Appsmith Cloud & the user has given Appsmith + * permissions to collect anonymized data + */ @Slf4j -@Component +@RequiredArgsConstructor public class ScheduledTaskCEImpl implements ScheduledTaskCE { + private final ConfigService configService; + private final SegmentConfig segmentConfig; + private final CommonConfig commonConfig; + + private final WorkspaceRepository workspaceRepository; + private final ApplicationRepository applicationRepository; + private final NewPageRepository newPageRepository; + private final NewActionRepository newActionRepository; + private final DatasourceRepository datasourceRepository; + private final UserRepository userRepository; + private final ProjectProperties projectProperties; + private final DeploymentProperties deploymentProperties; + private final NetworkUtils networkUtils; + private final PermissionGroupService permissionGroupService; + private final OrganizationService organizationService; private final FeatureFlagService featureFlagService; - private final OrganizationService organizationService; + // Delay to avoid 429 between the analytics call. + protected static final Duration DELAY_BETWEEN_PINGS = Duration.ofMillis(200); + + enum UserTrackingType { + DAU, + WAU, + MAU + } + + /** + * Gets the external IP address of this server and pings a data point to indicate that this server instance is live. + * We use an initial delay of two minutes to roughly wait for the application along with the migrations are finished + * and ready. + */ + // Number of milliseconds between the start of each scheduled calls to this method. + @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 6 * 60 * 60 * 1000 /* six hours */) + @DistributedLock( + key = "pingSchedule", + ttl = 5 * 60 * 60, // 5 hours + shouldReleaseLock = false) + @Observed(name = "pingSchedule") + public void pingSchedule() { + if (commonConfig.isTelemetryDisabled()) { + return; + } + + Mono instanceMono = configService.getInstanceId().cache(); + Mono ipMono = networkUtils.getExternalAddress().cache(); + organizationService + .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) + .flatMap(organization -> Mono.zip(Mono.just(organization.getId()), instanceMono, ipMono)) + .flatMap(objects -> doPing(objects.getT1(), objects.getT2(), objects.getT3())) + .subscribeOn(Schedulers.single()) + .subscribe(); + } + + /** + * Given a unique ID (called a `userId` here), this method hits the Segment API to save a data point on this server + * instance being live. + * + * @param instanceId A unique identifier for this server instance, usually generated at the server's first start. + * @param ipAddress The external IP address of this instance's machine. + * @return A publisher that yields the string response of recording the data point. + */ + private Mono doPing(String organizationId, String instanceId, String ipAddress) { + // Note: Hard-coding Segment auth header and the event name intentionally. These are not intended to be + // environment specific values, instead, they are common values for all self-hosted environments. As such, they + // are not intended to be configurable. + final String ceKey = segmentConfig.getCeKey(); + if (StringUtils.isEmpty(ceKey)) { + log.error("The segment ce key is null"); + return Mono.empty(); + } + + return WebClientUtils.create("https://api.segment.io") + .post() + .uri("/v1/track") + .headers(headers -> headers.setBasicAuth(ceKey, "")) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(Map.of( + "userId", + instanceId, + "context", + Map.of("ip", ipAddress), + "properties", + Map.of("instanceId", instanceId, "organizationId", organizationId), + "event", + "Instance Active"))) + .retrieve() + .bodyToMono(String.class); + } + + // Number of milliseconds between the start of each scheduled calls to this method. + @Scheduled(initialDelay = 2 * 60 * 1000 /* two minutes */, fixedRate = 24 * 60 * 60 * 1000 /* a day */) + @DistributedLock(key = "pingStats", ttl = 12 * 60 * 60, shouldReleaseLock = false) + @Observed(name = "pingStats") + public void pingStats() { + // TODO @CloudBilling remove cloud hosting check and migrate the cron to report organization level stats + if (commonConfig.isTelemetryDisabled() || commonConfig.isCloudHosting()) { + return; + } + + final String ceKey = segmentConfig.getCeKey(); + if (StringUtils.isEmpty(ceKey)) { + log.error("The segment ce key is null"); + return; + } + + Mono publicPermissionGroupIdMono = permissionGroupService.getPublicPermissionGroupId(); + + // Get the non-system generated active user count + Mono userCountMono = userRepository + .countByDeletedAtIsNullAndIsSystemGeneratedIsNot(true) + .defaultIfEmpty(0L); + + Mono>> nonDeletedObjectsCountMono = Mono.zip( + workspaceRepository.countByDeletedAtNull().defaultIfEmpty(0L), + applicationRepository.countByDeletedAtNull().defaultIfEmpty(0L), + newPageRepository.countByDeletedAtNull().defaultIfEmpty(0L), + newActionRepository.countByDeletedAtNull().defaultIfEmpty(0L), + datasourceRepository.countByDeletedAtNull().defaultIfEmpty(0L), + userCountMono, + getUserTrackingDetails()); + + organizationService + .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) + .map(Organization::getId) + .zipWith(publicPermissionGroupIdMono) + .flatMap(tuple2 -> { + final String organizationId = tuple2.getT1(); + final String publicPermissionGroupId = tuple2.getT2(); + return Mono.zip( + configService.getInstanceId().defaultIfEmpty("null"), + Mono.just(organizationId), + networkUtils.getExternalAddress(), + nonDeletedObjectsCountMono, + applicationRepository.getAllApplicationsCountAccessibleToARoleWithPermission( + AclPermission.READ_APPLICATIONS, publicPermissionGroupId)); + }) + .flatMap(statsData -> { + Map propertiesMap = new java.util.HashMap<>(Map.ofEntries( + entry("instanceId", statsData.getT1()), + entry("organizationId", statsData.getT2()), + entry("numOrgs", statsData.getT4().getT1()), + entry("numApps", statsData.getT4().getT2()), + entry("numPages", statsData.getT4().getT3()), + entry("numActions", statsData.getT4().getT4()), + entry("numDatasources", statsData.getT4().getT5()), + entry("numUsers", statsData.getT4().getT6()), + entry("numPublicApps", statsData.getT5()), + entry("version", projectProperties.getVersion()), + entry("edition", deploymentProperties.getEdition()), + entry("cloudProvider", defaultIfEmpty(deploymentProperties.getCloudProvider(), "")), + entry("efs", defaultIfEmpty(deploymentProperties.getEfs(), "")), + entry("tool", defaultIfEmpty(deploymentProperties.getTool(), "")), + entry("hostname", defaultIfEmpty(deploymentProperties.getHostname(), "")), + entry("deployedAt", defaultIfEmpty(deploymentProperties.getDeployedAt(), "")), + entry(ADMIN_EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()), + entry(EMAIL_DOMAIN_HASH, commonConfig.getAdminEmailDomainHash()))); + + propertiesMap.putAll(statsData.getT4().getT7()); + + final String ipAddress = statsData.getT3(); + return WebClientUtils.create("https://api.segment.io") + .post() + .uri("/v1/track") + .headers(headers -> headers.setBasicAuth(ceKey, "")) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(Map.of( + "userId", + statsData.getT1(), + "context", + Map.of("ip", ipAddress), + "properties", + propertiesMap, + "event", + "instance_stats"))) + .retrieve() + .bodyToMono(String.class); + }) + .doOnError(error -> log.error("Error sending anonymous counts {0}", error)) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe(); + } + + private Mono> getUserTrackingDetails() { + + Mono dauCountMono = userRepository + .countByDeletedAtIsNullAndLastActiveAtGreaterThanAndIsSystemGeneratedIsNot( + Instant.now().minus(1, ChronoUnit.DAYS), true) + .defaultIfEmpty(0L); + Mono wauCountMono = userRepository + .countByDeletedAtIsNullAndLastActiveAtGreaterThanAndIsSystemGeneratedIsNot( + Instant.now().minus(7, ChronoUnit.DAYS), true) + .defaultIfEmpty(0L); + Mono mauCountMono = userRepository + .countByDeletedAtIsNullAndLastActiveAtGreaterThanAndIsSystemGeneratedIsNot( + Instant.now().minus(30, ChronoUnit.DAYS), true) + .defaultIfEmpty(0L); + + return Mono.zip(dauCountMono, wauCountMono, mauCountMono) + .map(tuple -> Map.of( + UserTrackingType.DAU.name(), tuple.getT1(), + UserTrackingType.WAU.name(), tuple.getT2(), + UserTrackingType.MAU.name(), tuple.getT3())); + } @Scheduled(initialDelay = 10 * 1000 /* ten seconds */, fixedRate = 30 * 60 * 1000 /* thirty minutes */) @DistributedLock( @@ -29,16 +266,17 @@ public class ScheduledTaskCEImpl implements ScheduledTaskCE { @Observed(name = "fetchFeatures") public void fetchFeatures() { log.info("Fetching features for organizations"); - Flux organizationFlux = organizationService.retrieveAll(); - organizationFlux - .flatMap( - featureFlagService - ::getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations) - .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) - .onErrorResume(error -> { - log.error("Error while fetching organization feature flags", error); - return Flux.empty(); - }) + organizationService + .retrieveAll() + .delayElements(DELAY_BETWEEN_PINGS) + .flatMap(organization -> featureFlagService + .getAllRemoteFeaturesForOrganizationAndUpdateFeatureFlagsWithPendingMigrations(organization) + .flatMap(featureFlagService::checkAndExecuteMigrationsForOrganizationFeatureFlags) + .onErrorResume(error -> { + log.error("Error while fetching organization feature flags", error); + return Mono.empty(); + }) + .contextWrite(Context.of(ORGANIZATION_ID, organization.getId()))) .subscribeOn(LoadShifter.elasticScheduler) .subscribe(); }