diff --git a/app/jobs/runtime/events_cleanup.rb b/app/jobs/runtime/events_cleanup.rb
index a1841576395..873c6a7f319 100644
--- a/app/jobs/runtime/events_cleanup.rb
+++ b/app/jobs/runtime/events_cleanup.rb
@@ -9,7 +9,7 @@ def initialize(cutoff_age_in_days)
end
def perform
- Database::OldRecordCleanup.new(Event, cutoff_age_in_days).delete
+ Database::OldRecordCleanup.new(Event, cutoff_age_in_days:).delete
end
def job_name_in_configuration
diff --git a/app/models/runtime/app_usage_event.rb b/app/models/runtime/app_usage_event.rb
index 32913f8cdda..e8c28ba7e74 100644
--- a/app/models/runtime/app_usage_event.rb
+++ b/app/models/runtime/app_usage_event.rb
@@ -9,6 +9,23 @@ class AppUsageEvent < Sequel::Model
:buildpack_guid, :buildpack_name,
:package_state, :previous_package_state, :parent_app_guid,
:parent_app_name, :process_type, :task_name, :task_guid
+
+ def self.usage_lifecycles
+ [
+ {
+ beginning_states: [ProcessModel::STARTED, Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE],
+ ending_state: ProcessModel::STOPPED,
+ guid_column: :app_guid
+ },
+ {
+ beginning_states: [Repositories::AppUsageEventRepository::TASK_STARTED_EVENT_STATE,
+ Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE],
+ ending_state: Repositories::AppUsageEventRepository::TASK_STOPPED_EVENT_STATE,
+ guid_column: :task_guid
+ }
+ ].freeze
+ end
+
AppUsageEvent.dataset_module do
def supports_window_functions?
false
diff --git a/app/models/runtime/task_model.rb b/app/models/runtime/task_model.rb
index bad22c00eef..576513c5f63 100644
--- a/app/models/runtime/task_model.rb
+++ b/app/models/runtime/task_model.rb
@@ -137,9 +137,16 @@ def create_start_event
def create_stop_event_if_needed
app_usage_repo = Repositories::AppUsageEventRepository.new
- start_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED')
- existing_stop_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED')
- return if start_event.nil? || existing_stop_event.present?
+ return if app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED').present?
+
+ # Record the stop only when there is recorded evidence that the task
+ # started: the TASK_STARTED event, or the TASK_WAS_RUNNING baseline seeded
+ # for tasks that were already running when the keep-running cleanup was
+ # introduced. Without either, no consumer ever saw the task start, so a
+ # stop event would be unmatched noise.
+ started = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED') ||
+ app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_WAS_RUNNING')
+ return if started.nil?
create_stop_event
end
diff --git a/app/models/services/service_usage_event.rb b/app/models/services/service_usage_event.rb
index f5e694169f9..c09557870d1 100644
--- a/app/models/services/service_usage_event.rb
+++ b/app/models/services/service_usage_event.rb
@@ -7,5 +7,17 @@ class ServiceUsageEvent < Sequel::Model
:service_plan_guid, :service_plan_name,
:service_guid, :service_label,
:service_broker_name, :service_broker_guid
+
+ def self.usage_lifecycles
+ [
+ {
+ beginning_states: [Repositories::ServiceUsageEventRepository::CREATED_EVENT_STATE,
+ Repositories::ServiceUsageEventRepository::UPDATED_EVENT_STATE,
+ Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE],
+ ending_state: Repositories::ServiceUsageEventRepository::DELETED_EVENT_STATE,
+ guid_column: :service_instance_guid
+ }
+ ].freeze
+ end
end
end
diff --git a/app/repositories/app_usage_event_repository.rb b/app/repositories/app_usage_event_repository.rb
index 51d751a8cd1..bce322d530a 100644
--- a/app/repositories/app_usage_event_repository.rb
+++ b/app/repositories/app_usage_event_repository.rb
@@ -4,6 +4,16 @@
module VCAP::CloudController
module Repositories
class AppUsageEventRepository
+ WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze
+ TASK_STARTED_EVENT_STATE = 'TASK_STARTED'.freeze
+ TASK_STOPPED_EVENT_STATE = 'TASK_STOPPED'.freeze
+ # Task baselines get their own state (rather than reusing WAS_RUNNING)
+ # because task events share the app_usage_events table with app events but
+ # carry an empty app_guid: a WAS_RUNNING row keyed by app_guid '' would be
+ # correlated with every other task's baseline by the app lifecycle's
+ # cleanup and swept by the app backfill's stale-row sweep.
+ TASK_WAS_RUNNING_EVENT_STATE = 'TASK_WAS_RUNNING'.freeze
+
def find(guid)
AppUsageEvent.find(guid:)
end
@@ -152,7 +162,7 @@ def purge_and_reseed_started_apps!
end
def delete_events_older_than(cutoff_age_in_days)
- Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete
+ Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete
end
private
diff --git a/app/repositories/service_usage_event_repository.rb b/app/repositories/service_usage_event_repository.rb
index 35fbef5a1f0..baad88a7772 100644
--- a/app/repositories/service_usage_event_repository.rb
+++ b/app/repositories/service_usage_event_repository.rb
@@ -7,6 +7,7 @@ class ServiceUsageEventRepository
DELETED_EVENT_STATE = 'DELETED'.freeze
CREATED_EVENT_STATE = 'CREATED'.freeze
UPDATED_EVENT_STATE = 'UPDATED'.freeze
+ WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze
def find(guid)
ServiceUsageEvent.find(guid:)
@@ -92,7 +93,7 @@ def purge_and_reseed_service_instances!
end
def delete_events_older_than(cutoff_age_in_days)
- Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete
+ Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete
end
end
end
diff --git a/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb
new file mode 100644
index 00000000000..728394f11c0
--- /dev/null
+++ b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb
@@ -0,0 +1,68 @@
+Sequel.migration do
+ no_transaction # to use the 'concurrently' option
+
+ up do
+ if database_type == :postgres
+ VCAP::Migration.with_concurrent_timeout(self) do
+ add_index :app_usage_events, %i[state app_guid id],
+ name: :app_usage_events_lifecycle_index,
+ if_not_exists: true,
+ concurrently: true
+
+ add_index :service_usage_events, %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index,
+ if_not_exists: true,
+ concurrently: true
+ end
+
+ elsif database_type == :mysql
+ alter_table :app_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ add_index %i[state app_guid id], name: :app_usage_events_lifecycle_index unless @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index)
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+
+ alter_table :service_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ unless @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index)
+ add_index %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index
+ end
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+ end
+ end
+
+ down do
+ if database_type == :postgres
+ VCAP::Migration.with_concurrent_timeout(self) do
+ drop_index :app_usage_events, %i[state app_guid id],
+ name: :app_usage_events_lifecycle_index,
+ if_exists: true,
+ concurrently: true
+
+ drop_index :service_usage_events, %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index,
+ if_exists: true,
+ concurrently: true
+ end
+ end
+
+ if database_type == :mysql
+ alter_table :app_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ drop_index %i[state app_guid id], name: :app_usage_events_lifecycle_index if @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index)
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+
+ alter_table :service_usage_events do
+ # rubocop:disable Sequel/ConcurrentIndex
+ if @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index)
+ drop_index %i[state service_instance_guid id],
+ name: :service_usage_events_lifecycle_index
+ end
+ # rubocop:enable Sequel/ConcurrentIndex
+ end
+ end
+ end
+end
diff --git a/db/migrations/20260601120100_seed_was_running_app_usage_events.rb b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb
new file mode 100644
index 00000000000..3749f1c3967
--- /dev/null
+++ b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb
@@ -0,0 +1,18 @@
+require 'database/was_running_backfill'
+
+Sequel.migration do
+ no_transaction # backfill manages its own per-batch transactions
+
+ up do
+ logger = Steno.logger('cc.backfill.was_running')
+ if VCAP::WasRunningBackfill.skip?
+ VCAP::WasRunningBackfill.log_skip(logger, 'app')
+ else
+ VCAP::WasRunningBackfill.seed_app_usage_events(self, logger)
+ end
+ end
+
+ down do
+ VCAP::WasRunningBackfill.delete_app_usage_events(self)
+ end
+end
diff --git a/db/migrations/20260601120200_seed_was_running_service_usage_events.rb b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb
new file mode 100644
index 00000000000..ead0eb7aea1
--- /dev/null
+++ b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb
@@ -0,0 +1,18 @@
+require 'database/was_running_backfill'
+
+Sequel.migration do
+ no_transaction # backfill manages its own per-batch transactions
+
+ up do
+ logger = Steno.logger('cc.backfill.was_running')
+ if VCAP::WasRunningBackfill.skip?
+ VCAP::WasRunningBackfill.log_skip(logger, 'service')
+ else
+ VCAP::WasRunningBackfill.seed_service_usage_events(self, logger)
+ end
+ end
+
+ down do
+ VCAP::WasRunningBackfill.delete_service_usage_events(self)
+ end
+end
diff --git a/db/migrations/20260601120300_seed_was_running_task_usage_events.rb b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb
new file mode 100644
index 00000000000..2e38e888312
--- /dev/null
+++ b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb
@@ -0,0 +1,18 @@
+require 'database/was_running_backfill'
+
+Sequel.migration do
+ no_transaction # backfill manages its own per-batch transactions
+
+ up do
+ logger = Steno.logger('cc.backfill.was_running')
+ if VCAP::WasRunningBackfill.skip?
+ VCAP::WasRunningBackfill.log_skip(logger, 'task')
+ else
+ VCAP::WasRunningBackfill.seed_task_usage_events(self, logger)
+ end
+ end
+
+ down do
+ VCAP::WasRunningBackfill.delete_task_usage_events(self)
+ end
+end
diff --git a/docs/v2/app_usage_events/list_all_app_usage_events.html b/docs/v2/app_usage_events/list_all_app_usage_events.html
index 471dd0e8586..583bce89d12 100644
--- a/docs/v2/app_usage_events/list_all_app_usage_events.html
+++ b/docs/v2/app_usage_events/list_all_app_usage_events.html
@@ -631,9 +631,11 @@
Body
- STARTED
- STOPPED
+ - WAS_RUNNING
- BUILDPACK_SET
- TASK_STARTED
- TASK_STOPPED
+ - TASK_WAS_RUNNING
diff --git a/docs/v2/service_usage_events/list_service_usage_events.html b/docs/v2/service_usage_events/list_service_usage_events.html
index 501999684c8..5f1b68b393a 100644
--- a/docs/v2/service_usage_events/list_service_usage_events.html
+++ b/docs/v2/service_usage_events/list_service_usage_events.html
@@ -290,6 +290,7 @@ Body
CREATED
DELETED
UPDATED
+ WAS_RUNNING
|
diff --git a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb
index 36afafaf3d1..515c6c84385 100644
--- a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb
+++ b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb
@@ -30,3 +30,14 @@ Name | Type | Description
**instance_count.current** | _integer_ or `null` | Current instance count of the app that this event pertains to, if applicable
**instance_count.previous** | _integer_ or `null` | Previous instance count of the app that this event pertains to, if applicable
**links** | [_links object_](#links) | Links to related resources
+
+#### WAS_RUNNING and TASK_WAS_RUNNING events
+
+`WAS_RUNNING` and `TASK_WAS_RUNNING` are synthetic values for `state.current` recorded once per running process (`WAS_RUNNING`) and once per running task (`TASK_WAS_RUNNING`) by a one-time data migration when the keep-running cleanup feature was introduced. They mark every process and task that was already running at the time of the upgrade so that billing consumers can bootstrap from a complete baseline even if the original `STARTED`/`TASK_STARTED` events have been pruned.
+
+**Consumer interpretation** (read `WAS_RUNNING`/`STARTED` as `TASK_WAS_RUNNING`/`TASK_STARTED` for task events, which are keyed by `task.guid`):
+
+* If you have not previously recorded a `STARTED` event for this resource, treat `WAS_RUNNING` as equivalent to `STARTED`.
+* If you have already recorded `STARTED` (or an earlier `WAS_RUNNING`) for this resource, treat as a redundant baseline confirmation and ignore.
+* `created_at` reflects when the backfill migration ran, **not** when the app or task actually started. Treat `WAS_RUNNING` as a baseline marker that the resource was already running as of that timestamp, not as the true start of the running interval.
+* `state.previous` on a `WAS_RUNNING` event is always `null`. Subsequent real events for the same resource will continue to report their actual prior process state in `state.previous` (typically `STARTED`). If you perform chain validation, treat `WAS_RUNNING` as equivalent to `STARTED` for the purpose of validating the next event's `state.previous`.
diff --git a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb
index f904e1ff083..36c785d0a91 100644
--- a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb
+++ b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb
@@ -26,3 +26,13 @@ Name | Type | Description
**service_broker.guid** | _string_ or `null` | Unique identifier of the service broker that this event pertains to, if applicable
**service_broker.name** | _string_ or `null` | Name of the service broker that this event pertains to, if applicable
**links** | [_links object_](#links) | Links to related resources
+
+#### WAS_RUNNING events
+
+`WAS_RUNNING` is a synthetic value for `state` recorded once per existing service instance by a one-time data migration when the keep-running cleanup feature was introduced. It marks every service instance that existed at the time of the upgrade so that billing consumers can bootstrap from a complete baseline of service instances even if the original `CREATED` events have been pruned.
+
+**Consumer interpretation:**
+
+* If you have not previously recorded a `CREATED` event for this service instance, treat `WAS_RUNNING` as equivalent to `CREATED`.
+* If you have already recorded `CREATED` (or an earlier `WAS_RUNNING`) for this instance, treat as a redundant baseline confirmation and ignore.
+* `created_at` reflects when the backfill migration ran, **not** when the service instance was created. Treat `WAS_RUNNING` as a baseline marker that the instance already existed as of that timestamp.
diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb
index ac952146dc3..7c9b685478c 100644
--- a/lib/cloud_controller/config_schemas/api_schema.rb
+++ b/lib/cloud_controller/config_schemas/api_schema.rb
@@ -109,6 +109,7 @@ class ApiSchema < VCAP::Config
optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer,
optional(:migration_psql_worker_memory_kb) => Integer,
optional(:skip_bigint_id_migration) => bool,
+ optional(:skip_was_running_backfill) => bool,
db: {
optional(:database) => Hash, # db connection hash for sequel
max_connections: Integer, # max connections in the connection pool
diff --git a/lib/cloud_controller/config_schemas/migrate_schema.rb b/lib/cloud_controller/config_schemas/migrate_schema.rb
index 971ff0999aa..eff4f6e2d14 100644
--- a/lib/cloud_controller/config_schemas/migrate_schema.rb
+++ b/lib/cloud_controller/config_schemas/migrate_schema.rb
@@ -10,6 +10,7 @@ class MigrateSchema < VCAP::Config
optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer,
optional(:migration_psql_worker_memory_kb) => Integer,
optional(:skip_bigint_id_migration) => bool,
+ optional(:skip_was_running_backfill) => bool,
db: {
optional(:database) => Hash, # db connection hash for sequel
diff --git a/lib/database/batch_delete.rb b/lib/database/batch_delete.rb
index 9fcc4b58fea..377eff097c9 100644
--- a/lib/database/batch_delete.rb
+++ b/lib/database/batch_delete.rb
@@ -11,10 +11,12 @@ def delete
total_count = 0
loop do
- set = dataset.limit(amount)
- break if set.empty?
+ # Fetch the batch's ids in the same query that checks for emptiness, so the
+ # (potentially expensive) filtered dataset is evaluated once per batch.
+ ids = dataset.limit(amount).select_map(:id)
+ break if ids.empty?
- total_count += delete_batch(set)
+ total_count += delete_batch(ids)
end
total_count
@@ -22,8 +24,8 @@ def delete
private
- def delete_batch(set)
- dataset.model.where(id: set.select_map(:id)).delete
+ def delete_batch(ids)
+ dataset.model.where(id: ids).delete
end
end
end
diff --git a/lib/database/old_record_cleanup.rb b/lib/database/old_record_cleanup.rb
index 44227d84507..a8b1aa236da 100644
--- a/lib/database/old_record_cleanup.rb
+++ b/lib/database/old_record_cleanup.rb
@@ -3,25 +3,33 @@
module Database
class OldRecordCleanup
class NoCurrentTimestampError < StandardError; end
- attr_reader :model, :days_ago, :keep_at_least_one_record
+ attr_reader :model, :cutoff_age_in_days, :keep_at_least_one_record, :keep_running_records
- def initialize(model, days_ago, keep_at_least_one_record: false)
+ def initialize(model, cutoff_age_in_days:, keep_at_least_one_record: false, keep_running_records: false)
@model = model
- @days_ago = days_ago
+ @cutoff_age_in_days = cutoff_age_in_days
@keep_at_least_one_record = keep_at_least_one_record
+ @keep_running_records = keep_running_records
end
+ # keep_running_records and keep_at_least_one_record compose: a still-running
+ # resource (a beginning-state event with no later ending-state event for the
+ # same resource) is always retained, and keep_at_least_one_record additionally
+ # protects the single newest row so the table is never fully emptied for
+ # clients that poll the most recent event.
def delete
- cutoff_date = current_timestamp_from_database - days_ago.to_i.days
-
+ cutoff_date = current_timestamp_from_database - cutoff_age_in_days.to_i.days
old_records = model.dataset.where(Sequel.lit('created_at < ?', cutoff_date))
- if keep_at_least_one_record
- last_record = model.order(:id).last
- old_records = old_records.where(Sequel.lit('id < ?', last_record.id)) if last_record
- end
- logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows")
- Database::BatchDelete.new(old_records, 1000).delete
+ if keep_running_records
+ raise ArgumentError.new("keep_running_records requires #{model} to define .usage_lifecycles") unless model.respond_to?(:usage_lifecycles)
+
+ delete_keeping_running_records(old_records)
+ else
+ old_records = exclude_newest_record(old_records)
+ logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows")
+ Database::BatchDelete.new(old_records, 1000).delete
+ end
end
private
@@ -35,5 +43,96 @@ def current_timestamp_from_database
def logger
@logger ||= Steno.logger('cc.old_record_cleanup')
end
+
+ # Deletes old records while retaining a usable billing baseline for
+ # still-running resources.
+ #
+ # For each lifecycle of the model, a beginning-state row (e.g.
+ # STARTED/CREATED/WAS_RUNNING) is prunable when:
+ # * a later ending-state row (e.g. STOPPED/DELETED) is also old -- the run is
+ # over; or
+ # * it is a superseded baseline: an earlier beginning of the same run and a
+ # later beginning both exist (and are old). Consumers only need the first
+ # beginning of the current run (the true start time) and the latest one (the
+ # current footprint); the in-between rows written by scaling/updating a
+ # running resource carry no baseline information.
+ #
+ # The deletes are ordered deliberately: prunable beginning rows are removed
+ # FIRST, while the rows that make them prunable still exist, so each beginning
+ # stays prunable until it is itself deleted. Only then are the ending rows (and
+ # any other, non-lifecycle states) removed. Reversing the order could strand a
+ # beginning row whose paired ending was deleted in an earlier batch.
+ def delete_keeping_running_records(old_records)
+ lifecycles = model.usage_lifecycles
+ prunable_beginnings = lifecycles.map { |lifecycle| prunable_beginnings_dataset(old_records, lifecycle) }
+
+ # Everything that is not a beginning-state row of some lifecycle (ending rows
+ # plus any other, non-lifecycle states) is unconditionally prunable.
+ all_beginning_states = lifecycles.flat_map { |lifecycle| lifecycle.fetch(:beginning_states) }
+ unconditional_records = exclude_newest_record(old_records.exclude(state: all_beginning_states))
+
+ deleted_count = prunable_beginnings.sum { |dataset| Database::BatchDelete.new(dataset, 1000).delete }
+ deleted_count += Database::BatchDelete.new(unconditional_records, 1000).delete
+
+ logger.info("Cleaned up #{deleted_count} #{model.table_name} table rows")
+ end
+
+ # Builds the dataset of old beginning-state rows that are prunable for one
+ # lifecycle. All correlations use the (state, guid, id) lifecycle index; higher
+ # id implies later creation throughout. The probes only consider OLD rows: a
+ # superseded beginning is kept until the row superseding it is itself old,
+ # which keeps the pruning decision stable for consumers reading within the
+ # retention window.
+ def prunable_beginnings_dataset(old_records, lifecycle)
+ beginning_states = lifecycle.fetch(:beginning_states)
+ ending_state = lifecycle.fetch(:ending_state)
+ guid_column = lifecycle.fetch(:guid_column)
+
+ old_beginnings = old_records.where(state: beginning_states)
+ old_endings = old_records.where(state: ending_state)
+ initial_records = old_beginnings.from_self(alias: :initial_records)
+
+ # The run is over: an ending row for the same resource was created later.
+ matching_ending = old_endings.from_self(alias: :final_records).
+ where(Sequel[:final_records][guid_column] => Sequel[:initial_records][guid_column]).
+ where { Sequel[:final_records][:id] > Sequel[:initial_records][:id] }.
+ select(1).exists
+
+ # Not the run's true start: an earlier beginning of the same run exists,
+ # i.e. one with no ending event between the two.
+ intervening_ending = old_endings.from_self(alias: :intervening_endings).
+ where(Sequel[:intervening_endings][guid_column] => Sequel[:earlier_beginnings][guid_column]).
+ where { Sequel[:intervening_endings][:id] > Sequel[:earlier_beginnings][:id] }.
+ where { Sequel[:intervening_endings][:id] < Sequel[:initial_records][:id] }.
+ select(1).exists
+ earlier_beginning_in_same_run = old_beginnings.from_self(alias: :earlier_beginnings).
+ where(Sequel[:earlier_beginnings][guid_column] => Sequel[:initial_records][guid_column]).
+ where { Sequel[:earlier_beginnings][:id] < Sequel[:initial_records][:id] }.
+ where(Sequel.~(intervening_ending)).
+ select(1).exists
+
+ # Not the latest baseline: a later beginning for the same resource exists.
+ later_beginning = old_beginnings.from_self(alias: :later_beginnings).
+ where(Sequel[:later_beginnings][guid_column] => Sequel[:initial_records][guid_column]).
+ where { Sequel[:later_beginnings][:id] > Sequel[:initial_records][:id] }.
+ select(1).exists
+
+ superseded_baseline = Sequel.&(earlier_beginning_in_same_run, later_beginning)
+ exclude_newest_record(initial_records.where(Sequel.|(matching_ending, superseded_baseline)))
+ end
+
+ # When keep_at_least_one_record is set, never delete the single newest row so
+ # the table always retains at least one record.
+ def exclude_newest_record(records)
+ return records unless keep_at_least_one_record && newest_record_id
+
+ records.where(Sequel.lit('id < ?', newest_record_id))
+ end
+
+ def newest_record_id
+ return @newest_record_id if defined?(@newest_record_id)
+
+ @newest_record_id = model.order(:id).last&.id
+ end
end
end
diff --git a/lib/database/was_running_backfill.rb b/lib/database/was_running_backfill.rb
new file mode 100644
index 00000000000..653f6ab4211
--- /dev/null
+++ b/lib/database/was_running_backfill.rb
@@ -0,0 +1,305 @@
+# Backfills synthetic WAS_RUNNING usage events (TASK_WAS_RUNNING for tasks) for
+# resources that were already running/existing when the keep-running cleanup
+# feature was introduced.
+#
+# Run from thin Sequel migrations as a batched, idempotent, resumable operation
+# (mirrors VCAP::BigintMigration): each batch is keyset-paginated by source-table
+# id and runs in its own transaction, well under the migration statement timeout.
+# Re-running is safe via the NOT EXISTS guard. Uses only raw Sequel — no Cloud
+# Controller models/repositories — because the schema, not the app code, is the
+# contract at migration time.
+# rubocop:disable Metrics/ModuleLength
+module VCAP::WasRunningBackfill
+ WAS_RUNNING = 'WAS_RUNNING'.freeze
+ # Task baselines use a distinct state because task events share the
+ # app_usage_events table with app events but carry an empty app_guid -- see
+ # Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE.
+ TASK_WAS_RUNNING = 'TASK_WAS_RUNNING'.freeze
+ DEFAULT_BATCH_SIZE = 1000
+
+ class << self
+ # Operators can opt out (e.g. very large foundations, or downstream usage-event
+ # consumers that are not yet ready for the WAS_RUNNING state). Mirrors
+ # skip_bigint_id_migration. The flag is checked by the seed migrations rather
+ # than here, so the 'db:was_running_backfill' rake task can still seed the
+ # baseline after a skipped migration has been recorded as applied.
+ def skip?
+ VCAP::CloudController::Config.config&.get(:skip_was_running_backfill) || false
+ rescue VCAP::CloudController::Config::InvalidConfigPath
+ false
+ end
+
+ def log_skip(logger, kind)
+ logger.info("skipping WAS_RUNNING #{kind} usage event backfill (skip_was_running_backfill is set); " \
+ "run 'rake db:was_running_backfill' to seed the baseline later")
+ end
+
+ def seed_app_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE)
+ uuid_fn = uuid_function(db)
+ each_batch(db[:processes].where(state: 'STARTED'), batch_size) do |low, high|
+ db.run(app_usage_events_insert_sql(uuid_fn, low, high))
+ logger.info("backfilled WAS_RUNNING app usage events up to process id #{high}")
+ end
+ sweep_stale_app_usage_events(db, logger, batch_size)
+ end
+
+ def seed_service_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE)
+ uuid_fn = uuid_function(db)
+ each_batch(db[:service_instances], batch_size) do |low, high|
+ db.run(service_usage_events_insert_sql(uuid_fn, low, high))
+ logger.info("backfilled WAS_RUNNING service usage events up to service_instance id #{high}")
+ end
+ sweep_stale_service_usage_events(db, logger, batch_size)
+ end
+
+ def seed_task_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE)
+ uuid_fn = uuid_function(db)
+ each_batch(db[:tasks].where(state: 'RUNNING'), batch_size) do |low, high|
+ db.run(task_usage_events_insert_sql(uuid_fn, low, high))
+ logger.info("backfilled TASK_WAS_RUNNING app usage events up to task id #{high}")
+ end
+ sweep_stale_task_usage_events(db, logger, batch_size)
+ end
+
+ def delete_app_usage_events(db, batch_size: DEFAULT_BATCH_SIZE)
+ delete_was_running(db, :app_usage_events, batch_size, state: WAS_RUNNING)
+ end
+
+ def delete_service_usage_events(db, batch_size: DEFAULT_BATCH_SIZE)
+ delete_was_running(db, :service_usage_events, batch_size, state: WAS_RUNNING)
+ end
+
+ def delete_task_usage_events(db, batch_size: DEFAULT_BATCH_SIZE)
+ delete_was_running(db, :app_usage_events, batch_size, state: TASK_WAS_RUNNING)
+ end
+
+ private
+
+ # The latest-package / latest-droplet aggregates are scoped to the batch's apps
+ # so they never scan the whole packages/droplets tables (the cost that would
+ # otherwise blow the migration statement timeout).
+ #
+ # The COALESCEs are defensive: processes.memory/instances and apps.name are
+ # nullable columns whose defaults are normally backfilled by the model layer,
+ # but the target event columns are NOT NULL — one legacy NULL row written
+ # outside the models must not abort the whole migration.
+ #
+ # previous_state is NULL, not the current state. purge_and_reseed_started_apps!
+ # writes previous_state == state (STARTED/STARTED) to mean "no change, current
+ # snapshot" — safe only because its state is the real STARTED. Our state is the
+ # synthetic WAS_RUNNING, so STARTED here would invent a STARTED->WAS_RUNNING
+ # transition, and WAS_RUNNING would imply an earlier WAS_RUNNING event that
+ # never actually happened.
+ def app_usage_events_insert_sql(uuid_fn, low, high)
+ batch_apps = "SELECT app_guid FROM processes WHERE id > #{low} AND id <= #{high} AND state = 'STARTED'"
+ <<~SQL.squish
+ INSERT INTO app_usage_events (
+ guid, created_at,
+ state, previous_state,
+ instance_count, previous_instance_count,
+ memory_in_mb_per_instance, previous_memory_in_mb_per_instance,
+ app_guid, app_name,
+ parent_app_guid, parent_app_name,
+ process_type,
+ space_guid, space_name, org_guid,
+ buildpack_guid, buildpack_name,
+ package_state, previous_package_state
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP,
+ '#{WAS_RUNNING}', NULL,
+ COALESCE(p.instances, 0), COALESCE(p.instances, 0),
+ COALESCE(p.memory, 0), COALESCE(p.memory, 0),
+ p.guid, COALESCE(parent_app.name, ''),
+ parent_app.guid, COALESCE(parent_app.name, ''),
+ p.type,
+ spaces.guid, spaces.name, organizations.guid,
+ desired_droplet.buildpack_receipt_buildpack_guid, desired_droplet.buildpack_receipt_buildpack,
+ CASE
+ WHEN latest_droplet.state = 'FAILED' THEN 'FAILED'
+ WHEN latest_droplet.state = 'STAGED' AND latest_droplet.guid = parent_app.droplet_guid THEN 'STAGED'
+ WHEN latest_package.state = 'FAILED' THEN 'FAILED'
+ ELSE 'PENDING'
+ END,
+ 'UNKNOWN'
+ FROM processes p
+ INNER JOIN apps parent_app ON parent_app.guid = p.app_guid
+ INNER JOIN spaces ON spaces.guid = parent_app.space_guid
+ INNER JOIN organizations ON organizations.id = spaces.organization_id
+ LEFT JOIN droplets desired_droplet ON desired_droplet.guid = parent_app.droplet_guid
+ LEFT JOIN (
+ SELECT pkg.guid, pkg.app_guid, pkg.state FROM packages pkg
+ INNER JOIN (
+ SELECT app_guid, MAX(id) AS max_id FROM packages
+ WHERE app_guid IN (#{batch_apps}) GROUP BY app_guid
+ ) lp_ids ON lp_ids.app_guid = pkg.app_guid AND lp_ids.max_id = pkg.id
+ ) latest_package ON latest_package.app_guid = parent_app.guid
+ LEFT JOIN (
+ SELECT d.guid, d.package_guid, d.state FROM droplets d
+ INNER JOIN (
+ SELECT package_guid, MAX(id) AS max_id FROM droplets
+ WHERE package_guid IN (SELECT guid FROM packages WHERE app_guid IN (#{batch_apps}))
+ GROUP BY package_guid
+ ) ld_ids ON ld_ids.package_guid = d.package_guid AND ld_ids.max_id = d.id
+ ) latest_droplet ON latest_droplet.package_guid = latest_package.guid
+ WHERE p.id > #{low} AND p.id <= #{high}
+ AND p.state = 'STARTED'
+ AND NOT EXISTS (
+ SELECT 1 FROM app_usage_events WHERE state = '#{WAS_RUNNING}' AND app_guid = p.guid
+ )
+ SQL
+ end
+
+ def service_usage_events_insert_sql(uuid_fn, low, high)
+ <<~SQL.squish
+ INSERT INTO service_usage_events (
+ guid, created_at, state,
+ service_instance_guid, service_instance_name, service_instance_type,
+ service_plan_guid, service_plan_name,
+ service_guid, service_label,
+ service_broker_name, service_broker_guid,
+ space_guid, space_name, org_guid
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP, '#{WAS_RUNNING}',
+ service_instances.guid, service_instances.name,
+ CASE WHEN service_instances.is_gateway_service THEN 'managed_service_instance' ELSE 'user_provided_service_instance' END,
+ service_plans.guid, service_plans.name,
+ services.guid, services.label,
+ service_brokers.name, service_brokers.guid,
+ spaces.guid, spaces.name, organizations.guid
+ FROM service_instances
+ INNER JOIN spaces ON spaces.id = service_instances.space_id
+ INNER JOIN organizations ON organizations.id = spaces.organization_id
+ LEFT OUTER JOIN service_plans ON service_plans.id = service_instances.service_plan_id
+ LEFT OUTER JOIN services ON services.id = service_plans.service_id
+ LEFT OUTER JOIN service_brokers ON service_brokers.id = services.service_broker_id
+ WHERE service_instances.id > #{low} AND service_instances.id <= #{high}
+ AND NOT EXISTS (
+ SELECT 1 FROM service_usage_events WHERE state = '#{WAS_RUNNING}' AND service_instance_guid = service_instances.guid
+ )
+ SQL
+ end
+
+ # Mirrors AppUsageEventRepository#create_from_task: task events carry an
+ # empty app_guid/app_name and are keyed by task_guid instead. The COALESCE
+ # on memory_in_mb is defensive -- it is a nullable legacy column whose
+ # default is normally backfilled by the model layer.
+ #
+ # previous_state is NULL for the same reason as the app baseline (see
+ # app_usage_events_insert_sql): TASK_WAS_RUNNING is synthetic, so RUNNING here
+ # would invent a RUNNING->TASK_WAS_RUNNING transition that never happened.
+ def task_usage_events_insert_sql(uuid_fn, low, high)
+ <<~SQL.squish
+ INSERT INTO app_usage_events (
+ guid, created_at,
+ state, previous_state,
+ instance_count, previous_instance_count,
+ memory_in_mb_per_instance, previous_memory_in_mb_per_instance,
+ app_guid, app_name,
+ parent_app_guid, parent_app_name,
+ space_guid, space_name, org_guid,
+ package_state, previous_package_state,
+ task_guid, task_name
+ )
+ SELECT
+ #{uuid_fn}, CURRENT_TIMESTAMP,
+ '#{TASK_WAS_RUNNING}', NULL,
+ 1, 1,
+ COALESCE(t.memory_in_mb, 0), COALESCE(t.memory_in_mb, 0),
+ '', '',
+ parent_app.guid, COALESCE(parent_app.name, ''),
+ spaces.guid, spaces.name, organizations.guid,
+ 'STAGED', 'STAGED',
+ t.guid, t.name
+ FROM tasks t
+ INNER JOIN apps parent_app ON parent_app.guid = t.app_guid
+ INNER JOIN spaces ON spaces.guid = parent_app.space_guid
+ INNER JOIN organizations ON organizations.id = spaces.organization_id
+ WHERE t.id > #{low} AND t.id <= #{high}
+ AND t.state = 'RUNNING'
+ AND NOT EXISTS (
+ SELECT 1 FROM app_usage_events WHERE state = '#{TASK_WAS_RUNNING}' AND task_guid = t.guid
+ )
+ SQL
+ end
+
+ # The API stays live while the backfill runs, so a batch's INSERT..SELECT can
+ # race a concurrent stop/delete: the statement's snapshot still sees the
+ # resource as running and inserts a WAS_RUNNING row with a HIGHER id than the
+ # concurrent STOPPED/DELETED event. No later ending event would ever arrive
+ # for that resource, so the keep-running cleanup would retain the row forever
+ # and consumers would read the stopped resource as still running. Sweep such
+ # rows afterwards in id-keyed batches (like the rollback deletes) -- the
+ # predicate is re-checked against current resource state each batch, and no
+ # single DELETE (or its lock hold) grows large enough to risk the timeout.
+ def sweep_stale_app_usage_events(db, logger, batch_size)
+ stale = db[:app_usage_events].
+ where(state: WAS_RUNNING).
+ where(Sequel.lit("NOT EXISTS (SELECT 1 FROM processes WHERE processes.guid = app_usage_events.app_guid AND processes.state = 'STARTED')"))
+ deleted = batch_delete(db, :app_usage_events, stale, batch_size)
+ logger.info("swept #{deleted} stale WAS_RUNNING app usage events") if deleted.positive?
+ end
+
+ def sweep_stale_service_usage_events(db, logger, batch_size)
+ stale = db[:service_usage_events].
+ where(state: WAS_RUNNING).
+ where(Sequel.lit('NOT EXISTS (SELECT 1 FROM service_instances WHERE service_instances.guid = service_usage_events.service_instance_guid)'))
+ deleted = batch_delete(db, :service_usage_events, stale, batch_size)
+ logger.info("swept #{deleted} stale WAS_RUNNING service usage events") if deleted.positive?
+ end
+
+ def sweep_stale_task_usage_events(db, logger, batch_size)
+ stale = db[:app_usage_events].
+ where(state: TASK_WAS_RUNNING).
+ where(Sequel.lit("NOT EXISTS (SELECT 1 FROM tasks WHERE tasks.guid = app_usage_events.task_guid AND tasks.state = 'RUNNING')"))
+ deleted = batch_delete(db, :app_usage_events, stale, batch_size)
+ logger.info("swept #{deleted} stale TASK_WAS_RUNNING app usage events") if deleted.positive?
+ end
+
+ # Keyset-paginate over a source dataset by id, yielding the (exclusive-low,
+ # inclusive-high) id bounds of each batch inside its own transaction.
+ def each_batch(source, batch_size)
+ cursor = 0
+ loop do
+ high = source.where(Sequel.lit('id > ?', cursor)).order(:id).limit(batch_size).max(:id)
+ break if high.nil?
+
+ # READ COMMITTED keeps MySQL's INSERT..SELECT from taking shared next-key
+ # locks on every scanned source row while the API serves traffic (safe:
+ # CF MySQL releases run with binlog_format=ROW). On Postgres it is the
+ # default isolation level anyway.
+ source.db.transaction(isolation: :committed) { yield(cursor, high) }
+ cursor = high
+ end
+ end
+
+ # Delete every row matching `dataset` in batches of `batch_size` ids so
+ # neither the statement nor its lock hold ever grows large enough to risk the
+ # migration statement timeout. Returns the number of rows deleted. Re-selecting
+ # the dataset each pass is what lets the sweeps re-check current resource state.
+ def batch_delete(db, table, dataset, batch_size)
+ deleted = 0
+ loop do
+ ids = dataset.limit(batch_size).select_map(:id)
+ break if ids.empty?
+
+ deleted += db[table].where(id: ids).delete
+ end
+ deleted
+ end
+
+ def delete_was_running(db, table, batch_size, state:)
+ batch_delete(db, table, db[table].where(state: state), batch_size)
+ end
+
+ def uuid_function(db)
+ case db.database_type
+ when :postgres then 'get_uuid()'
+ when :mysql then 'UUID()'
+ else raise "unsupported database: #{db.database_type}"
+ end
+ end
+ end
+end
+# rubocop:enable Metrics/ModuleLength
diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake
index 8cbdb229c9b..2530577e626 100644
--- a/lib/tasks/db.rake
+++ b/lib/tasks/db.rake
@@ -192,6 +192,23 @@ namespace :db do
VCAP::BigintMigration.backfill(logger, db, args.table.to_sym, batch_size: args.batch_size.to_i, iterations: args.iterations.to_i)
end
+ desc 'Seed WAS_RUNNING usage events for running apps, running tasks, and existing service instances ' \
+ '(e.g. after the seed migrations were skipped via skip_was_running_backfill)'
+ task :was_running_backfill, %i[batch_size] => :environment do |_t, args|
+ args.with_defaults(batch_size: 1000)
+
+ RakeConfig.context = :migrate
+
+ require 'database/was_running_backfill'
+ logging_output
+ logger = Steno.logger('cc.db.was_running_backfill')
+ RakeConfig.config.load_db_encryption_key
+ db = VCAP::CloudController::DB.connect(RakeConfig.config.get(:db), logger)
+ VCAP::WasRunningBackfill.seed_app_usage_events(db, logger, batch_size: args.batch_size.to_i)
+ VCAP::WasRunningBackfill.seed_task_usage_events(db, logger, batch_size: args.batch_size.to_i)
+ VCAP::WasRunningBackfill.seed_service_usage_events(db, logger, batch_size: args.batch_size.to_i)
+ end
+
namespace :dev do
desc 'Migrate the database set in spec/support/bootstrap/db_config'
task migrate: :environment do
diff --git a/spec/api/documentation/app_usage_event_api_spec.rb b/spec/api/documentation/app_usage_event_api_spec.rb
index 4c7ff0c07a9..f8e45e421e0 100644
--- a/spec/api/documentation/app_usage_event_api_spec.rb
+++ b/spec/api/documentation/app_usage_event_api_spec.rb
@@ -47,7 +47,7 @@
"The desired state of the app or 'BUILDPACK_SET' when buildpack info has been set.",
required: false,
readonly: true,
- valid_values: %w[STARTED STOPPED BUILDPACK_SET TASK_STARTED TASK_STOPPED]
+ valid_values: %w[STARTED STOPPED WAS_RUNNING BUILDPACK_SET TASK_STARTED TASK_STOPPED TASK_WAS_RUNNING]
field :task_guid, 'The GUID of the task if one exists.', required: false, readonly: true, experimental: true
field :task_name, 'The NAME of the task if one exists.', required: false, readonly: true, experimental: true
diff --git a/spec/api/documentation/service_usage_events_api_spec.rb b/spec/api/documentation/service_usage_events_api_spec.rb
index 4d3e5dc3cda..94c707d71b3 100644
--- a/spec/api/documentation/service_usage_events_api_spec.rb
+++ b/spec/api/documentation/service_usage_events_api_spec.rb
@@ -13,7 +13,7 @@
get '/v2/service_usage_events' do
field :guid, 'The guid of the event.', required: false
- field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED]
+ field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED WAS_RUNNING]
field :org_guid, 'The GUID of the organization.', required: false, readonly: true
field :space_guid, 'The GUID of the space.', required: false, readonly: true
field :space_name, 'The name of the space.', required: false, readonly: true
diff --git a/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb
new file mode 100644
index 00000000000..3180fbf9456
--- /dev/null
+++ b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb
@@ -0,0 +1,44 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to add the lifecycle index to the usage event tables', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120000_add_lifecycle_index_to_usage_events.rb' }
+ end
+
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ it 'adds the lifecycle index to both usage event tables (idempotently) and removes it on revert' do
+ # Before migration: the lifecycle indexes should not exist.
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index)
+
+ # Up migration adds both indexes with the expected column order.
+ expect { run_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index)
+ expect(db.indexes(:app_usage_events)[:app_usage_events_lifecycle_index][:columns]).to eq(%i[state app_guid id])
+ expect(db.indexes(:service_usage_events)[:service_usage_events_lifecycle_index][:columns]).to eq(%i[state service_instance_guid id])
+
+ # Up migration is idempotent: running again does not fail.
+ expect { run_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index)
+
+ # Down migration removes both indexes.
+ expect { revert_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index)
+
+ # Down migration is idempotent: running again does not fail.
+ expect { revert_migration }.not_to raise_error
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index)
+ expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index)
+ end
+end
diff --git a/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb
new file mode 100644
index 00000000000..877cc3f04de
--- /dev/null
+++ b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb
@@ -0,0 +1,140 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to seed WAS_RUNNING events for currently-running app processes', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120100_seed_was_running_app_usage_events.rb' }
+ end
+
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ # Builds an org/space scaffold and returns the space guid that apps can reference.
+ def seed_space(suffix)
+ quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true,
+ total_services: 10, memory_limit: 1024, total_routes: 10)
+ org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id)
+ db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id)
+ "space-#{suffix}"
+ end
+
+ def seed_app_event(suffix, state:, app_guid:)
+ db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state,
+ instance_count: 1, memory_in_mb_per_instance: 1, app_guid: app_guid, app_name: "app-#{suffix}",
+ space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}")
+ end
+
+ describe 'up migration' do
+ it 'seeds WAS_RUNNING rows only for started processes, skips stopped ones, and preserves existing rows' do
+ space_guid = seed_space('main')
+
+ # A running process with no droplet -> package_state PENDING
+ db[:apps].insert(guid: 'app-pending', name: 'pending-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-pending', app_guid: 'app-pending', state: 'STARTED', instances: 3, memory: 512, type: 'web')
+
+ # A running process whose app points at a STAGED droplet -> package_state STAGED.
+ # Insert order avoids the apps.droplet_guid <-> droplets <-> packages.app_guid FK cycle.
+ db[:apps].insert(guid: 'app-staged', name: 'staged-app', space_guid: space_guid)
+ db[:packages].insert(guid: 'pkg-staged', app_guid: 'app-staged', state: 'READY')
+ db[:droplets].insert(guid: 'drop-staged', package_guid: 'pkg-staged', state: 'STAGED')
+ db[:apps].where(guid: 'app-staged').update(droplet_guid: 'drop-staged')
+ db[:processes].insert(guid: 'proc-staged', app_guid: 'app-staged', state: 'STARTED', instances: 1, memory: 256, type: 'web')
+
+ # A running process whose latest droplet FAILED -> package_state FAILED
+ db[:apps].insert(guid: 'app-faildrop', name: 'faildrop-app', space_guid: space_guid)
+ db[:packages].insert(guid: 'pkg-faildrop', app_guid: 'app-faildrop', state: 'READY')
+ db[:droplets].insert(guid: 'drop-faildrop', package_guid: 'pkg-faildrop', state: 'FAILED')
+ db[:apps].where(guid: 'app-faildrop').update(droplet_guid: 'drop-faildrop')
+ db[:processes].insert(guid: 'proc-faildrop', app_guid: 'app-faildrop', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+
+ # A running process whose latest package FAILED (and has no droplet) -> package_state FAILED
+ db[:apps].insert(guid: 'app-failpkg', name: 'failpkg-app', space_guid: space_guid)
+ db[:packages].insert(guid: 'pkg-failpkg', app_guid: 'app-failpkg', state: 'FAILED')
+ db[:processes].insert(guid: 'proc-failpkg', app_guid: 'app-failpkg', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+
+ # A stopped process -> no WAS_RUNNING row
+ db[:apps].insert(guid: 'app-stopped', name: 'stopped-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-stopped', app_guid: 'app-stopped', state: 'STOPPED', instances: 1, memory: 128, type: 'web')
+
+ # A running process that already has a WAS_RUNNING row -> not duplicated
+ db[:apps].insert(guid: 'app-existing', name: 'existing-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-existing', app_guid: 'app-existing', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+ seed_app_event('existing', state: 'WAS_RUNNING', app_guid: 'proc-existing')
+
+ # An unrelated pre-existing row that must be preserved (no truncate)
+ preexisting_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid')
+
+ run_migration
+
+ was_running = db[:app_usage_events].where(state: 'WAS_RUNNING')
+ # One row each for proc-pending, proc-staged, proc-faildrop and proc-failpkg, plus the
+ # pre-seeded proc-existing row (not duplicated).
+ expect(was_running.count).to eq(5)
+ expect(was_running.where(app_guid: 'proc-stopped').count).to eq(0)
+ expect(was_running.where(app_guid: 'proc-existing').count).to eq(1)
+ expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1)
+
+ pending_row = was_running.where(app_guid: 'proc-pending').first
+ expect(pending_row[:guid]).to be_present
+ expect(pending_row[:previous_state]).to be_nil
+ expect(pending_row[:app_name]).to eq('pending-app')
+ expect(pending_row[:parent_app_guid]).to eq('app-pending')
+ expect(pending_row[:parent_app_name]).to eq('pending-app')
+ expect(pending_row[:process_type]).to eq('web')
+ expect(pending_row[:space_guid]).to eq(space_guid)
+ expect(pending_row[:space_name]).to eq('space-main')
+ expect(pending_row[:org_guid]).to eq('org-main')
+ expect(pending_row[:instance_count]).to eq(3)
+ expect(pending_row[:previous_instance_count]).to eq(3)
+ expect(pending_row[:memory_in_mb_per_instance]).to eq(512)
+ expect(pending_row[:previous_memory_in_mb_per_instance]).to eq(512)
+ expect(pending_row[:package_state]).to eq('PENDING')
+ expect(pending_row[:previous_package_state]).to eq('UNKNOWN')
+
+ expect(was_running.where(app_guid: 'proc-staged').first[:package_state]).to eq('STAGED')
+ expect(was_running.where(app_guid: 'proc-faildrop').first[:package_state]).to eq('FAILED')
+ expect(was_running.where(app_guid: 'proc-failpkg').first[:package_state]).to eq('FAILED')
+
+ # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in
+ # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_app_usage_events
+ # runs twice; the migrator does not re-apply an already-recorded migration.
+ end
+
+ context 'when skip_was_running_backfill is set' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true)
+ end
+
+ it 'does not seed any WAS_RUNNING rows' do
+ space_guid = seed_space('main')
+ db[:apps].insert(guid: 'app-skip', name: 'skip-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-skip', app_guid: 'app-skip', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+
+ run_migration
+
+ expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0)
+ end
+ end
+ end
+
+ describe 'down migration' do
+ it 'removes only the WAS_RUNNING rows' do
+ space_guid = seed_space('main')
+ db[:apps].insert(guid: 'app-down', name: 'down-app', space_guid: space_guid)
+ db[:processes].insert(guid: 'proc-down', app_guid: 'app-down', state: 'STARTED', instances: 1, memory: 128, type: 'web')
+ unrelated_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid')
+
+ run_migration
+ expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(1)
+
+ revert_migration
+ expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0)
+ expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb
new file mode 100644
index 00000000000..7d0f79a296b
--- /dev/null
+++ b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb
@@ -0,0 +1,124 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to seed WAS_RUNNING events for existing service instances', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120200_seed_was_running_service_usage_events.rb' }
+ end
+
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ # Builds an org/space scaffold and returns the space id that instances can reference.
+ def seed_space(suffix)
+ quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true,
+ total_services: 10, memory_limit: 1024, total_routes: 10)
+ org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id)
+ db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id)
+ end
+
+ # Builds a broker -> service -> plan chain and returns the plan id.
+ def seed_plan(suffix)
+ broker_id = db[:service_brokers].insert(guid: "broker-#{suffix}", name: "broker-#{suffix}", broker_url: 'http://example.com', auth_password: 'pw')
+ service_id = db[:services].insert(guid: "service-#{suffix}", label: "service-#{suffix}", description: 'desc', bindable: true, service_broker_id: broker_id)
+ db[:service_plans].insert(guid: "plan-#{suffix}", name: "plan-#{suffix}", description: 'desc', free: true, service_id: service_id, unique_id: "plan-unique-#{suffix}")
+ end
+
+ def seed_service_event(suffix, state:, service_instance_guid:)
+ db[:service_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state,
+ org_guid: 'org-main', space_guid: 'space-main', space_name: 'space-main',
+ service_instance_guid: service_instance_guid, service_instance_name: "instance-#{suffix}",
+ service_instance_type: 'managed_service_instance')
+ end
+
+ describe 'up migration' do
+ it 'seeds one WAS_RUNNING row per instance with the correct type and preserves existing rows' do
+ space_id = seed_space('main')
+ plan_id = seed_plan('main')
+
+ # A managed instance -> managed_service_instance type with full broker chain.
+ db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ # A user-provided instance -> user_provided_service_instance type with NULL plan/service/broker.
+ db[:service_instances].insert(guid: 'upsi-guid', name: 'upsi', space_id: space_id, is_gateway_service: false)
+
+ # A managed instance that already has a WAS_RUNNING row -> not duplicated.
+ db[:service_instances].insert(guid: 'existing-guid', name: 'existing', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ seed_service_event('existing', state: 'WAS_RUNNING', service_instance_guid: 'existing-guid')
+
+ # An unrelated pre-existing row that must be preserved (no truncate).
+ preexisting_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'managed-guid')
+
+ run_migration
+
+ was_running = db[:service_usage_events].where(state: 'WAS_RUNNING')
+ # One row each for managed-guid and upsi-guid, plus the pre-seeded existing-guid row (not duplicated).
+ expect(was_running.count).to eq(3)
+ expect(was_running.where(service_instance_guid: 'existing-guid').count).to eq(1)
+ expect(db[:service_usage_events].where(id: preexisting_id).count).to eq(1)
+
+ managed_row = was_running.where(service_instance_guid: 'managed-guid').first
+ expect(managed_row[:guid]).to be_present
+ expect(managed_row[:service_instance_name]).to eq('my-instance')
+ expect(managed_row[:service_instance_type]).to eq('managed_service_instance')
+ expect(managed_row[:service_plan_guid]).to eq('plan-main')
+ expect(managed_row[:service_plan_name]).to eq('plan-main')
+ expect(managed_row[:service_guid]).to eq('service-main')
+ expect(managed_row[:service_label]).to eq('service-main')
+ expect(managed_row[:service_broker_name]).to eq('broker-main')
+ expect(managed_row[:service_broker_guid]).to eq('broker-main')
+ expect(managed_row[:space_guid]).to eq('space-main')
+ expect(managed_row[:space_name]).to eq('space-main')
+ expect(managed_row[:org_guid]).to eq('org-main')
+
+ upsi_row = was_running.where(service_instance_guid: 'upsi-guid').first
+ expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance')
+ expect(upsi_row[:service_plan_guid]).to be_nil
+ expect(upsi_row[:service_plan_name]).to be_nil
+ expect(upsi_row[:service_guid]).to be_nil
+ expect(upsi_row[:service_label]).to be_nil
+ expect(upsi_row[:service_broker_name]).to be_nil
+ expect(upsi_row[:service_broker_guid]).to be_nil
+
+ # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in
+ # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_service_usage_events
+ # runs twice; the migrator does not re-apply an already-recorded migration.
+ end
+
+ context 'when skip_was_running_backfill is set' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true)
+ end
+
+ it 'does not seed any WAS_RUNNING rows' do
+ space_id = seed_space('main')
+ plan_id = seed_plan('main')
+ db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+
+ run_migration
+
+ expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0)
+ end
+ end
+ end
+
+ describe 'down migration' do
+ it 'removes only the WAS_RUNNING rows' do
+ space_id = seed_space('main')
+ plan_id = seed_plan('main')
+ db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id)
+ unrelated_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'managed-guid')
+
+ run_migration
+ expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(1)
+
+ revert_migration
+ expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0)
+ expect(db[:service_usage_events].where(id: unrelated_id).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb
new file mode 100644
index 00000000000..e701c883c8e
--- /dev/null
+++ b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb
@@ -0,0 +1,117 @@
+require 'spec_helper'
+require 'migrations/helpers/migration_shared_context'
+
+RSpec.describe 'migration to seed TASK_WAS_RUNNING events for currently-running tasks', isolation: :truncation, type: :migration do
+ include_context 'migration' do
+ let(:migration_filename) { '20260601120300_seed_was_running_task_usage_events.rb' }
+ end
+
+ let(:run_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true)
+ end
+
+ let(:revert_migration) do
+ Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true)
+ end
+
+ # Builds an org/space/app scaffold and returns the app guid that tasks can reference.
+ def seed_app(suffix)
+ quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true,
+ total_services: 10, memory_limit: 1024, total_routes: 10)
+ org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id)
+ db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id)
+ db[:apps].insert(guid: "app-#{suffix}", name: "app-#{suffix}", space_guid: "space-#{suffix}")
+ "app-#{suffix}"
+ end
+
+ def seed_task(suffix, state:, app_guid:, memory_in_mb: 256)
+ db[:tasks].insert(guid: "task-#{suffix}", name: "task-#{suffix}", command: 'bundle exec rake', state: state,
+ app_guid: app_guid, droplet_guid: "droplet-#{suffix}", memory_in_mb: memory_in_mb)
+ end
+
+ def seed_task_event(suffix, state:, task_guid:)
+ db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state,
+ instance_count: 1, memory_in_mb_per_instance: 1, app_guid: '', app_name: '',
+ space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}",
+ task_guid: task_guid, task_name: "task-#{suffix}")
+ end
+
+ describe 'up migration' do
+ it 'seeds TASK_WAS_RUNNING rows only for running tasks, skips finished ones, and preserves existing rows' do
+ app_guid = seed_app('main')
+
+ seed_task('running', state: 'RUNNING', app_guid: app_guid, memory_in_mb: 512)
+ seed_task('succeeded', state: 'SUCCEEDED', app_guid: app_guid)
+ seed_task('pending', state: 'PENDING', app_guid: app_guid)
+
+ # A running task that already has a TASK_WAS_RUNNING row -> not duplicated
+ seed_task('existing', state: 'RUNNING', app_guid: app_guid)
+ seed_task_event('existing', state: 'TASK_WAS_RUNNING', task_guid: 'task-existing')
+
+ # An unrelated pre-existing row that must be preserved (no truncate)
+ preexisting_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task')
+
+ run_migration
+
+ task_was_running = db[:app_usage_events].where(state: 'TASK_WAS_RUNNING')
+ # One row for task-running, plus the pre-seeded task-existing row (not duplicated).
+ expect(task_was_running.count).to eq(2)
+ expect(task_was_running.where(task_guid: 'task-succeeded').count).to eq(0)
+ expect(task_was_running.where(task_guid: 'task-pending').count).to eq(0)
+ expect(task_was_running.where(task_guid: 'task-existing').count).to eq(1)
+ expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1)
+
+ row = task_was_running.where(task_guid: 'task-running').first
+ expect(row[:guid]).to be_present
+ expect(row[:previous_state]).to be_nil
+ expect(row[:task_name]).to eq('task-running')
+ expect(row[:app_guid]).to eq('')
+ expect(row[:app_name]).to eq('')
+ expect(row[:parent_app_guid]).to eq(app_guid)
+ expect(row[:parent_app_name]).to eq('app-main')
+ expect(row[:space_guid]).to eq('space-main')
+ expect(row[:space_name]).to eq('space-main')
+ expect(row[:org_guid]).to eq('org-main')
+ expect(row[:instance_count]).to eq(1)
+ expect(row[:previous_instance_count]).to eq(1)
+ expect(row[:memory_in_mb_per_instance]).to eq(512)
+ expect(row[:previous_memory_in_mb_per_instance]).to eq(512)
+ expect(row[:package_state]).to eq('STAGED')
+ expect(row[:previous_package_state]).to eq('STAGED')
+
+ # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in
+ # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_task_usage_events
+ # runs twice; the migrator does not re-apply an already-recorded migration.
+ end
+
+ context 'when skip_was_running_backfill is set' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true)
+ end
+
+ it 'does not seed any TASK_WAS_RUNNING rows' do
+ app_guid = seed_app('main')
+ seed_task('skip', state: 'RUNNING', app_guid: app_guid)
+
+ run_migration
+
+ expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0)
+ end
+ end
+ end
+
+ describe 'down migration' do
+ it 'removes only the TASK_WAS_RUNNING rows' do
+ app_guid = seed_app('main')
+ seed_task('down', state: 'RUNNING', app_guid: app_guid)
+ unrelated_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task')
+
+ run_migration
+ expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(1)
+
+ revert_migration
+ expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0)
+ expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb
index 81b48f484a7..3b34a4af58b 100644
--- a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb
+++ b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb
@@ -10,6 +10,9 @@
let(:skip_bigint_id_migration) { nil }
before do
+ # Default so config reads from other migrations during the shared after-hook's forward run
+ # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers.
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration)
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300)
end
diff --git a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb
index 2378c849909..5f9832d02bd 100644
--- a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb
+++ b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb
@@ -14,6 +14,9 @@
let(:logger) { double(:logger, info: nil) }
before do
+ # Default so config reads from other migrations during the shared after-hook's forward run
+ # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers.
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration)
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300)
end
@@ -119,6 +122,9 @@
let(:logger) { double(:logger, info: nil) }
before do
+ # Default so config reads from other migrations during the shared after-hook's forward run
+ # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers.
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration)
allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300)
end
diff --git a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb
index 8018ee0ee69..ce4f1df9956 100644
--- a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb
+++ b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb
@@ -5,7 +5,7 @@ module Jobs::Runtime
RSpec.describe AppUsageEventsCleanup, job_context: :worker do
let(:cutoff_age_in_days) { 30 }
let(:logger) { double(Steno::Logger, info: nil) }
- let!(:event_before_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago) }
+ let!(:event_before_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago, state: 'STOPPED') }
let!(:event_after_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days - 1).days.ago) }
subject(:job) do
diff --git a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb
index 181ad89e18d..da11beb39b5 100644
--- a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb
+++ b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb
@@ -5,7 +5,7 @@ module Jobs::Services
RSpec.describe ServiceUsageEventsCleanup, job_context: :worker do
let(:cutoff_age_in_days) { 30 }
let(:logger) { double(Steno::Logger, info: nil) }
- let!(:event_before_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago) }
+ let!(:event_before_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago, state: 'DELETED') }
let!(:event_after_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days - 1).days.ago) }
subject(:job) do
diff --git a/spec/unit/lib/database/old_record_cleanup_spec.rb b/spec/unit/lib/database/old_record_cleanup_spec.rb
index 7b2258bbaec..167ec52e792 100644
--- a/spec/unit/lib/database/old_record_cleanup_spec.rb
+++ b/spec/unit/lib/database/old_record_cleanup_spec.rb
@@ -1,6 +1,148 @@
require 'spec_helper'
require 'database/old_record_cleanup'
+# Lifecycle-aware cleanup behavior that is identical for app and service usage
+# events. Expects the including context to define :model, :beginning_state,
+# :ending_state and :guid_column.
+RSpec.shared_examples 'usage event lifecycle cleanup' do
+ def make_event(state, guid, created_at:)
+ model.make(state: state, created_at: created_at, guid_column => guid)
+ end
+
+ def run_cleanup(**opts)
+ Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: true, **opts).delete
+ end
+
+ it 'keeps an old beginning record when there is no corresponding ending record' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect(model.count).to eq(1)
+ end
+
+ it 'keeps an old beginning record when the ending record is fresh' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+ fresh_ending = make_event(ending_state, 'guid1', created_at: 1.day.ago + 1.minute)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect(fresh_ending.reload).to be_present
+ end
+
+ it 'keeps an old beginning record when the ending record was inserted first' do
+ old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago)
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'uses insertion order rather than created_at to pair beginnings with endings' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago)
+ # Earlier timestamp but higher id: the resource is no longer running.
+ old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago)
+
+ run_cleanup
+
+ expect { old_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'deletes all records of completed runs spanning multiple cycles' do
+ cycle1_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago)
+ cycle1_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago)
+ cycle2_beginning = make_event(beginning_state, 'guid1', created_at: 8.days.ago)
+ cycle2_ending = make_event(ending_state, 'guid1', created_at: 7.days.ago)
+
+ run_cleanup
+
+ expect { cycle1_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { cycle1_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { cycle2_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { cycle2_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'deletes an old ending record that has no beginning record' do
+ orphan_ending = make_event(ending_state, 'guid1', created_at: 10.days.ago)
+
+ run_cleanup
+
+ expect { orphan_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keeps an old WAS_RUNNING record when there is no corresponding ending record' do
+ was_running = make_event('WAS_RUNNING', 'guid1', created_at: 2.days.ago)
+
+ run_cleanup
+
+ expect(was_running.reload).to be_present
+ expect(model.count).to eq(1)
+ end
+
+ it 'deletes an old WAS_RUNNING record when a later old ending record exists' do
+ was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago)
+ old_ending = make_event(ending_state, 'guid1', created_at: 4.days.ago)
+
+ run_cleanup
+
+ expect { was_running.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keeps both the first beginning and a later WAS_RUNNING record of a running resource' do
+ old_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago)
+ was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago)
+
+ run_cleanup
+
+ expect(old_beginning.reload).to be_present
+ expect(was_running.reload).to be_present
+ end
+
+ it 'prunes superseded baselines of a running resource, keeping the first beginning (true start) and the latest one (current footprint)' do
+ first = make_event(beginning_state, 'guid1', created_at: 5.days.ago)
+ middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago)
+ latest = make_event(beginning_state, 'guid1', created_at: 3.days.ago)
+
+ run_cleanup
+
+ expect(first.reload).to be_present
+ expect { middle.reload }.to raise_error(Sequel::NoExistingObject)
+ expect(latest.reload).to be_present
+ end
+
+ it 'does not prune a superseded baseline until the beginning that supersedes it is itself old' do
+ first = make_event(beginning_state, 'guid1', created_at: 5.days.ago)
+ middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago)
+ fresh_latest = make_event(beginning_state, 'guid1', created_at: 1.day.ago + 1.minute)
+
+ run_cleanup
+
+ expect(first.reload).to be_present
+ expect(middle.reload).to be_present
+ expect(fresh_latest.reload).to be_present
+ end
+
+ it 'treats the first beginning after an ended run as the true start, not as a superseded baseline' do
+ ended_run_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago)
+ ended_run_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago)
+ current_run_first = make_event(beginning_state, 'guid1', created_at: 8.days.ago)
+ current_run_latest = make_event(beginning_state, 'guid1', created_at: 7.days.ago)
+
+ run_cleanup
+
+ expect { ended_run_beginning.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { ended_run_ending.reload }.to raise_error(Sequel::NoExistingObject)
+ expect(current_run_first.reload).to be_present
+ expect(current_run_latest.reload).to be_present
+ end
+end
+
RSpec.describe Database::OldRecordCleanup do
describe '#delete' do
let!(:stale_event1) { VCAP::CloudController::Event.make(created_at: 1.day.ago - 1.minute) }
@@ -9,7 +151,7 @@
let!(:fresh_event) { VCAP::CloudController::Event.make(created_at: 1.day.ago + 1.minute) }
it 'deletes records older than specified days' do
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1)
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1)
expect do
record_cleanup.delete
@@ -20,23 +162,14 @@
expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject)
end
- context "when there are no records at all but you're trying to keep at least one" do
- it "doesn't keep one because there aren't any to keep" do
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, 1, keep_at_least_one_record: true)
-
- expect { record_cleanup.delete }.not_to raise_error
- expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0)
- end
- end
-
it 'only retrieves the current timestamp from the database once' do
expect(VCAP::CloudController::Event.db).to receive(:fetch).with('SELECT CURRENT_TIMESTAMP as now').once.and_call_original
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1)
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1)
record_cleanup.delete
end
it 'keeps the last row when :keep_at_least_one_record is true even if it is older than the cutoff date' do
- record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 0, keep_at_least_one_record: true)
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 0, keep_at_least_one_record: true)
expect do
record_cleanup.delete
@@ -46,5 +179,235 @@
expect { stale_event1.reload }.to raise_error(Sequel::NoExistingObject)
expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject)
end
+
+ context "when there are no records at all but you're trying to keep at least one" do
+ it "doesn't keep one because there aren't any to keep" do
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, cutoff_age_in_days: 1, keep_at_least_one_record: true)
+
+ expect { record_cleanup.delete }.not_to raise_error
+ expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0)
+ end
+ end
+
+ context 'when keep_running_records is requested for a model without usage lifecycles' do
+ it 'raises rather than silently deleting the records of running resources' do
+ record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1, keep_running_records: true)
+
+ expect { record_cleanup.delete }.to raise_error(ArgumentError, /usage_lifecycles/)
+ end
+ end
+
+ describe 'keeping running AppUsageEvent records' do
+ let(:model) { VCAP::CloudController::AppUsageEvent }
+ let(:beginning_state) { 'STARTED' }
+ let(:ending_state) { 'STOPPED' }
+ let(:guid_column) { :app_guid }
+
+ include_examples 'usage event lifecycle cleanup'
+
+ describe 'task lifecycle' do
+ it 'keeps the TASK_STARTED record of a still-running task' do
+ task_started = model.make(created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+
+ run_cleanup
+
+ expect(task_started.reload).to be_present
+ end
+
+ it 'deletes the TASK_STARTED and TASK_STOPPED records of a completed task' do
+ task_started = model.make(created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+ task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1')
+
+ run_cleanup
+
+ expect { task_started.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keeps the TASK_STARTED record when the TASK_STOPPED record is fresh' do
+ task_started = model.make(created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+ fresh_task_stopped = model.make(created_at: 1.day.ago + 1.minute, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1')
+
+ run_cleanup
+
+ expect(task_started.reload).to be_present
+ expect(fresh_task_stopped.reload).to be_present
+ end
+
+ it 'correlates task records by task_guid, not by app_guid' do
+ running_task_started = model.make(created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1')
+ other_task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task2', app_guid: 'app1')
+
+ run_cleanup
+
+ expect(running_task_started.reload).to be_present
+ expect { other_task_stopped.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keeps the TASK_WAS_RUNNING baseline of a still-running task' do
+ baseline = model.make(created_at: 2.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '')
+
+ run_cleanup
+
+ expect(baseline.reload).to be_present
+ end
+
+ it 'deletes the TASK_WAS_RUNNING baseline once a later TASK_STOPPED record is also old' do
+ baseline = model.make(created_at: 5.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '')
+ task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: '')
+
+ run_cleanup
+
+ expect { baseline.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ # Task events carry an empty app_guid, so if task baselines shared the
+ # WAS_RUNNING state they would all correlate with each other through the
+ # app lifecycle (keyed by app_guid) and be wrongly pruned as superseded
+ # baselines of one phantom app. The distinct TASK_WAS_RUNNING state keeps
+ # the app lifecycle blind to them.
+ it 'does not prune the baselines of distinct running tasks as superseded baselines of one phantom app' do
+ baselines = %w[task1 task2 task3].each_with_index.map do |task_guid, i|
+ model.make(created_at: (5 - i).days.ago, state: 'TASK_WAS_RUNNING', task_guid: task_guid, app_guid: '')
+ end
+
+ run_cleanup
+
+ baselines.each { |baseline| expect(baseline.reload).to be_present }
+ end
+ end
+
+ it 'deletes records with non-lifecycle states' do
+ buildpack_event1 = model.make(created_at: 3.days.ago, state: 'BUILDPACK_SET', app_guid: 'app1')
+ buildpack_event2 = model.make(created_at: 2.days.ago, state: 'BUILDPACK_SET', app_guid: 'app2')
+
+ run_cleanup
+
+ expect { buildpack_event1.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { buildpack_event2.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'deletes old records with a corresponding stop record even if app_guid is an empty string' do
+ empty_guid_start = model.make(created_at: 5.days.ago, state: 'STARTED', app_guid: '')
+ different_empty_start = model.make(created_at: 4.days.ago, state: 'STARTED', app_guid: '')
+ empty_guid_stop = model.make(created_at: 3.days.ago, state: 'STOPPED', app_guid: '')
+
+ run_cleanup
+
+ # Both STARTs with an empty-string guid have a STOP with an empty-string guid after them.
+ expect { empty_guid_start.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { different_empty_start.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { empty_guid_stop.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'works when cutoff_age_in_days is 0' do
+ old_start = model.make(created_at: 1.second.ago, state: 'STARTED', app_guid: 'running-app')
+
+ Database::OldRecordCleanup.new(model, cutoff_age_in_days: 0, keep_running_records: true).delete
+
+ expect(old_start.reload).to be_present
+ end
+
+ it 'does not error if the table is empty' do
+ model.dataset.delete
+
+ expect { run_cleanup }.not_to raise_error
+ end
+
+ it 'deletes all old records when keep_running_records is false' do
+ old_start = model.make(created_at: 5.days.ago, state: 'STARTED', app_guid: 'app1')
+ old_stop = model.make(created_at: 4.days.ago, state: 'STOPPED', app_guid: 'app1')
+ old_running_start = model.make(created_at: 3.days.ago, state: 'STARTED', app_guid: 'running-app')
+
+ Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: false).delete
+
+ expect { old_start.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_stop.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { old_running_start.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+
+ it 'keep_at_least_one_record preserves the last record while still pruning its paired START' do
+ old_start = model.make(created_at: 10.days.ago, state: 'STARTED', app_guid: 'app1')
+ last_stop = model.make(created_at: 9.days.ago, state: 'STOPPED', app_guid: 'app1')
+
+ run_cleanup(keep_at_least_one_record: true)
+
+ expect { old_start.reload }.to raise_error(Sequel::NoExistingObject) # paired with STOP, prunable
+ expect(last_stop.reload).to be_present # kept by keep_at_least_one_record
+ end
+
+ it 'prunes a paired set larger than the delete batch size while keeping running records' do
+ old = 5.days.ago
+
+ # 1,500 completed START/STOP pairs (3,000 rows) -> well above the 1,000-row
+ # batch size, so the delete spans multiple batches across the two passes.
+ paired_rows = []
+ 1_500.times do |i|
+ guid = "paired-#{i}"
+ common = { app_name: guid, space_guid: 'sp', space_name: 'sp', org_guid: 'o',
+ instance_count: 1, memory_in_mb_per_instance: 1, created_at: old }
+ paired_rows << common.merge(guid: "start-#{i}", state: 'STARTED', app_guid: guid)
+ paired_rows << common.merge(guid: "stop-#{i}", state: 'STOPPED', app_guid: guid)
+ end
+ model.dataset.multi_insert(paired_rows)
+
+ # Still-running apps (START with no later STOP) that must survive cleanup.
+ running = Array.new(50) do |i|
+ model.make(created_at: old, state: 'STARTED', app_guid: "running-#{i}")
+ end
+
+ run_cleanup
+
+ # Every completed pair is gone; only the running records remain.
+ running.each { |event| expect(event.reload).to be_present }
+ expect(model.count).to eq(running.size)
+ end
+ end
+
+ describe 'keeping running ServiceUsageEvent records' do
+ let(:model) { VCAP::CloudController::ServiceUsageEvent }
+ let(:beginning_state) { 'CREATED' }
+ let(:ending_state) { 'DELETED' }
+ let(:guid_column) { :service_instance_guid }
+
+ include_examples 'usage event lifecycle cleanup'
+
+ describe 'UPDATED records' do
+ it 'keeps the CREATED record and the latest UPDATED record while the service instance exists, pruning superseded UPDATED records' do
+ stale_created = model.make(created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1')
+ superseded_update = model.make(created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+ latest_update = model.make(created_at: 6.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+
+ run_cleanup
+
+ expect(stale_created.reload).to be_present
+ expect { superseded_update.reload }.to raise_error(Sequel::NoExistingObject)
+ expect(latest_update.reload).to be_present
+ end
+
+ it 'keeps UPDATED records when the corresponding delete record is fresh' do
+ stale_updated = model.make(created_at: 2.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+ fresh_delete = model.make(created_at: 1.day.ago + 1.minute, state: 'DELETED', service_instance_guid: 'guid1')
+
+ run_cleanup
+
+ expect(stale_updated.reload).to be_present
+ expect(fresh_delete.reload).to be_present
+ end
+
+ it 'deletes UPDATED records when there is a corresponding old delete record' do
+ stale_created = model.make(created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1')
+ stale_updated = model.make(created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1')
+ stale_delete = model.make(created_at: 6.days.ago, state: 'DELETED', service_instance_guid: 'guid1')
+
+ run_cleanup
+
+ expect { stale_created.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { stale_updated.reload }.to raise_error(Sequel::NoExistingObject)
+ expect { stale_delete.reload }.to raise_error(Sequel::NoExistingObject)
+ end
+ end
+ end
end
end
diff --git a/spec/unit/lib/database/was_running_backfill_spec.rb b/spec/unit/lib/database/was_running_backfill_spec.rb
new file mode 100644
index 00000000000..baa016b0295
--- /dev/null
+++ b/spec/unit/lib/database/was_running_backfill_spec.rb
@@ -0,0 +1,247 @@
+require 'spec_helper'
+require 'database/was_running_backfill'
+
+RSpec.describe VCAP::WasRunningBackfill do
+ let(:db) { Sequel::Model.db }
+ let(:logger) { double(Steno::Logger, info: nil) }
+
+ def was_running
+ db[:service_usage_events].where(state: 'WAS_RUNNING')
+ end
+
+ def app_was_running
+ db[:app_usage_events].where(state: 'WAS_RUNNING')
+ end
+
+ def task_was_running
+ db[:app_usage_events].where(state: 'TASK_WAS_RUNNING')
+ end
+
+ describe 'WAS_RUNNING' do
+ # The backfill is raw SQL (no CC code), so it can't reference the repository
+ # constants directly. This guard catches any drift between the literal the
+ # backfill writes and the state value the repositories/cleanup recognise.
+ it 'matches the state value used by the usage event repositories' do
+ expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE)
+ expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE)
+ expect(described_class::TASK_WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE)
+ end
+ end
+
+ describe '.skip?' do
+ let(:skip_was_running_backfill) { nil }
+
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(skip_was_running_backfill)
+ end
+
+ context 'when skip_was_running_backfill is false' do
+ let(:skip_was_running_backfill) { false }
+
+ it 'returns false' do
+ expect(described_class.skip?).to be(false)
+ end
+ end
+
+ context 'when skip_was_running_backfill is true' do
+ let(:skip_was_running_backfill) { true }
+
+ it 'returns true' do
+ expect(described_class.skip?).to be(true)
+ end
+ end
+
+ context 'when skip_was_running_backfill is nil' do
+ let(:skip_was_running_backfill) { nil }
+
+ it 'returns false' do
+ expect(described_class.skip?).to be(false)
+ end
+ end
+
+ context 'when reading the config raises InvalidConfigPath' do
+ before do
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_raise(VCAP::CloudController::Config::InvalidConfigPath)
+ end
+
+ it 'returns false rather than aborting the migration' do
+ expect(described_class.skip?).to be(false)
+ end
+ end
+ end
+
+ describe '.seed_app_usage_events' do
+ it 'seeds one WAS_RUNNING row per started process across batches, idempotently, skipping stopped processes' do
+ started1 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ started2 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+
+ # batch_size: 1 forces the keyset loop to iterate once per process.
+ described_class.seed_app_usage_events(db, logger, batch_size: 1)
+
+ expect(app_was_running.select_map(:app_guid)).to contain_exactly(started1.guid, started2.guid)
+ expect { described_class.seed_app_usage_events(db, logger, batch_size: 1) }.not_to change(app_was_running, :count)
+ end
+
+ it 'seeds a separate row per process when an app has multiple started processes' do
+ app = VCAP::CloudController::AppModel.make
+ web = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'web', state: 'STARTED')
+ worker = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'worker', state: 'STARTED')
+
+ described_class.seed_app_usage_events(db, logger, batch_size: 1)
+
+ scope = app_was_running.where(parent_app_guid: app.guid)
+ expect(scope.select_map(:app_guid)).to contain_exactly(web.guid, worker.guid)
+ expect(scope.select_map(:process_type)).to contain_exactly('web', 'worker')
+ end
+
+ it 'tolerates legacy NULLs in nullable process and app columns' do
+ process = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ # Bypass the model layer, which would backfill these defaults.
+ db[:processes].where(guid: process.guid).update(memory: nil, instances: nil)
+
+ described_class.seed_app_usage_events(db, logger)
+
+ row = app_was_running.first(app_guid: process.guid)
+ expect(row[:memory_in_mb_per_instance]).to eq(0)
+ expect(row[:instance_count]).to eq(0)
+ end
+
+ it 'sweeps WAS_RUNNING rows whose process is not running, e.g. apps stopped concurrently with the backfill' do
+ running = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED')
+ VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: stopped.guid)
+ VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: 'no-such-process')
+
+ described_class.seed_app_usage_events(db, logger)
+
+ expect(app_was_running.select_map(:app_guid)).to contain_exactly(running.guid)
+ end
+ end
+
+ describe '.delete_app_usage_events' do
+ it 'batch-deletes only WAS_RUNNING rows' do
+ VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED')
+ described_class.seed_app_usage_events(db, logger, batch_size: 1)
+ started = VCAP::CloudController::AppUsageEvent.make(state: 'STARTED')
+
+ described_class.delete_app_usage_events(db, batch_size: 1)
+
+ expect(app_was_running.count).to eq(0)
+ expect(db[:app_usage_events].where(guid: started.guid).count).to eq(1)
+ end
+ end
+
+ describe '.seed_task_usage_events' do
+ it 'seeds one TASK_WAS_RUNNING row per running task across batches, idempotently, skipping completed tasks' do
+ running1 = VCAP::CloudController::TaskModel.make(state: 'RUNNING', memory_in_mb: 256)
+ running2 = VCAP::CloudController::TaskModel.make(state: 'RUNNING')
+ VCAP::CloudController::TaskModel.make(state: 'SUCCEEDED')
+
+ # batch_size: 1 forces the keyset loop to iterate once per task.
+ described_class.seed_task_usage_events(db, logger, batch_size: 1)
+
+ expect(task_was_running.select_map(:task_guid)).to contain_exactly(running1.guid, running2.guid)
+
+ row = task_was_running.first(task_guid: running1.guid)
+ expect(row[:previous_state]).to be_nil
+ expect(row[:task_name]).to eq(running1.name)
+ expect(row[:app_guid]).to eq('')
+ expect(row[:app_name]).to eq('')
+ expect(row[:parent_app_guid]).to eq(running1.app.guid)
+ expect(row[:parent_app_name]).to eq(running1.app.name)
+ expect(row[:instance_count]).to eq(1)
+ expect(row[:memory_in_mb_per_instance]).to eq(256)
+ expect(row[:previous_memory_in_mb_per_instance]).to eq(256)
+ expect(row[:package_state]).to eq('STAGED')
+ expect(row[:previous_package_state]).to eq('STAGED')
+ expect(row[:space_guid]).to eq(running1.space.guid)
+ expect(row[:space_name]).to eq(running1.space.name)
+ expect(row[:org_guid]).to eq(running1.space.organization.guid)
+
+ expect { described_class.seed_task_usage_events(db, logger, batch_size: 1) }.not_to change(task_was_running, :count)
+ end
+
+ it 'tolerates a legacy NULL task memory' do
+ task = VCAP::CloudController::TaskModel.make(state: 'RUNNING')
+ # Bypass the model layer, which would backfill the default.
+ db[:tasks].where(guid: task.guid).update(memory_in_mb: nil)
+
+ described_class.seed_task_usage_events(db, logger)
+
+ row = task_was_running.first(task_guid: task.guid)
+ expect(row[:memory_in_mb_per_instance]).to eq(0)
+ end
+
+ it 'sweeps TASK_WAS_RUNNING rows whose task is no longer running, e.g. tasks completed concurrently with the backfill' do
+ running = VCAP::CloudController::TaskModel.make(state: 'RUNNING')
+ completed = VCAP::CloudController::TaskModel.make(state: 'SUCCEEDED')
+ VCAP::CloudController::AppUsageEvent.make(state: 'TASK_WAS_RUNNING', task_guid: completed.guid)
+ VCAP::CloudController::AppUsageEvent.make(state: 'TASK_WAS_RUNNING', task_guid: 'no-such-task')
+
+ described_class.seed_task_usage_events(db, logger)
+
+ expect(task_was_running.select_map(:task_guid)).to contain_exactly(running.guid)
+ end
+ end
+
+ describe '.delete_task_usage_events' do
+ it 'batch-deletes only TASK_WAS_RUNNING rows' do
+ task = VCAP::CloudController::TaskModel.make(state: 'RUNNING')
+ described_class.seed_task_usage_events(db, logger, batch_size: 1)
+ task_started = VCAP::CloudController::AppUsageEvent.make(state: 'TASK_STARTED', task_guid: task.guid)
+ app_baseline = VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: 'some-process')
+
+ described_class.delete_task_usage_events(db, batch_size: 1)
+
+ expect(task_was_running.count).to eq(0)
+ expect(db[:app_usage_events].where(guid: task_started.guid).count).to eq(1)
+ expect(db[:app_usage_events].where(guid: app_baseline.guid).count).to eq(1)
+ end
+ end
+
+ describe '.seed_service_usage_events' do
+ it 'seeds one WAS_RUNNING row per instance across multiple batches, with the right type, idempotently' do
+ managed = VCAP::CloudController::ManagedServiceInstance.make
+ upsi = VCAP::CloudController::UserProvidedServiceInstance.make
+
+ # batch_size: 1 forces the keyset loop to iterate once per instance.
+ described_class.seed_service_usage_events(db, logger, batch_size: 1)
+
+ expect(was_running.select_map(:service_instance_guid)).to contain_exactly(managed.guid, upsi.guid)
+
+ managed_row = was_running.first(service_instance_guid: managed.guid)
+ expect(managed_row[:service_instance_type]).to eq('managed_service_instance')
+ expect(managed_row[:service_plan_guid]).to eq(managed.service_plan.guid)
+ expect(managed_row[:service_broker_name]).to eq(managed.service_plan.service.service_broker.name)
+
+ upsi_row = was_running.first(service_instance_guid: upsi.guid)
+ expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance')
+ expect(upsi_row[:service_plan_guid]).to be_nil
+
+ expect { described_class.seed_service_usage_events(db, logger, batch_size: 1) }.not_to change(was_running, :count)
+ end
+
+ it 'sweeps WAS_RUNNING rows whose service instance no longer exists, e.g. instances deleted concurrently with the backfill' do
+ instance = VCAP::CloudController::ManagedServiceInstance.make
+ VCAP::CloudController::ServiceUsageEvent.make(state: 'WAS_RUNNING', service_instance_guid: 'no-such-instance')
+
+ described_class.seed_service_usage_events(db, logger)
+
+ expect(was_running.select_map(:service_instance_guid)).to contain_exactly(instance.guid)
+ end
+ end
+
+ describe '.delete_service_usage_events' do
+ it 'batch-deletes only WAS_RUNNING rows' do
+ instance = VCAP::CloudController::ManagedServiceInstance.make
+ described_class.seed_service_usage_events(db, logger, batch_size: 1)
+ created = VCAP::CloudController::ServiceUsageEvent.make(state: 'CREATED', service_instance_guid: instance.guid)
+
+ described_class.delete_service_usage_events(db, batch_size: 1)
+
+ expect(was_running.count).to eq(0)
+ expect(db[:service_usage_events].where(guid: created.guid).count).to eq(1)
+ end
+ end
+end
diff --git a/spec/unit/models/runtime/task_model_spec.rb b/spec/unit/models/runtime/task_model_spec.rb
index fe1e5790c45..b7c07975404 100644
--- a/spec/unit/models/runtime/task_model_spec.rb
+++ b/spec/unit/models/runtime/task_model_spec.rb
@@ -28,6 +28,30 @@ module VCAP::CloudController
expect(event.task_guid).to eq(task.guid)
expect(event.parent_app_guid).to eq(task.app.guid)
end
+
+ context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do
+ let!(:start_event) { AppUsageEvent.make(task_guid: task.guid, state: 'TASK_WAS_RUNNING') }
+
+ it 'still creates a TASK_STOPPED event' do
+ task.update(state: TaskModel::SUCCEEDED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).not_to be_nil
+ expect(event.task_guid).to eq(task.guid)
+ expect(event.parent_app_guid).to eq(task.app.guid)
+ end
+ end
+
+ context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do
+ let!(:start_event) { nil }
+
+ it 'does not create a TASK_STOPPED event no consumer could match to a start' do
+ task.update(state: TaskModel::SUCCEEDED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).to be_nil
+ end
+ end
end
context 'when the task is moving to the FAILED_STATE' do
@@ -43,6 +67,28 @@ module VCAP::CloudController
expect(event.task_guid).to eq(task.guid)
expect(event.parent_app_guid).to eq(task.app.guid)
end
+
+ context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do
+ let!(:start_event) { AppUsageEvent.make(task_guid: task.guid, state: 'TASK_WAS_RUNNING') }
+
+ it 'still creates a TASK_STOPPED event' do
+ task.update(state: TaskModel::FAILED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).not_to be_nil
+ end
+ end
+
+ context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do
+ let!(:start_event) { nil }
+
+ it 'does not create a TASK_STOPPED event no consumer could match to a start' do
+ task.update(state: TaskModel::FAILED_STATE)
+
+ event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED')
+ expect(event).to be_nil
+ end
+ end
end
context 'when the task is moving from the PENDING state' do
|