diff --git a/app/jobs/runtime/events_cleanup.rb b/app/jobs/runtime/events_cleanup.rb index a1841576395..873c6a7f319 100644 --- a/app/jobs/runtime/events_cleanup.rb +++ b/app/jobs/runtime/events_cleanup.rb @@ -9,7 +9,7 @@ def initialize(cutoff_age_in_days) end def perform - Database::OldRecordCleanup.new(Event, cutoff_age_in_days).delete + Database::OldRecordCleanup.new(Event, cutoff_age_in_days:).delete end def job_name_in_configuration diff --git a/app/models/runtime/app_usage_event.rb b/app/models/runtime/app_usage_event.rb index 32913f8cdda..e8c28ba7e74 100644 --- a/app/models/runtime/app_usage_event.rb +++ b/app/models/runtime/app_usage_event.rb @@ -9,6 +9,23 @@ class AppUsageEvent < Sequel::Model :buildpack_guid, :buildpack_name, :package_state, :previous_package_state, :parent_app_guid, :parent_app_name, :process_type, :task_name, :task_guid + + def self.usage_lifecycles + [ + { + beginning_states: [ProcessModel::STARTED, Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE], + ending_state: ProcessModel::STOPPED, + guid_column: :app_guid + }, + { + beginning_states: [Repositories::AppUsageEventRepository::TASK_STARTED_EVENT_STATE, + Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE], + ending_state: Repositories::AppUsageEventRepository::TASK_STOPPED_EVENT_STATE, + guid_column: :task_guid + } + ].freeze + end + AppUsageEvent.dataset_module do def supports_window_functions? false diff --git a/app/models/runtime/task_model.rb b/app/models/runtime/task_model.rb index bad22c00eef..576513c5f63 100644 --- a/app/models/runtime/task_model.rb +++ b/app/models/runtime/task_model.rb @@ -137,9 +137,16 @@ def create_start_event def create_stop_event_if_needed app_usage_repo = Repositories::AppUsageEventRepository.new - start_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED') - existing_stop_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED') - return if start_event.nil? 
|| existing_stop_event.present? + return if app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED').present? + + # Record the stop only when there is recorded evidence that the task + # started: the TASK_STARTED event, or the TASK_WAS_RUNNING baseline seeded + # for tasks that were already running when the keep-running cleanup was + # introduced. Without either, no consumer ever saw the task start, so a + # stop event would be unmatched noise. + started = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED') || + app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_WAS_RUNNING') + return if started.nil? create_stop_event end diff --git a/app/models/services/service_usage_event.rb b/app/models/services/service_usage_event.rb index f5e694169f9..c09557870d1 100644 --- a/app/models/services/service_usage_event.rb +++ b/app/models/services/service_usage_event.rb @@ -7,5 +7,17 @@ class ServiceUsageEvent < Sequel::Model :service_plan_guid, :service_plan_name, :service_guid, :service_label, :service_broker_name, :service_broker_guid + + def self.usage_lifecycles + [ + { + beginning_states: [Repositories::ServiceUsageEventRepository::CREATED_EVENT_STATE, + Repositories::ServiceUsageEventRepository::UPDATED_EVENT_STATE, + Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE], + ending_state: Repositories::ServiceUsageEventRepository::DELETED_EVENT_STATE, + guid_column: :service_instance_guid + } + ].freeze + end end end diff --git a/app/repositories/app_usage_event_repository.rb b/app/repositories/app_usage_event_repository.rb index 51d751a8cd1..bce322d530a 100644 --- a/app/repositories/app_usage_event_repository.rb +++ b/app/repositories/app_usage_event_repository.rb @@ -4,6 +4,16 @@ module VCAP::CloudController module Repositories class AppUsageEventRepository + WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze + TASK_STARTED_EVENT_STATE = 'TASK_STARTED'.freeze + TASK_STOPPED_EVENT_STATE = 'TASK_STOPPED'.freeze + # 
Task baselines get their own state (rather than reusing WAS_RUNNING) + # because task events share the app_usage_events table with app events but + # carry an empty app_guid: a WAS_RUNNING row keyed by app_guid '' would be + # correlated with every other task's baseline by the app lifecycle's + # cleanup and swept by the app backfill's stale-row sweep. + TASK_WAS_RUNNING_EVENT_STATE = 'TASK_WAS_RUNNING'.freeze + def find(guid) AppUsageEvent.find(guid:) end @@ -152,7 +162,7 @@ def purge_and_reseed_started_apps! end def delete_events_older_than(cutoff_age_in_days) - Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete + Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete end private diff --git a/app/repositories/service_usage_event_repository.rb b/app/repositories/service_usage_event_repository.rb index 35fbef5a1f0..baad88a7772 100644 --- a/app/repositories/service_usage_event_repository.rb +++ b/app/repositories/service_usage_event_repository.rb @@ -7,6 +7,7 @@ class ServiceUsageEventRepository DELETED_EVENT_STATE = 'DELETED'.freeze CREATED_EVENT_STATE = 'CREATED'.freeze UPDATED_EVENT_STATE = 'UPDATED'.freeze + WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze def find(guid) ServiceUsageEvent.find(guid:) @@ -92,7 +93,7 @@ def purge_and_reseed_service_instances! 
end def delete_events_older_than(cutoff_age_in_days) - Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete + Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete end end end diff --git a/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb new file mode 100644 index 00000000000..728394f11c0 --- /dev/null +++ b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb @@ -0,0 +1,68 @@ +Sequel.migration do + no_transaction # to use the 'concurrently' option + + up do + if database_type == :postgres + VCAP::Migration.with_concurrent_timeout(self) do + add_index :app_usage_events, %i[state app_guid id], + name: :app_usage_events_lifecycle_index, + if_not_exists: true, + concurrently: true + + add_index :service_usage_events, %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index, + if_not_exists: true, + concurrently: true + end + + elsif database_type == :mysql + alter_table :app_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + add_index %i[state app_guid id], name: :app_usage_events_lifecycle_index unless @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index) + # rubocop:enable Sequel/ConcurrentIndex + end + + alter_table :service_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + unless @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index) + add_index %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index + end + # rubocop:enable Sequel/ConcurrentIndex + end + end + end + + down do + if database_type == :postgres + VCAP::Migration.with_concurrent_timeout(self) do + drop_index :app_usage_events, %i[state app_guid id], + name: :app_usage_events_lifecycle_index, + if_exists: true, + 
concurrently: true + + drop_index :service_usage_events, %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index, + if_exists: true, + concurrently: true + end + end + + if database_type == :mysql + alter_table :app_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + drop_index %i[state app_guid id], name: :app_usage_events_lifecycle_index if @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index) + # rubocop:enable Sequel/ConcurrentIndex + end + + alter_table :service_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + if @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index) + drop_index %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index + end + # rubocop:enable Sequel/ConcurrentIndex + end + end + end +end diff --git a/db/migrations/20260601120100_seed_was_running_app_usage_events.rb b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb new file mode 100644 index 00000000000..3749f1c3967 --- /dev/null +++ b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb @@ -0,0 +1,18 @@ +require 'database/was_running_backfill' + +Sequel.migration do + no_transaction # backfill manages its own per-batch transactions + + up do + logger = Steno.logger('cc.backfill.was_running') + if VCAP::WasRunningBackfill.skip? 
+ VCAP::WasRunningBackfill.log_skip(logger, 'app') + else + VCAP::WasRunningBackfill.seed_app_usage_events(self, logger) + end + end + + down do + VCAP::WasRunningBackfill.delete_app_usage_events(self) + end +end diff --git a/db/migrations/20260601120200_seed_was_running_service_usage_events.rb b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb new file mode 100644 index 00000000000..ead0eb7aea1 --- /dev/null +++ b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb @@ -0,0 +1,18 @@ +require 'database/was_running_backfill' + +Sequel.migration do + no_transaction # backfill manages its own per-batch transactions + + up do + logger = Steno.logger('cc.backfill.was_running') + if VCAP::WasRunningBackfill.skip? + VCAP::WasRunningBackfill.log_skip(logger, 'service') + else + VCAP::WasRunningBackfill.seed_service_usage_events(self, logger) + end + end + + down do + VCAP::WasRunningBackfill.delete_service_usage_events(self) + end +end diff --git a/db/migrations/20260601120300_seed_was_running_task_usage_events.rb b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb new file mode 100644 index 00000000000..2e38e888312 --- /dev/null +++ b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb @@ -0,0 +1,18 @@ +require 'database/was_running_backfill' + +Sequel.migration do + no_transaction # backfill manages its own per-batch transactions + + up do + logger = Steno.logger('cc.backfill.was_running') + if VCAP::WasRunningBackfill.skip? 
+ VCAP::WasRunningBackfill.log_skip(logger, 'task') + else + VCAP::WasRunningBackfill.seed_task_usage_events(self, logger) + end + end + + down do + VCAP::WasRunningBackfill.delete_task_usage_events(self) + end +end diff --git a/docs/v2/app_usage_events/list_all_app_usage_events.html b/docs/v2/app_usage_events/list_all_app_usage_events.html index 471dd0e8586..583bce89d12 100644 --- a/docs/v2/app_usage_events/list_all_app_usage_events.html +++ b/docs/v2/app_usage_events/list_all_app_usage_events.html @@ -631,9 +631,11 @@

Body

diff --git a/docs/v2/service_usage_events/list_service_usage_events.html b/docs/v2/service_usage_events/list_service_usage_events.html index 501999684c8..5f1b68b393a 100644 --- a/docs/v2/service_usage_events/list_service_usage_events.html +++ b/docs/v2/service_usage_events/list_service_usage_events.html @@ -290,6 +290,7 @@

Body

  • CREATED
  • DELETED
  • UPDATED
  • +
  • WAS_RUNNING
  • diff --git a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb index 36afafaf3d1..515c6c84385 100644 --- a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb +++ b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb @@ -30,3 +30,14 @@ Name | Type | Description **instance_count.current** | _integer_ or `null` | Current instance count of the app that this event pertains to, if applicable **instance_count.previous** | _integer_ or `null` | Previous instance count of the app that this event pertains to, if applicable **links** | [_links object_](#links) | Links to related resources + +#### WAS_RUNNING and TASK_WAS_RUNNING events + +`WAS_RUNNING` and `TASK_WAS_RUNNING` are synthetic values for `state.current` recorded once per running process (`WAS_RUNNING`) and once per running task (`TASK_WAS_RUNNING`) by a one-time data migration when the keep-running cleanup feature was introduced. They mark every process and task that was already running at the time of the upgrade so that billing consumers can bootstrap from a complete baseline even if the original `STARTED`/`TASK_STARTED` events have been pruned. + +**Consumer interpretation** (read `WAS_RUNNING`/`STARTED` as `TASK_WAS_RUNNING`/`TASK_STARTED` for task events, which are keyed by `task.guid`): + +* If you have not previously recorded a `STARTED` event for this resource, treat `WAS_RUNNING` as equivalent to `STARTED`. +* If you have already recorded `STARTED` (or an earlier `WAS_RUNNING`) for this resource, treat `WAS_RUNNING` as a redundant baseline confirmation and ignore it. +* `created_at` reflects when the backfill migration ran, **not** when the app or task actually started. Treat `WAS_RUNNING` as a baseline marker that the resource was already running as of that timestamp, not as the true start of the running interval. +* `state.previous` on a `WAS_RUNNING` event is always `null`. 
Subsequent real events for the same resource will continue to report their actual prior process state in `state.previous` (typically `STARTED`). If you perform chain validation, treat `WAS_RUNNING` as equivalent to `STARTED` for the purpose of validating the next event's `state.previous`. diff --git a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb index f904e1ff083..36c785d0a91 100644 --- a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb +++ b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb @@ -26,3 +26,13 @@ Name | Type | Description **service_broker.guid** | _string_ or `null` | Unique identifier of the service broker that this event pertains to, if applicable **service_broker.name** | _string_ or `null` | Name of the service broker that this event pertains to, if applicable **links** | [_links object_](#links) | Links to related resources + +#### WAS_RUNNING events + +`WAS_RUNNING` is a synthetic value for `state` recorded once per existing service instance by a one-time data migration when the keep-running cleanup feature was introduced. It marks every service instance that existed at the time of the upgrade so that billing consumers can bootstrap from a complete baseline of service instances even if the original `CREATED` events have been pruned. + +**Consumer interpretation:** + +* If you have not previously recorded a `CREATED` event for this service instance, treat `WAS_RUNNING` as equivalent to `CREATED`. +* If you have already recorded `CREATED` (or an earlier `WAS_RUNNING`) for this instance, treat `WAS_RUNNING` as a redundant baseline confirmation and ignore it. +* `created_at` reflects when the backfill migration ran, **not** when the service instance was created. Treat `WAS_RUNNING` as a baseline marker that the instance already existed as of that timestamp. 
diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb index ac952146dc3..7c9b685478c 100644 --- a/lib/cloud_controller/config_schemas/api_schema.rb +++ b/lib/cloud_controller/config_schemas/api_schema.rb @@ -109,6 +109,7 @@ class ApiSchema < VCAP::Config optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer, optional(:migration_psql_worker_memory_kb) => Integer, optional(:skip_bigint_id_migration) => bool, + optional(:skip_was_running_backfill) => bool, db: { optional(:database) => Hash, # db connection hash for sequel max_connections: Integer, # max connections in the connection pool diff --git a/lib/cloud_controller/config_schemas/migrate_schema.rb b/lib/cloud_controller/config_schemas/migrate_schema.rb index 971ff0999aa..eff4f6e2d14 100644 --- a/lib/cloud_controller/config_schemas/migrate_schema.rb +++ b/lib/cloud_controller/config_schemas/migrate_schema.rb @@ -10,6 +10,7 @@ class MigrateSchema < VCAP::Config optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer, optional(:migration_psql_worker_memory_kb) => Integer, optional(:skip_bigint_id_migration) => bool, + optional(:skip_was_running_backfill) => bool, db: { optional(:database) => Hash, # db connection hash for sequel diff --git a/lib/database/batch_delete.rb b/lib/database/batch_delete.rb index 9fcc4b58fea..377eff097c9 100644 --- a/lib/database/batch_delete.rb +++ b/lib/database/batch_delete.rb @@ -11,10 +11,12 @@ def delete total_count = 0 loop do - set = dataset.limit(amount) - break if set.empty? + # Fetch the batch's ids in the same query that checks for emptiness, so the + # (potentially expensive) filtered dataset is evaluated once per batch. + ids = dataset.limit(amount).select_map(:id) + break if ids.empty? 
- total_count += delete_batch(set) + total_count += delete_batch(ids) end total_count @@ -22,8 +24,8 @@ def delete private - def delete_batch(set) - dataset.model.where(id: set.select_map(:id)).delete + def delete_batch(ids) + dataset.model.where(id: ids).delete end end end diff --git a/lib/database/old_record_cleanup.rb b/lib/database/old_record_cleanup.rb index 44227d84507..a8b1aa236da 100644 --- a/lib/database/old_record_cleanup.rb +++ b/lib/database/old_record_cleanup.rb @@ -3,25 +3,33 @@ module Database class OldRecordCleanup class NoCurrentTimestampError < StandardError; end - attr_reader :model, :days_ago, :keep_at_least_one_record + attr_reader :model, :cutoff_age_in_days, :keep_at_least_one_record, :keep_running_records - def initialize(model, days_ago, keep_at_least_one_record: false) + def initialize(model, cutoff_age_in_days:, keep_at_least_one_record: false, keep_running_records: false) @model = model - @days_ago = days_ago + @cutoff_age_in_days = cutoff_age_in_days @keep_at_least_one_record = keep_at_least_one_record + @keep_running_records = keep_running_records end + # keep_running_records and keep_at_least_one_record compose: a still-running + # resource (a beginning-state event with no later ending-state event for the + # same resource) is always retained, and keep_at_least_one_record additionally + # protects the single newest row so the table is never fully emptied for + # clients that poll the most recent event. 
def delete - cutoff_date = current_timestamp_from_database - days_ago.to_i.days - + cutoff_date = current_timestamp_from_database - cutoff_age_in_days.to_i.days old_records = model.dataset.where(Sequel.lit('created_at < ?', cutoff_date)) - if keep_at_least_one_record - last_record = model.order(:id).last - old_records = old_records.where(Sequel.lit('id < ?', last_record.id)) if last_record - end - logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows") - Database::BatchDelete.new(old_records, 1000).delete + if keep_running_records + raise ArgumentError.new("keep_running_records requires #{model} to define .usage_lifecycles") unless model.respond_to?(:usage_lifecycles) + + delete_keeping_running_records(old_records) + else + old_records = exclude_newest_record(old_records) + logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows") + Database::BatchDelete.new(old_records, 1000).delete + end end private @@ -35,5 +43,96 @@ def current_timestamp_from_database def logger @logger ||= Steno.logger('cc.old_record_cleanup') end + + # Deletes old records while retaining a usable billing baseline for + # still-running resources. + # + # For each lifecycle of the model, a beginning-state row (e.g. + # STARTED/CREATED/WAS_RUNNING) is prunable when: + # * a later ending-state row (e.g. STOPPED/DELETED) is also old -- the run is + # over; or + # * it is a superseded baseline: an earlier beginning of the same run and a + # later beginning both exist (and are old). Consumers only need the first + # beginning of the current run (the true start time) and the latest one (the + # current footprint); the in-between rows written by scaling/updating a + # running resource carry no baseline information. + # + # The deletes are ordered deliberately: prunable beginning rows are removed + # FIRST, while the rows that make them prunable still exist, so each beginning + # stays prunable until it is itself deleted. 
Only then are the ending rows (and + # any other, non-lifecycle states) removed. Reversing the order could strand a + # beginning row whose paired ending was deleted in an earlier batch. + def delete_keeping_running_records(old_records) + lifecycles = model.usage_lifecycles + prunable_beginnings = lifecycles.map { |lifecycle| prunable_beginnings_dataset(old_records, lifecycle) } + + # Everything that is not a beginning-state row of some lifecycle (ending rows + # plus any other, non-lifecycle states) is unconditionally prunable. + all_beginning_states = lifecycles.flat_map { |lifecycle| lifecycle.fetch(:beginning_states) } + unconditional_records = exclude_newest_record(old_records.exclude(state: all_beginning_states)) + + deleted_count = prunable_beginnings.sum { |dataset| Database::BatchDelete.new(dataset, 1000).delete } + deleted_count += Database::BatchDelete.new(unconditional_records, 1000).delete + + logger.info("Cleaned up #{deleted_count} #{model.table_name} table rows") + end + + # Builds the dataset of old beginning-state rows that are prunable for one + # lifecycle. All correlations use the (state, guid, id) lifecycle index; higher + # id implies later creation throughout. The probes only consider OLD rows: a + # superseded beginning is kept until the row superseding it is itself old, + # which keeps the pruning decision stable for consumers reading within the + # retention window. + def prunable_beginnings_dataset(old_records, lifecycle) + beginning_states = lifecycle.fetch(:beginning_states) + ending_state = lifecycle.fetch(:ending_state) + guid_column = lifecycle.fetch(:guid_column) + + old_beginnings = old_records.where(state: beginning_states) + old_endings = old_records.where(state: ending_state) + initial_records = old_beginnings.from_self(alias: :initial_records) + + # The run is over: an ending row for the same resource was created later. + matching_ending = old_endings.from_self(alias: :final_records). 
+ where(Sequel[:final_records][guid_column] => Sequel[:initial_records][guid_column]). + where { Sequel[:final_records][:id] > Sequel[:initial_records][:id] }. + select(1).exists + + # Not the run's true start: an earlier beginning of the same run exists, + # i.e. one with no ending event between the two. + intervening_ending = old_endings.from_self(alias: :intervening_endings). + where(Sequel[:intervening_endings][guid_column] => Sequel[:earlier_beginnings][guid_column]). + where { Sequel[:intervening_endings][:id] > Sequel[:earlier_beginnings][:id] }. + where { Sequel[:intervening_endings][:id] < Sequel[:initial_records][:id] }. + select(1).exists + earlier_beginning_in_same_run = old_beginnings.from_self(alias: :earlier_beginnings). + where(Sequel[:earlier_beginnings][guid_column] => Sequel[:initial_records][guid_column]). + where { Sequel[:earlier_beginnings][:id] < Sequel[:initial_records][:id] }. + where(Sequel.~(intervening_ending)). + select(1).exists + + # Not the latest baseline: a later beginning for the same resource exists. + later_beginning = old_beginnings.from_self(alias: :later_beginnings). + where(Sequel[:later_beginnings][guid_column] => Sequel[:initial_records][guid_column]). + where { Sequel[:later_beginnings][:id] > Sequel[:initial_records][:id] }. + select(1).exists + + superseded_baseline = Sequel.&(earlier_beginning_in_same_run, later_beginning) + exclude_newest_record(initial_records.where(Sequel.|(matching_ending, superseded_baseline))) + end + + # When keep_at_least_one_record is set, never delete the single newest row so + # the table always retains at least one record. 
+ def exclude_newest_record(records) + return records unless keep_at_least_one_record && newest_record_id + + records.where(Sequel.lit('id < ?', newest_record_id)) + end + + def newest_record_id + return @newest_record_id if defined?(@newest_record_id) + + @newest_record_id = model.order(:id).last&.id + end end end diff --git a/lib/database/was_running_backfill.rb b/lib/database/was_running_backfill.rb new file mode 100644 index 00000000000..653f6ab4211 --- /dev/null +++ b/lib/database/was_running_backfill.rb @@ -0,0 +1,305 @@ +# Backfills synthetic WAS_RUNNING usage events (TASK_WAS_RUNNING for tasks) for +# resources that were already running/existing when the keep-running cleanup +# feature was introduced. +# +# Run from thin Sequel migrations as a batched, idempotent, resumable operation +# (mirrors VCAP::BigintMigration): each batch is keyset-paginated by source-table +# id and runs in its own transaction, well under the migration statement timeout. +# Re-running is safe via the NOT EXISTS guard. Uses only raw Sequel — no Cloud +# Controller models/repositories — because the schema, not the app code, is the +# contract at migration time. +# rubocop:disable Metrics/ModuleLength +module VCAP::WasRunningBackfill + WAS_RUNNING = 'WAS_RUNNING'.freeze + # Task baselines use a distinct state because task events share the + # app_usage_events table with app events but carry an empty app_guid -- see + # Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE. + TASK_WAS_RUNNING = 'TASK_WAS_RUNNING'.freeze + DEFAULT_BATCH_SIZE = 1000 + + class << self + # Operators can opt out (e.g. very large foundations, or downstream usage-event + # consumers that are not yet ready for the WAS_RUNNING state). Mirrors + # skip_bigint_id_migration. The flag is checked by the seed migrations rather + # than here, so the 'db:was_running_backfill' rake task can still seed the + # baseline after a skipped migration has been recorded as applied. + def skip? 
+ VCAP::CloudController::Config.config&.get(:skip_was_running_backfill) || false + rescue VCAP::CloudController::Config::InvalidConfigPath + false + end + + def log_skip(logger, kind) + logger.info("skipping WAS_RUNNING #{kind} usage event backfill (skip_was_running_backfill is set); " \ + "run 'rake db:was_running_backfill' to seed the baseline later") + end + + def seed_app_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE) + uuid_fn = uuid_function(db) + each_batch(db[:processes].where(state: 'STARTED'), batch_size) do |low, high| + db.run(app_usage_events_insert_sql(uuid_fn, low, high)) + logger.info("backfilled WAS_RUNNING app usage events up to process id #{high}") + end + sweep_stale_app_usage_events(db, logger, batch_size) + end + + def seed_service_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE) + uuid_fn = uuid_function(db) + each_batch(db[:service_instances], batch_size) do |low, high| + db.run(service_usage_events_insert_sql(uuid_fn, low, high)) + logger.info("backfilled WAS_RUNNING service usage events up to service_instance id #{high}") + end + sweep_stale_service_usage_events(db, logger, batch_size) + end + + def seed_task_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE) + uuid_fn = uuid_function(db) + each_batch(db[:tasks].where(state: 'RUNNING'), batch_size) do |low, high| + db.run(task_usage_events_insert_sql(uuid_fn, low, high)) + logger.info("backfilled TASK_WAS_RUNNING app usage events up to task id #{high}") + end + sweep_stale_task_usage_events(db, logger, batch_size) + end + + def delete_app_usage_events(db, batch_size: DEFAULT_BATCH_SIZE) + delete_was_running(db, :app_usage_events, batch_size, state: WAS_RUNNING) + end + + def delete_service_usage_events(db, batch_size: DEFAULT_BATCH_SIZE) + delete_was_running(db, :service_usage_events, batch_size, state: WAS_RUNNING) + end + + def delete_task_usage_events(db, batch_size: DEFAULT_BATCH_SIZE) + delete_was_running(db, :app_usage_events, batch_size, state: 
TASK_WAS_RUNNING) + end + + private + + # The latest-package / latest-droplet aggregates are scoped to the batch's apps + # so they never scan the whole packages/droplets tables (the cost that would + # otherwise blow the migration statement timeout). + # + # The COALESCEs are defensive: processes.memory/instances and apps.name are + # nullable columns whose defaults are normally backfilled by the model layer, + # but the target event columns are NOT NULL — one legacy NULL row written + # outside the models must not abort the whole migration. + # + # previous_state is NULL, not the current state. purge_and_reseed_started_apps! + # writes previous_state == state (STARTED/STARTED) to mean "no change, current + # snapshot" — safe only because its state is the real STARTED. Our state is the + # synthetic WAS_RUNNING, so STARTED here would invent a STARTED->WAS_RUNNING + # transition, and WAS_RUNNING would imply an earlier WAS_RUNNING event that + # never actually happened. + def app_usage_events_insert_sql(uuid_fn, low, high) + batch_apps = "SELECT app_guid FROM processes WHERE id > #{low} AND id <= #{high} AND state = 'STARTED'" + <<~SQL.squish + INSERT INTO app_usage_events ( + guid, created_at, + state, previous_state, + instance_count, previous_instance_count, + memory_in_mb_per_instance, previous_memory_in_mb_per_instance, + app_guid, app_name, + parent_app_guid, parent_app_name, + process_type, + space_guid, space_name, org_guid, + buildpack_guid, buildpack_name, + package_state, previous_package_state + ) + SELECT + #{uuid_fn}, CURRENT_TIMESTAMP, + '#{WAS_RUNNING}', NULL, + COALESCE(p.instances, 0), COALESCE(p.instances, 0), + COALESCE(p.memory, 0), COALESCE(p.memory, 0), + p.guid, COALESCE(parent_app.name, ''), + parent_app.guid, COALESCE(parent_app.name, ''), + p.type, + spaces.guid, spaces.name, organizations.guid, + desired_droplet.buildpack_receipt_buildpack_guid, desired_droplet.buildpack_receipt_buildpack, + CASE + WHEN latest_droplet.state = 'FAILED' 
THEN 'FAILED' + WHEN latest_droplet.state = 'STAGED' AND latest_droplet.guid = parent_app.droplet_guid THEN 'STAGED' + WHEN latest_package.state = 'FAILED' THEN 'FAILED' + ELSE 'PENDING' + END, + 'UNKNOWN' + FROM processes p + INNER JOIN apps parent_app ON parent_app.guid = p.app_guid + INNER JOIN spaces ON spaces.guid = parent_app.space_guid + INNER JOIN organizations ON organizations.id = spaces.organization_id + LEFT JOIN droplets desired_droplet ON desired_droplet.guid = parent_app.droplet_guid + LEFT JOIN ( + SELECT pkg.guid, pkg.app_guid, pkg.state FROM packages pkg + INNER JOIN ( + SELECT app_guid, MAX(id) AS max_id FROM packages + WHERE app_guid IN (#{batch_apps}) GROUP BY app_guid + ) lp_ids ON lp_ids.app_guid = pkg.app_guid AND lp_ids.max_id = pkg.id + ) latest_package ON latest_package.app_guid = parent_app.guid + LEFT JOIN ( + SELECT d.guid, d.package_guid, d.state FROM droplets d + INNER JOIN ( + SELECT package_guid, MAX(id) AS max_id FROM droplets + WHERE package_guid IN (SELECT guid FROM packages WHERE app_guid IN (#{batch_apps})) + GROUP BY package_guid + ) ld_ids ON ld_ids.package_guid = d.package_guid AND ld_ids.max_id = d.id + ) latest_droplet ON latest_droplet.package_guid = latest_package.guid + WHERE p.id > #{low} AND p.id <= #{high} + AND p.state = 'STARTED' + AND NOT EXISTS ( + SELECT 1 FROM app_usage_events WHERE state = '#{WAS_RUNNING}' AND app_guid = p.guid + ) + SQL + end + + def service_usage_events_insert_sql(uuid_fn, low, high) + <<~SQL.squish + INSERT INTO service_usage_events ( + guid, created_at, state, + service_instance_guid, service_instance_name, service_instance_type, + service_plan_guid, service_plan_name, + service_guid, service_label, + service_broker_name, service_broker_guid, + space_guid, space_name, org_guid + ) + SELECT + #{uuid_fn}, CURRENT_TIMESTAMP, '#{WAS_RUNNING}', + service_instances.guid, service_instances.name, + CASE WHEN service_instances.is_gateway_service THEN 'managed_service_instance' ELSE 
'user_provided_service_instance' END, + service_plans.guid, service_plans.name, + services.guid, services.label, + service_brokers.name, service_brokers.guid, + spaces.guid, spaces.name, organizations.guid + FROM service_instances + INNER JOIN spaces ON spaces.id = service_instances.space_id + INNER JOIN organizations ON organizations.id = spaces.organization_id + LEFT OUTER JOIN service_plans ON service_plans.id = service_instances.service_plan_id + LEFT OUTER JOIN services ON services.id = service_plans.service_id + LEFT OUTER JOIN service_brokers ON service_brokers.id = services.service_broker_id + WHERE service_instances.id > #{low} AND service_instances.id <= #{high} + AND NOT EXISTS ( + SELECT 1 FROM service_usage_events WHERE state = '#{WAS_RUNNING}' AND service_instance_guid = service_instances.guid + ) + SQL + end + + # Mirrors AppUsageEventRepository#create_from_task: task events carry an + # empty app_guid/app_name and are keyed by task_guid instead. The COALESCE + # on memory_in_mb is defensive -- it is a nullable legacy column whose + # default is normally backfilled by the model layer. + # + # previous_state is NULL for the same reason as the app baseline (see + # app_usage_events_insert_sql): TASK_WAS_RUNNING is synthetic, so RUNNING here + # would invent a RUNNING->TASK_WAS_RUNNING transition that never happened. 
+ def task_usage_events_insert_sql(uuid_fn, low, high) + <<~SQL.squish + INSERT INTO app_usage_events ( + guid, created_at, + state, previous_state, + instance_count, previous_instance_count, + memory_in_mb_per_instance, previous_memory_in_mb_per_instance, + app_guid, app_name, + parent_app_guid, parent_app_name, + space_guid, space_name, org_guid, + package_state, previous_package_state, + task_guid, task_name + ) + SELECT + #{uuid_fn}, CURRENT_TIMESTAMP, + '#{TASK_WAS_RUNNING}', NULL, + 1, 1, + COALESCE(t.memory_in_mb, 0), COALESCE(t.memory_in_mb, 0), + '', '', + parent_app.guid, COALESCE(parent_app.name, ''), + spaces.guid, spaces.name, organizations.guid, + 'STAGED', 'STAGED', + t.guid, t.name + FROM tasks t + INNER JOIN apps parent_app ON parent_app.guid = t.app_guid + INNER JOIN spaces ON spaces.guid = parent_app.space_guid + INNER JOIN organizations ON organizations.id = spaces.organization_id + WHERE t.id > #{low} AND t.id <= #{high} + AND t.state = 'RUNNING' + AND NOT EXISTS ( + SELECT 1 FROM app_usage_events WHERE state = '#{TASK_WAS_RUNNING}' AND task_guid = t.guid + ) + SQL + end + + # The API stays live while the backfill runs, so a batch's INSERT..SELECT can + # race a concurrent stop/delete: the statement's snapshot still sees the + # resource as running and inserts a WAS_RUNNING row with a HIGHER id than the + # concurrent STOPPED/DELETED event. No later ending event would ever arrive + # for that resource, so the keep-running cleanup would retain the row forever + # and consumers would read the stopped resource as still running. Sweep such + # rows afterwards in id-keyed batches (like the rollback deletes) -- the + # predicate is re-checked against current resource state each batch, and no + # single DELETE (or its lock hold) grows large enough to risk the timeout. + def sweep_stale_app_usage_events(db, logger, batch_size) + stale = db[:app_usage_events]. + where(state: WAS_RUNNING). 
+ where(Sequel.lit("NOT EXISTS (SELECT 1 FROM processes WHERE processes.guid = app_usage_events.app_guid AND processes.state = 'STARTED')")) + deleted = batch_delete(db, :app_usage_events, stale, batch_size) + logger.info("swept #{deleted} stale WAS_RUNNING app usage events") if deleted.positive? + end + + def sweep_stale_service_usage_events(db, logger, batch_size) + stale = db[:service_usage_events]. + where(state: WAS_RUNNING). + where(Sequel.lit('NOT EXISTS (SELECT 1 FROM service_instances WHERE service_instances.guid = service_usage_events.service_instance_guid)')) + deleted = batch_delete(db, :service_usage_events, stale, batch_size) + logger.info("swept #{deleted} stale WAS_RUNNING service usage events") if deleted.positive? + end + + def sweep_stale_task_usage_events(db, logger, batch_size) + stale = db[:app_usage_events]. + where(state: TASK_WAS_RUNNING). + where(Sequel.lit("NOT EXISTS (SELECT 1 FROM tasks WHERE tasks.guid = app_usage_events.task_guid AND tasks.state = 'RUNNING')")) + deleted = batch_delete(db, :app_usage_events, stale, batch_size) + logger.info("swept #{deleted} stale TASK_WAS_RUNNING app usage events") if deleted.positive? + end + + # Keyset-paginate over a source dataset by id, yielding the (exclusive-low, + # inclusive-high) id bounds of each batch inside its own transaction. + def each_batch(source, batch_size) + cursor = 0 + loop do + high = source.where(Sequel.lit('id > ?', cursor)).order(:id).limit(batch_size).max(:id) + break if high.nil? + + # READ COMMITTED keeps MySQL's INSERT..SELECT from taking shared next-key + # locks on every scanned source row while the API serves traffic (safe: + # CF MySQL releases run with binlog_format=ROW). On Postgres it is the + # default isolation level anyway. 
+ source.db.transaction(isolation: :committed) { yield(cursor, high) } + cursor = high + end + end + + # Delete every row matching `dataset` in batches of `batch_size` ids so + # neither the statement nor its lock hold ever grows large enough to risk the + # migration statement timeout. Returns the number of rows deleted. Re-selecting + # the dataset each pass is what lets the sweeps re-check current resource state. + def batch_delete(db, table, dataset, batch_size) + deleted = 0 + loop do + ids = dataset.limit(batch_size).select_map(:id) + break if ids.empty? + + deleted += db[table].where(id: ids).delete + end + deleted + end + + def delete_was_running(db, table, batch_size, state:) + batch_delete(db, table, db[table].where(state: state), batch_size) + end + + def uuid_function(db) + case db.database_type + when :postgres then 'get_uuid()' + when :mysql then 'UUID()' + else raise "unsupported database: #{db.database_type}" + end + end + end +end +# rubocop:enable Metrics/ModuleLength diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake index 8cbdb229c9b..2530577e626 100644 --- a/lib/tasks/db.rake +++ b/lib/tasks/db.rake @@ -192,6 +192,23 @@ namespace :db do VCAP::BigintMigration.backfill(logger, db, args.table.to_sym, batch_size: args.batch_size.to_i, iterations: args.iterations.to_i) end + desc 'Seed WAS_RUNNING usage events for running apps, running tasks, and existing service instances ' \ + '(e.g. 
after the seed migrations were skipped via skip_was_running_backfill)' + task :was_running_backfill, %i[batch_size] => :environment do |_t, args| + args.with_defaults(batch_size: 1000) + + RakeConfig.context = :migrate + + require 'database/was_running_backfill' + logging_output + logger = Steno.logger('cc.db.was_running_backfill') + RakeConfig.config.load_db_encryption_key + db = VCAP::CloudController::DB.connect(RakeConfig.config.get(:db), logger) + VCAP::WasRunningBackfill.seed_app_usage_events(db, logger, batch_size: args.batch_size.to_i) + VCAP::WasRunningBackfill.seed_task_usage_events(db, logger, batch_size: args.batch_size.to_i) + VCAP::WasRunningBackfill.seed_service_usage_events(db, logger, batch_size: args.batch_size.to_i) + end + namespace :dev do desc 'Migrate the database set in spec/support/bootstrap/db_config' task migrate: :environment do diff --git a/spec/api/documentation/app_usage_event_api_spec.rb b/spec/api/documentation/app_usage_event_api_spec.rb index 4c7ff0c07a9..f8e45e421e0 100644 --- a/spec/api/documentation/app_usage_event_api_spec.rb +++ b/spec/api/documentation/app_usage_event_api_spec.rb @@ -47,7 +47,7 @@ "The desired state of the app or 'BUILDPACK_SET' when buildpack info has been set.", required: false, readonly: true, - valid_values: %w[STARTED STOPPED BUILDPACK_SET TASK_STARTED TASK_STOPPED] + valid_values: %w[STARTED STOPPED WAS_RUNNING BUILDPACK_SET TASK_STARTED TASK_STOPPED TASK_WAS_RUNNING] field :task_guid, 'The GUID of the task if one exists.', required: false, readonly: true, experimental: true field :task_name, 'The NAME of the task if one exists.', required: false, readonly: true, experimental: true diff --git a/spec/api/documentation/service_usage_events_api_spec.rb b/spec/api/documentation/service_usage_events_api_spec.rb index 4d3e5dc3cda..94c707d71b3 100644 --- a/spec/api/documentation/service_usage_events_api_spec.rb +++ b/spec/api/documentation/service_usage_events_api_spec.rb @@ -13,7 +13,7 @@ get 
'/v2/service_usage_events' do field :guid, 'The guid of the event.', required: false - field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED] + field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED WAS_RUNNING] field :org_guid, 'The GUID of the organization.', required: false, readonly: true field :space_guid, 'The GUID of the space.', required: false, readonly: true field :space_name, 'The name of the space.', required: false, readonly: true diff --git a/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb new file mode 100644 index 00000000000..3180fbf9456 --- /dev/null +++ b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb @@ -0,0 +1,44 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to add the lifecycle index to the usage event tables', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260601120000_add_lifecycle_index_to_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + it 'adds the lifecycle index to both usage event tables (idempotently) and removes it on revert' do + # Before migration: the lifecycle indexes should not exist. + expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index) + + # Up migration adds both indexes with the expected column order. 
+ expect { run_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index) + expect(db.indexes(:app_usage_events)[:app_usage_events_lifecycle_index][:columns]).to eq(%i[state app_guid id]) + expect(db.indexes(:service_usage_events)[:service_usage_events_lifecycle_index][:columns]).to eq(%i[state service_instance_guid id]) + + # Up migration is idempotent: running again does not fail. + expect { run_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index) + + # Down migration removes both indexes. + expect { revert_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index) + + # Down migration is idempotent: running again does not fail. 
+ expect { revert_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index) + end +end diff --git a/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb new file mode 100644 index 00000000000..877cc3f04de --- /dev/null +++ b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb @@ -0,0 +1,140 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to seed WAS_RUNNING events for currently-running app processes', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260601120100_seed_was_running_app_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + # Builds an org/space scaffold and returns the space guid that apps can reference. 
+ def seed_space(suffix) + quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true, + total_services: 10, memory_limit: 1024, total_routes: 10) + org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id) + db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id) + "space-#{suffix}" + end + + def seed_app_event(suffix, state:, app_guid:) + db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state, + instance_count: 1, memory_in_mb_per_instance: 1, app_guid: app_guid, app_name: "app-#{suffix}", + space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}") + end + + describe 'up migration' do + it 'seeds WAS_RUNNING rows only for started processes, skips stopped ones, and preserves existing rows' do + space_guid = seed_space('main') + + # A running process with no droplet -> package_state PENDING + db[:apps].insert(guid: 'app-pending', name: 'pending-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-pending', app_guid: 'app-pending', state: 'STARTED', instances: 3, memory: 512, type: 'web') + + # A running process whose app points at a STAGED droplet -> package_state STAGED. + # Insert order avoids the apps.droplet_guid <-> droplets <-> packages.app_guid FK cycle. 
+ db[:apps].insert(guid: 'app-staged', name: 'staged-app', space_guid: space_guid) + db[:packages].insert(guid: 'pkg-staged', app_guid: 'app-staged', state: 'READY') + db[:droplets].insert(guid: 'drop-staged', package_guid: 'pkg-staged', state: 'STAGED') + db[:apps].where(guid: 'app-staged').update(droplet_guid: 'drop-staged') + db[:processes].insert(guid: 'proc-staged', app_guid: 'app-staged', state: 'STARTED', instances: 1, memory: 256, type: 'web') + + # A running process whose latest droplet FAILED -> package_state FAILED + db[:apps].insert(guid: 'app-faildrop', name: 'faildrop-app', space_guid: space_guid) + db[:packages].insert(guid: 'pkg-faildrop', app_guid: 'app-faildrop', state: 'READY') + db[:droplets].insert(guid: 'drop-faildrop', package_guid: 'pkg-faildrop', state: 'FAILED') + db[:apps].where(guid: 'app-faildrop').update(droplet_guid: 'drop-faildrop') + db[:processes].insert(guid: 'proc-faildrop', app_guid: 'app-faildrop', state: 'STARTED', instances: 1, memory: 128, type: 'web') + + # A running process whose latest package FAILED (and has no droplet) -> package_state FAILED + db[:apps].insert(guid: 'app-failpkg', name: 'failpkg-app', space_guid: space_guid) + db[:packages].insert(guid: 'pkg-failpkg', app_guid: 'app-failpkg', state: 'FAILED') + db[:processes].insert(guid: 'proc-failpkg', app_guid: 'app-failpkg', state: 'STARTED', instances: 1, memory: 128, type: 'web') + + # A stopped process -> no WAS_RUNNING row + db[:apps].insert(guid: 'app-stopped', name: 'stopped-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-stopped', app_guid: 'app-stopped', state: 'STOPPED', instances: 1, memory: 128, type: 'web') + + # A running process that already has a WAS_RUNNING row -> not duplicated + db[:apps].insert(guid: 'app-existing', name: 'existing-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-existing', app_guid: 'app-existing', state: 'STARTED', instances: 1, memory: 128, type: 'web') + seed_app_event('existing', state: 
'WAS_RUNNING', app_guid: 'proc-existing') + + # An unrelated pre-existing row that must be preserved (no truncate) + preexisting_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid') + + run_migration + + was_running = db[:app_usage_events].where(state: 'WAS_RUNNING') + # One row each for proc-pending, proc-staged, proc-faildrop and proc-failpkg, plus the + # pre-seeded proc-existing row (not duplicated). + expect(was_running.count).to eq(5) + expect(was_running.where(app_guid: 'proc-stopped').count).to eq(0) + expect(was_running.where(app_guid: 'proc-existing').count).to eq(1) + expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1) + + pending_row = was_running.where(app_guid: 'proc-pending').first + expect(pending_row[:guid]).to be_present + expect(pending_row[:previous_state]).to be_nil + expect(pending_row[:app_name]).to eq('pending-app') + expect(pending_row[:parent_app_guid]).to eq('app-pending') + expect(pending_row[:parent_app_name]).to eq('pending-app') + expect(pending_row[:process_type]).to eq('web') + expect(pending_row[:space_guid]).to eq(space_guid) + expect(pending_row[:space_name]).to eq('space-main') + expect(pending_row[:org_guid]).to eq('org-main') + expect(pending_row[:instance_count]).to eq(3) + expect(pending_row[:previous_instance_count]).to eq(3) + expect(pending_row[:memory_in_mb_per_instance]).to eq(512) + expect(pending_row[:previous_memory_in_mb_per_instance]).to eq(512) + expect(pending_row[:package_state]).to eq('PENDING') + expect(pending_row[:previous_package_state]).to eq('UNKNOWN') + + expect(was_running.where(app_guid: 'proc-staged').first[:package_state]).to eq('STAGED') + expect(was_running.where(app_guid: 'proc-faildrop').first[:package_state]).to eq('FAILED') + expect(was_running.where(app_guid: 'proc-failpkg').first[:package_state]).to eq('FAILED') + + # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in + # spec/unit/lib/database/was_running_backfill_spec.rb, 
where seed_app_usage_events + # runs twice; the migrator does not re-apply an already-recorded migration. + end + + context 'when skip_was_running_backfill is set' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true) + end + + it 'does not seed any WAS_RUNNING rows' do + space_guid = seed_space('main') + db[:apps].insert(guid: 'app-skip', name: 'skip-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-skip', app_guid: 'app-skip', state: 'STARTED', instances: 1, memory: 128, type: 'web') + + run_migration + + expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + end + end + end + + describe 'down migration' do + it 'removes only the WAS_RUNNING rows' do + space_guid = seed_space('main') + db[:apps].insert(guid: 'app-down', name: 'down-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-down', app_guid: 'app-down', state: 'STARTED', instances: 1, memory: 128, type: 'web') + unrelated_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid') + + run_migration + expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(1) + + revert_migration + expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1) + end + end +end diff --git a/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb new file mode 100644 index 00000000000..7d0f79a296b --- /dev/null +++ b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb @@ -0,0 +1,124 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to seed WAS_RUNNING events for existing service instances', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { 
'20260601120200_seed_was_running_service_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + # Builds an org/space scaffold and returns the space id that instances can reference. + def seed_space(suffix) + quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true, + total_services: 10, memory_limit: 1024, total_routes: 10) + org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id) + db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id) + end + + # Builds a broker -> service -> plan chain and returns the plan id. + def seed_plan(suffix) + broker_id = db[:service_brokers].insert(guid: "broker-#{suffix}", name: "broker-#{suffix}", broker_url: 'http://example.com', auth_password: 'pw') + service_id = db[:services].insert(guid: "service-#{suffix}", label: "service-#{suffix}", description: 'desc', bindable: true, service_broker_id: broker_id) + db[:service_plans].insert(guid: "plan-#{suffix}", name: "plan-#{suffix}", description: 'desc', free: true, service_id: service_id, unique_id: "plan-unique-#{suffix}") + end + + def seed_service_event(suffix, state:, service_instance_guid:) + db[:service_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state, + org_guid: 'org-main', space_guid: 'space-main', space_name: 'space-main', + service_instance_guid: service_instance_guid, service_instance_name: "instance-#{suffix}", + service_instance_type: 'managed_service_instance') + end + + describe 'up migration' do + it 'seeds one WAS_RUNNING row per instance with the correct type and preserves existing rows' do + space_id 
= seed_space('main') + plan_id = seed_plan('main') + + # A managed instance -> managed_service_instance type with full broker chain. + db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + # A user-provided instance -> user_provided_service_instance type with NULL plan/service/broker. + db[:service_instances].insert(guid: 'upsi-guid', name: 'upsi', space_id: space_id, is_gateway_service: false) + + # A managed instance that already has a WAS_RUNNING row -> not duplicated. + db[:service_instances].insert(guid: 'existing-guid', name: 'existing', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + seed_service_event('existing', state: 'WAS_RUNNING', service_instance_guid: 'existing-guid') + + # An unrelated pre-existing row that must be preserved (no truncate). + preexisting_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'managed-guid') + + run_migration + + was_running = db[:service_usage_events].where(state: 'WAS_RUNNING') + # One row each for managed-guid and upsi-guid, plus the pre-seeded existing-guid row (not duplicated). 
+ expect(was_running.count).to eq(3) + expect(was_running.where(service_instance_guid: 'existing-guid').count).to eq(1) + expect(db[:service_usage_events].where(id: preexisting_id).count).to eq(1) + + managed_row = was_running.where(service_instance_guid: 'managed-guid').first + expect(managed_row[:guid]).to be_present + expect(managed_row[:service_instance_name]).to eq('my-instance') + expect(managed_row[:service_instance_type]).to eq('managed_service_instance') + expect(managed_row[:service_plan_guid]).to eq('plan-main') + expect(managed_row[:service_plan_name]).to eq('plan-main') + expect(managed_row[:service_guid]).to eq('service-main') + expect(managed_row[:service_label]).to eq('service-main') + expect(managed_row[:service_broker_name]).to eq('broker-main') + expect(managed_row[:service_broker_guid]).to eq('broker-main') + expect(managed_row[:space_guid]).to eq('space-main') + expect(managed_row[:space_name]).to eq('space-main') + expect(managed_row[:org_guid]).to eq('org-main') + + upsi_row = was_running.where(service_instance_guid: 'upsi-guid').first + expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance') + expect(upsi_row[:service_plan_guid]).to be_nil + expect(upsi_row[:service_plan_name]).to be_nil + expect(upsi_row[:service_guid]).to be_nil + expect(upsi_row[:service_label]).to be_nil + expect(upsi_row[:service_broker_name]).to be_nil + expect(upsi_row[:service_broker_guid]).to be_nil + + # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in + # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_service_usage_events + # runs twice; the migrator does not re-apply an already-recorded migration. 
+ end + + context 'when skip_was_running_backfill is set' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true) + end + + it 'does not seed any WAS_RUNNING rows' do + space_id = seed_space('main') + plan_id = seed_plan('main') + db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + + run_migration + + expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + end + end + end + + describe 'down migration' do + it 'removes only the WAS_RUNNING rows' do + space_id = seed_space('main') + plan_id = seed_plan('main') + db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + unrelated_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'managed-guid') + + run_migration + expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(1) + + revert_migration + expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + expect(db[:service_usage_events].where(id: unrelated_id).count).to eq(1) + end + end +end diff --git a/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb new file mode 100644 index 00000000000..e701c883c8e --- /dev/null +++ b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb @@ -0,0 +1,117 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to seed TASK_WAS_RUNNING events for currently-running tasks', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260601120300_seed_was_running_task_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: 
current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + # Builds an org/space/app scaffold and returns the app guid that tasks can reference. + def seed_app(suffix) + quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true, + total_services: 10, memory_limit: 1024, total_routes: 10) + org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id) + db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id) + db[:apps].insert(guid: "app-#{suffix}", name: "app-#{suffix}", space_guid: "space-#{suffix}") + "app-#{suffix}" + end + + def seed_task(suffix, state:, app_guid:, memory_in_mb: 256) + db[:tasks].insert(guid: "task-#{suffix}", name: "task-#{suffix}", command: 'bundle exec rake', state: state, + app_guid: app_guid, droplet_guid: "droplet-#{suffix}", memory_in_mb: memory_in_mb) + end + + def seed_task_event(suffix, state:, task_guid:) + db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state, + instance_count: 1, memory_in_mb_per_instance: 1, app_guid: '', app_name: '', + space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}", + task_guid: task_guid, task_name: "task-#{suffix}") + end + + describe 'up migration' do + it 'seeds TASK_WAS_RUNNING rows only for running tasks, skips finished ones, and preserves existing rows' do + app_guid = seed_app('main') + + seed_task('running', state: 'RUNNING', app_guid: app_guid, memory_in_mb: 512) + seed_task('succeeded', state: 'SUCCEEDED', app_guid: app_guid) + seed_task('pending', state: 'PENDING', app_guid: app_guid) + + # A running task that already has a TASK_WAS_RUNNING row -> not duplicated + seed_task('existing', state: 'RUNNING', 
app_guid: app_guid) + seed_task_event('existing', state: 'TASK_WAS_RUNNING', task_guid: 'task-existing') + + # An unrelated pre-existing row that must be preserved (no truncate) + preexisting_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task') + + run_migration + + task_was_running = db[:app_usage_events].where(state: 'TASK_WAS_RUNNING') + # One row for task-running, plus the pre-seeded task-existing row (not duplicated). + expect(task_was_running.count).to eq(2) + expect(task_was_running.where(task_guid: 'task-succeeded').count).to eq(0) + expect(task_was_running.where(task_guid: 'task-pending').count).to eq(0) + expect(task_was_running.where(task_guid: 'task-existing').count).to eq(1) + expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1) + + row = task_was_running.where(task_guid: 'task-running').first + expect(row[:guid]).to be_present + expect(row[:previous_state]).to be_nil + expect(row[:task_name]).to eq('task-running') + expect(row[:app_guid]).to eq('') + expect(row[:app_name]).to eq('') + expect(row[:parent_app_guid]).to eq(app_guid) + expect(row[:parent_app_name]).to eq('app-main') + expect(row[:space_guid]).to eq('space-main') + expect(row[:space_name]).to eq('space-main') + expect(row[:org_guid]).to eq('org-main') + expect(row[:instance_count]).to eq(1) + expect(row[:previous_instance_count]).to eq(1) + expect(row[:memory_in_mb_per_instance]).to eq(512) + expect(row[:previous_memory_in_mb_per_instance]).to eq(512) + expect(row[:package_state]).to eq('STAGED') + expect(row[:previous_package_state]).to eq('STAGED') + + # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in + # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_task_usage_events + # runs twice; the migrator does not re-apply an already-recorded migration. 
+ end + + context 'when skip_was_running_backfill is set' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true) + end + + it 'does not seed any TASK_WAS_RUNNING rows' do + app_guid = seed_app('main') + seed_task('skip', state: 'RUNNING', app_guid: app_guid) + + run_migration + + expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0) + end + end + end + + describe 'down migration' do + it 'removes only the TASK_WAS_RUNNING rows' do + app_guid = seed_app('main') + seed_task('down', state: 'RUNNING', app_guid: app_guid) + unrelated_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task') + + run_migration + expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(1) + + revert_migration + expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0) + expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1) + end + end +end diff --git a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb index 81b48f484a7..3b34a4af58b 100644 --- a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb +++ b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb @@ -10,6 +10,9 @@ let(:skip_bigint_id_migration) { nil } before do + # Default so config reads from other migrations during the shared after-hook's forward run + # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers. 
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration) allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300) end diff --git a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb index 2378c849909..5f9832d02bd 100644 --- a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb +++ b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb @@ -14,6 +14,9 @@ let(:logger) { double(:logger, info: nil) } before do + # Default so config reads from other migrations during the shared after-hook's forward run + # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers. + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration) allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300) end @@ -119,6 +122,9 @@ let(:logger) { double(:logger, info: nil) } before do + # Default so config reads from other migrations during the shared after-hook's forward run + # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers. 
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration) allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300) end diff --git a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb index 8018ee0ee69..ce4f1df9956 100644 --- a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb +++ b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb @@ -5,7 +5,7 @@ module Jobs::Runtime RSpec.describe AppUsageEventsCleanup, job_context: :worker do let(:cutoff_age_in_days) { 30 } let(:logger) { double(Steno::Logger, info: nil) } - let!(:event_before_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago) } + let!(:event_before_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago, state: 'STOPPED') } let!(:event_after_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days - 1).days.ago) } subject(:job) do diff --git a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb index 181ad89e18d..da11beb39b5 100644 --- a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb +++ b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb @@ -5,7 +5,7 @@ module Jobs::Services RSpec.describe ServiceUsageEventsCleanup, job_context: :worker do let(:cutoff_age_in_days) { 30 } let(:logger) { double(Steno::Logger, info: nil) } - let!(:event_before_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago) } + let!(:event_before_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago, state: 'DELETED') } let!(:event_after_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days 
- 1).days.ago) } subject(:job) do diff --git a/spec/unit/lib/database/old_record_cleanup_spec.rb b/spec/unit/lib/database/old_record_cleanup_spec.rb index 7b2258bbaec..167ec52e792 100644 --- a/spec/unit/lib/database/old_record_cleanup_spec.rb +++ b/spec/unit/lib/database/old_record_cleanup_spec.rb @@ -1,6 +1,148 @@ require 'spec_helper' require 'database/old_record_cleanup' +# Lifecycle-aware cleanup behavior that is identical for app and service usage +# events. Expects the including context to define :model, :beginning_state, +# :ending_state and :guid_column. +RSpec.shared_examples 'usage event lifecycle cleanup' do + def make_event(state, guid, created_at:) + model.make(state: state, created_at: created_at, guid_column => guid) + end + + def run_cleanup(**opts) + Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: true, **opts).delete + end + + it 'keeps an old beginning record when there is no corresponding ending record' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect(model.count).to eq(1) + end + + it 'keeps an old beginning record when the ending record is fresh' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago) + fresh_ending = make_event(ending_state, 'guid1', created_at: 1.day.ago + 1.minute) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect(fresh_ending.reload).to be_present + end + + it 'keeps an old beginning record when the ending record was inserted first' do + old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago) + old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'uses insertion order rather than created_at to pair beginnings with endings' do + old_beginning = 
make_event(beginning_state, 'guid1', created_at: 2.days.ago) + # Earlier timestamp but higher id: the resource is no longer running. + old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago) + + run_cleanup + + expect { old_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'deletes all records of completed runs spanning multiple cycles' do + cycle1_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago) + cycle1_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago) + cycle2_beginning = make_event(beginning_state, 'guid1', created_at: 8.days.ago) + cycle2_ending = make_event(ending_state, 'guid1', created_at: 7.days.ago) + + run_cleanup + + expect { cycle1_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { cycle1_ending.reload }.to raise_error(Sequel::NoExistingObject) + expect { cycle2_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { cycle2_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'deletes an old ending record that has no beginning record' do + orphan_ending = make_event(ending_state, 'guid1', created_at: 10.days.ago) + + run_cleanup + + expect { orphan_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keeps an old WAS_RUNNING record when there is no corresponding ending record' do + was_running = make_event('WAS_RUNNING', 'guid1', created_at: 2.days.ago) + + run_cleanup + + expect(was_running.reload).to be_present + expect(model.count).to eq(1) + end + + it 'deletes an old WAS_RUNNING record when a later old ending record exists' do + was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago) + old_ending = make_event(ending_state, 'guid1', created_at: 4.days.ago) + + run_cleanup + + expect { was_running.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_ending.reload }.to 
raise_error(Sequel::NoExistingObject) + end + + it 'keeps both the first beginning and a later WAS_RUNNING record of a running resource' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago) + was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect(was_running.reload).to be_present + end + + it 'prunes superseded baselines of a running resource, keeping the first beginning (true start) and the latest one (current footprint)' do + first = make_event(beginning_state, 'guid1', created_at: 5.days.ago) + middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago) + latest = make_event(beginning_state, 'guid1', created_at: 3.days.ago) + + run_cleanup + + expect(first.reload).to be_present + expect { middle.reload }.to raise_error(Sequel::NoExistingObject) + expect(latest.reload).to be_present + end + + it 'does not prune a superseded baseline until the beginning that supersedes it is itself old' do + first = make_event(beginning_state, 'guid1', created_at: 5.days.ago) + middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago) + fresh_latest = make_event(beginning_state, 'guid1', created_at: 1.day.ago + 1.minute) + + run_cleanup + + expect(first.reload).to be_present + expect(middle.reload).to be_present + expect(fresh_latest.reload).to be_present + end + + it 'treats the first beginning after an ended run as the true start, not as a superseded baseline' do + ended_run_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago) + ended_run_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago) + current_run_first = make_event(beginning_state, 'guid1', created_at: 8.days.ago) + current_run_latest = make_event(beginning_state, 'guid1', created_at: 7.days.ago) + + run_cleanup + + expect { ended_run_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { ended_run_ending.reload }.to 
raise_error(Sequel::NoExistingObject) + expect(current_run_first.reload).to be_present + expect(current_run_latest.reload).to be_present + end +end + RSpec.describe Database::OldRecordCleanup do describe '#delete' do let!(:stale_event1) { VCAP::CloudController::Event.make(created_at: 1.day.ago - 1.minute) } @@ -9,7 +151,7 @@ let!(:fresh_event) { VCAP::CloudController::Event.make(created_at: 1.day.ago + 1.minute) } it 'deletes records older than specified days' do - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1) + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1) expect do record_cleanup.delete @@ -20,23 +162,14 @@ expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject) end - context "when there are no records at all but you're trying to keep at least one" do - it "doesn't keep one because there aren't any to keep" do - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, 1, keep_at_least_one_record: true) - - expect { record_cleanup.delete }.not_to raise_error - expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0) - end - end - it 'only retrieves the current timestamp from the database once' do expect(VCAP::CloudController::Event.db).to receive(:fetch).with('SELECT CURRENT_TIMESTAMP as now').once.and_call_original - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1) + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1) record_cleanup.delete end it 'keeps the last row when :keep_at_least_one_record is true even if it is older than the cutoff date' do - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 0, keep_at_least_one_record: true) + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 0, keep_at_least_one_record: true) expect do record_cleanup.delete @@ -46,5 
+179,235 @@ expect { stale_event1.reload }.to raise_error(Sequel::NoExistingObject) expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject) end + + context "when there are no records at all but you're trying to keep at least one" do + it "doesn't keep one because there aren't any to keep" do + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, cutoff_age_in_days: 1, keep_at_least_one_record: true) + + expect { record_cleanup.delete }.not_to raise_error + expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0) + end + end + + context 'when keep_running_records is requested for a model without usage lifecycles' do + it 'raises rather than silently deleting the records of running resources' do + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1, keep_running_records: true) + + expect { record_cleanup.delete }.to raise_error(ArgumentError, /usage_lifecycles/) + end + end + + describe 'keeping running AppUsageEvent records' do + let(:model) { VCAP::CloudController::AppUsageEvent } + let(:beginning_state) { 'STARTED' } + let(:ending_state) { 'STOPPED' } + let(:guid_column) { :app_guid } + + include_examples 'usage event lifecycle cleanup' + + describe 'task lifecycle' do + it 'keeps the TASK_STARTED record of a still-running task' do + task_started = model.make(created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + + run_cleanup + + expect(task_started.reload).to be_present + end + + it 'deletes the TASK_STARTED and TASK_STOPPED records of a completed task' do + task_started = model.make(created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1') + + run_cleanup + + expect { task_started.reload }.to raise_error(Sequel::NoExistingObject) + expect { task_stopped.reload }.to 
raise_error(Sequel::NoExistingObject) + end + + it 'keeps the TASK_STARTED record when the TASK_STOPPED record is fresh' do + task_started = model.make(created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + fresh_task_stopped = model.make(created_at: 1.day.ago + 1.minute, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1') + + run_cleanup + + expect(task_started.reload).to be_present + expect(fresh_task_stopped.reload).to be_present + end + + it 'correlates task records by task_guid, not by app_guid' do + running_task_started = model.make(created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + other_task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task2', app_guid: 'app1') + + run_cleanup + + expect(running_task_started.reload).to be_present + expect { other_task_stopped.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keeps the TASK_WAS_RUNNING baseline of a still-running task' do + baseline = model.make(created_at: 2.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '') + + run_cleanup + + expect(baseline.reload).to be_present + end + + it 'deletes the TASK_WAS_RUNNING baseline once a later TASK_STOPPED record is also old' do + baseline = model.make(created_at: 5.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '') + task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: '') + + run_cleanup + + expect { baseline.reload }.to raise_error(Sequel::NoExistingObject) + expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject) + end + + # Task events carry an empty app_guid, so if task baselines shared the + # WAS_RUNNING state they would all correlate with each other through the + # app lifecycle (keyed by app_guid) and be wrongly pruned as superseded + # baselines of one phantom app. 
The distinct TASK_WAS_RUNNING state keeps + # the app lifecycle blind to them. + it 'does not prune the baselines of distinct running tasks as superseded baselines of one phantom app' do + baselines = %w[task1 task2 task3].each_with_index.map do |task_guid, i| + model.make(created_at: (5 - i).days.ago, state: 'TASK_WAS_RUNNING', task_guid: task_guid, app_guid: '') + end + + run_cleanup + + baselines.each { |baseline| expect(baseline.reload).to be_present } + end + end + + it 'deletes records with non-lifecycle states' do + buildpack_event1 = model.make(created_at: 3.days.ago, state: 'BUILDPACK_SET', app_guid: 'app1') + buildpack_event2 = model.make(created_at: 2.days.ago, state: 'BUILDPACK_SET', app_guid: 'app2') + + run_cleanup + + expect { buildpack_event1.reload }.to raise_error(Sequel::NoExistingObject) + expect { buildpack_event2.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'deletes old records with a corresponding stop record even if app_guid is an empty string' do + empty_guid_start = model.make(created_at: 5.days.ago, state: 'STARTED', app_guid: '') + different_empty_start = model.make(created_at: 4.days.ago, state: 'STARTED', app_guid: '') + empty_guid_stop = model.make(created_at: 3.days.ago, state: 'STOPPED', app_guid: '') + + run_cleanup + + # Both STARTs with an empty-string guid have a STOP with an empty-string guid after them. 
+ expect { empty_guid_start.reload }.to raise_error(Sequel::NoExistingObject) + expect { different_empty_start.reload }.to raise_error(Sequel::NoExistingObject) + expect { empty_guid_stop.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'works when cutoff_age_in_days is 0' do + old_start = model.make(created_at: 1.second.ago, state: 'STARTED', app_guid: 'running-app') + + Database::OldRecordCleanup.new(model, cutoff_age_in_days: 0, keep_running_records: true).delete + + expect(old_start.reload).to be_present + end + + it 'does not error if the table is empty' do + model.dataset.delete + + expect { run_cleanup }.not_to raise_error + end + + it 'deletes all old records when keep_running_records is false' do + old_start = model.make(created_at: 5.days.ago, state: 'STARTED', app_guid: 'app1') + old_stop = model.make(created_at: 4.days.ago, state: 'STOPPED', app_guid: 'app1') + old_running_start = model.make(created_at: 3.days.ago, state: 'STARTED', app_guid: 'running-app') + + Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: false).delete + + expect { old_start.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_stop.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_running_start.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keep_at_least_one_record preserves the last record while still pruning its paired START' do + old_start = model.make(created_at: 10.days.ago, state: 'STARTED', app_guid: 'app1') + last_stop = model.make(created_at: 9.days.ago, state: 'STOPPED', app_guid: 'app1') + + run_cleanup(keep_at_least_one_record: true) + + expect { old_start.reload }.to raise_error(Sequel::NoExistingObject) # paired with STOP, prunable + expect(last_stop.reload).to be_present # kept by keep_at_least_one_record + end + + it 'prunes a paired set larger than the delete batch size while keeping running records' do + old = 5.days.ago + + # 1,500 completed START/STOP pairs (3,000 rows) -> 
well above the 1,000-row + # batch size, so the delete spans multiple batches across the two passes. + paired_rows = [] + 1_500.times do |i| + guid = "paired-#{i}" + common = { app_name: guid, space_guid: 'sp', space_name: 'sp', org_guid: 'o', + instance_count: 1, memory_in_mb_per_instance: 1, created_at: old } + paired_rows << common.merge(guid: "start-#{i}", state: 'STARTED', app_guid: guid) + paired_rows << common.merge(guid: "stop-#{i}", state: 'STOPPED', app_guid: guid) + end + model.dataset.multi_insert(paired_rows) + + # Still-running apps (START with no later STOP) that must survive cleanup. + running = Array.new(50) do |i| + model.make(created_at: old, state: 'STARTED', app_guid: "running-#{i}") + end + + run_cleanup + + # Every completed pair is gone; only the running records remain. + running.each { |event| expect(event.reload).to be_present } + expect(model.count).to eq(running.size) + end + end + + describe 'keeping running ServiceUsageEvent records' do + let(:model) { VCAP::CloudController::ServiceUsageEvent } + let(:beginning_state) { 'CREATED' } + let(:ending_state) { 'DELETED' } + let(:guid_column) { :service_instance_guid } + + include_examples 'usage event lifecycle cleanup' + + describe 'UPDATED records' do + it 'keeps the CREATED record and the latest UPDATED record while the service instance exists, pruning superseded UPDATED records' do + stale_created = model.make(created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1') + superseded_update = model.make(created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + latest_update = model.make(created_at: 6.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + + run_cleanup + + expect(stale_created.reload).to be_present + expect { superseded_update.reload }.to raise_error(Sequel::NoExistingObject) + expect(latest_update.reload).to be_present + end + + it 'keeps UPDATED records when the corresponding delete record is fresh' do + stale_updated = 
model.make(created_at: 2.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + fresh_delete = model.make(created_at: 1.day.ago + 1.minute, state: 'DELETED', service_instance_guid: 'guid1') + + run_cleanup + + expect(stale_updated.reload).to be_present + expect(fresh_delete.reload).to be_present + end + + it 'deletes UPDATED records when there is a corresponding old delete record' do + stale_created = model.make(created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1') + stale_updated = model.make(created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + stale_delete = model.make(created_at: 6.days.ago, state: 'DELETED', service_instance_guid: 'guid1') + + run_cleanup + + expect { stale_created.reload }.to raise_error(Sequel::NoExistingObject) + expect { stale_updated.reload }.to raise_error(Sequel::NoExistingObject) + expect { stale_delete.reload }.to raise_error(Sequel::NoExistingObject) + end + end + end end end diff --git a/spec/unit/lib/database/was_running_backfill_spec.rb b/spec/unit/lib/database/was_running_backfill_spec.rb new file mode 100644 index 00000000000..baa016b0295 --- /dev/null +++ b/spec/unit/lib/database/was_running_backfill_spec.rb @@ -0,0 +1,247 @@ +require 'spec_helper' +require 'database/was_running_backfill' + +RSpec.describe VCAP::WasRunningBackfill do + let(:db) { Sequel::Model.db } + let(:logger) { double(Steno::Logger, info: nil) } + + def was_running + db[:service_usage_events].where(state: 'WAS_RUNNING') + end + + def app_was_running + db[:app_usage_events].where(state: 'WAS_RUNNING') + end + + def task_was_running + db[:app_usage_events].where(state: 'TASK_WAS_RUNNING') + end + + describe 'WAS_RUNNING' do + # The backfill is raw SQL (no CC code), so it can't reference the repository + # constants directly. This guard catches any drift between the literal the + # backfill writes and the state value the repositories/cleanup recognise. 
+ it 'matches the state value used by the usage event repositories' do + expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE) + expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE) + expect(described_class::TASK_WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE) + end + end + + describe '.skip?' do + let(:skip_was_running_backfill) { nil } + + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(skip_was_running_backfill) + end + + context 'when skip_was_running_backfill is false' do + let(:skip_was_running_backfill) { false } + + it 'returns false' do + expect(described_class.skip?).to be(false) + end + end + + context 'when skip_was_running_backfill is true' do + let(:skip_was_running_backfill) { true } + + it 'returns true' do + expect(described_class.skip?).to be(true) + end + end + + context 'when skip_was_running_backfill is nil' do + let(:skip_was_running_backfill) { nil } + + it 'returns false' do + expect(described_class.skip?).to be(false) + end + end + + context 'when reading the config raises InvalidConfigPath' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_raise(VCAP::CloudController::Config::InvalidConfigPath) + end + + it 'returns false rather than aborting the migration' do + expect(described_class.skip?).to be(false) + end + end + end + + describe '.seed_app_usage_events' do + it 'seeds one WAS_RUNNING row per started process across batches, idempotently, skipping stopped processes' do + started1 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + started2 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + 
VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED') + + # batch_size: 1 forces the keyset loop to iterate once per process. + described_class.seed_app_usage_events(db, logger, batch_size: 1) + + expect(app_was_running.select_map(:app_guid)).to contain_exactly(started1.guid, started2.guid) + expect { described_class.seed_app_usage_events(db, logger, batch_size: 1) }.not_to change(app_was_running, :count) + end + + it 'seeds a separate row per process when an app has multiple started processes' do + app = VCAP::CloudController::AppModel.make + web = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'web', state: 'STARTED') + worker = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'worker', state: 'STARTED') + + described_class.seed_app_usage_events(db, logger, batch_size: 1) + + scope = app_was_running.where(parent_app_guid: app.guid) + expect(scope.select_map(:app_guid)).to contain_exactly(web.guid, worker.guid) + expect(scope.select_map(:process_type)).to contain_exactly('web', 'worker') + end + + it 'tolerates legacy NULLs in nullable process and app columns' do + process = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + # Bypass the model layer, which would backfill these defaults. + db[:processes].where(guid: process.guid).update(memory: nil, instances: nil) + + described_class.seed_app_usage_events(db, logger) + + row = app_was_running.first(app_guid: process.guid) + expect(row[:memory_in_mb_per_instance]).to eq(0) + expect(row[:instance_count]).to eq(0) + end + + it 'sweeps WAS_RUNNING rows whose process is not running, e.g. 
apps stopped concurrently with the backfill' do + running = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED') + VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: stopped.guid) + VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: 'no-such-process') + + described_class.seed_app_usage_events(db, logger) + + expect(app_was_running.select_map(:app_guid)).to contain_exactly(running.guid) + end + end + + describe '.delete_app_usage_events' do + it 'batch-deletes only WAS_RUNNING rows' do + VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + described_class.seed_app_usage_events(db, logger, batch_size: 1) + started = VCAP::CloudController::AppUsageEvent.make(state: 'STARTED') + + described_class.delete_app_usage_events(db, batch_size: 1) + + expect(app_was_running.count).to eq(0) + expect(db[:app_usage_events].where(guid: started.guid).count).to eq(1) + end + end + + describe '.seed_task_usage_events' do + it 'seeds one TASK_WAS_RUNNING row per running task across batches, idempotently, skipping completed tasks' do + running1 = VCAP::CloudController::TaskModel.make(state: 'RUNNING', memory_in_mb: 256) + running2 = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + VCAP::CloudController::TaskModel.make(state: 'SUCCEEDED') + + # batch_size: 1 forces the keyset loop to iterate once per task. 
+ described_class.seed_task_usage_events(db, logger, batch_size: 1) + + expect(task_was_running.select_map(:task_guid)).to contain_exactly(running1.guid, running2.guid) + + row = task_was_running.first(task_guid: running1.guid) + expect(row[:previous_state]).to be_nil + expect(row[:task_name]).to eq(running1.name) + expect(row[:app_guid]).to eq('') + expect(row[:app_name]).to eq('') + expect(row[:parent_app_guid]).to eq(running1.app.guid) + expect(row[:parent_app_name]).to eq(running1.app.name) + expect(row[:instance_count]).to eq(1) + expect(row[:memory_in_mb_per_instance]).to eq(256) + expect(row[:previous_memory_in_mb_per_instance]).to eq(256) + expect(row[:package_state]).to eq('STAGED') + expect(row[:previous_package_state]).to eq('STAGED') + expect(row[:space_guid]).to eq(running1.space.guid) + expect(row[:space_name]).to eq(running1.space.name) + expect(row[:org_guid]).to eq(running1.space.organization.guid) + + expect { described_class.seed_task_usage_events(db, logger, batch_size: 1) }.not_to change(task_was_running, :count) + end + + it 'tolerates a legacy NULL task memory' do + task = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + # Bypass the model layer, which would backfill the default. + db[:tasks].where(guid: task.guid).update(memory_in_mb: nil) + + described_class.seed_task_usage_events(db, logger) + + row = task_was_running.first(task_guid: task.guid) + expect(row[:memory_in_mb_per_instance]).to eq(0) + end + + it 'sweeps TASK_WAS_RUNNING rows whose task is no longer running, e.g. 
tasks completed concurrently with the backfill' do + running = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + completed = VCAP::CloudController::TaskModel.make(state: 'SUCCEEDED') + VCAP::CloudController::AppUsageEvent.make(state: 'TASK_WAS_RUNNING', task_guid: completed.guid) + VCAP::CloudController::AppUsageEvent.make(state: 'TASK_WAS_RUNNING', task_guid: 'no-such-task') + + described_class.seed_task_usage_events(db, logger) + + expect(task_was_running.select_map(:task_guid)).to contain_exactly(running.guid) + end + end + + describe '.delete_task_usage_events' do + it 'batch-deletes only TASK_WAS_RUNNING rows' do + task = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + described_class.seed_task_usage_events(db, logger, batch_size: 1) + task_started = VCAP::CloudController::AppUsageEvent.make(state: 'TASK_STARTED', task_guid: task.guid) + app_baseline = VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: 'some-process') + + described_class.delete_task_usage_events(db, batch_size: 1) + + expect(task_was_running.count).to eq(0) + expect(db[:app_usage_events].where(guid: task_started.guid).count).to eq(1) + expect(db[:app_usage_events].where(guid: app_baseline.guid).count).to eq(1) + end + end + + describe '.seed_service_usage_events' do + it 'seeds one WAS_RUNNING row per instance across multiple batches, with the right type, idempotently' do + managed = VCAP::CloudController::ManagedServiceInstance.make + upsi = VCAP::CloudController::UserProvidedServiceInstance.make + + # batch_size: 1 forces the keyset loop to iterate once per instance. 
+ described_class.seed_service_usage_events(db, logger, batch_size: 1) + + expect(was_running.select_map(:service_instance_guid)).to contain_exactly(managed.guid, upsi.guid) + + managed_row = was_running.first(service_instance_guid: managed.guid) + expect(managed_row[:service_instance_type]).to eq('managed_service_instance') + expect(managed_row[:service_plan_guid]).to eq(managed.service_plan.guid) + expect(managed_row[:service_broker_name]).to eq(managed.service_plan.service.service_broker.name) + + upsi_row = was_running.first(service_instance_guid: upsi.guid) + expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance') + expect(upsi_row[:service_plan_guid]).to be_nil + + expect { described_class.seed_service_usage_events(db, logger, batch_size: 1) }.not_to change(was_running, :count) + end + + it 'sweeps WAS_RUNNING rows whose service instance no longer exists, e.g. instances deleted concurrently with the backfill' do + instance = VCAP::CloudController::ManagedServiceInstance.make + VCAP::CloudController::ServiceUsageEvent.make(state: 'WAS_RUNNING', service_instance_guid: 'no-such-instance') + + described_class.seed_service_usage_events(db, logger) + + expect(was_running.select_map(:service_instance_guid)).to contain_exactly(instance.guid) + end + end + + describe '.delete_service_usage_events' do + it 'batch-deletes only WAS_RUNNING rows' do + instance = VCAP::CloudController::ManagedServiceInstance.make + described_class.seed_service_usage_events(db, logger, batch_size: 1) + created = VCAP::CloudController::ServiceUsageEvent.make(state: 'CREATED', service_instance_guid: instance.guid) + + described_class.delete_service_usage_events(db, batch_size: 1) + + expect(was_running.count).to eq(0) + expect(db[:service_usage_events].where(guid: created.guid).count).to eq(1) + end + end +end diff --git a/spec/unit/models/runtime/task_model_spec.rb b/spec/unit/models/runtime/task_model_spec.rb index fe1e5790c45..b7c07975404 100644 --- 
a/spec/unit/models/runtime/task_model_spec.rb +++ b/spec/unit/models/runtime/task_model_spec.rb @@ -28,6 +28,30 @@ module VCAP::CloudController expect(event.task_guid).to eq(task.guid) expect(event.parent_app_guid).to eq(task.app.guid) end + + context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do + let!(:start_event) { AppUsageEvent.make(task_guid: task.guid, state: 'TASK_WAS_RUNNING') } + + it 'still creates a TASK_STOPPED event' do + task.update(state: TaskModel::SUCCEEDED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).not_to be_nil + expect(event.task_guid).to eq(task.guid) + expect(event.parent_app_guid).to eq(task.app.guid) + end + end + + context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do + let!(:start_event) { nil } + + it 'does not create a TASK_STOPPED event no consumer could match to a start' do + task.update(state: TaskModel::SUCCEEDED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).to be_nil + end + end end context 'when the task is moving to the FAILED_STATE' do @@ -43,6 +67,28 @@ module VCAP::CloudController expect(event.task_guid).to eq(task.guid) expect(event.parent_app_guid).to eq(task.app.guid) end + + context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do + let!(:start_event) { AppUsageEvent.make(task_guid: task.guid, state: 'TASK_WAS_RUNNING') } + + it 'still creates a TASK_STOPPED event' do + task.update(state: TaskModel::FAILED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).not_to be_nil + end + end + + context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do + let!(:start_event) { nil } + + it 'does not create a TASK_STOPPED event no consumer could match to a start' do + task.update(state: TaskModel::FAILED_STATE) + + event = 
AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).to be_nil + end + end end context 'when the task is moving from the PENDING state' do