From 4fc337c20a26b403f895f765f9293e3f1aaefd04 Mon Sep 17 00:00:00 2001 From: David Riddle Date: Wed, 10 Jun 2026 11:49:16 -0500 Subject: [PATCH 1/4] Keep usage event records of running apps, service instances, and tasks App and service usage event cleanup previously pruned every record older than the cutoff, including the opening STARTED/CREATED event of a resource that is still running -- which makes it impossible to reconstruct current usage once that event ages out. Database::OldRecordCleanup can now optionally keep "running" records. For each lifecycle a model declares via usage_lifecycles (beginning states, ending state, guid column), a beginning-state event (STARTED/CREATED/TASK_STARTED, and the WAS_RUNNING/TASK_WAS_RUNNING baselines) is retained unless: * a later ending-state event (STOPPED/DELETED/TASK_STOPPED) for the same resource also falls outside the retention window -- the run is over; or * it is a superseded baseline: an earlier beginning of the same run and a later beginning both exist outside the window. Consumers only need the first beginning of the current run (the true start time) and the latest one (the current footprint), so the in-between events written by scaling an app or updating a service instance are pruned and cutoff_age_in_days keeps bounding the table size for long-running, frequently-changed resources. The app and service usage event repositories enable the behavior with keep_running_records: true; requesting it for a model without usage_lifecycles raises instead of silently deleting the records of running resources. Task events get their own lifecycle (TASK_STARTED/TASK_WAS_RUNNING -> TASK_STOPPED, keyed by task_guid), so the start events of long-running tasks survive cleanup as well. 
The task baseline state is distinct from WAS_RUNNING because task events share the app_usage_events table but carry an empty app_guid: reusing WAS_RUNNING would let the app lifecycle correlate every task baseline through app_guid = '' and wrongly prune them as superseded baselines of one phantom app (and the app backfill's stale-row sweep would delete them outright). Deletion runs in ordered passes -- prunable beginning rows first, while the rows that make them prunable still exist, then everything else -- so a beginning row cannot be stranded when its pair is removed in an earlier batch. The cleanup log line now reports the row counts BatchDelete returns instead of issuing extra COUNT queries, and BatchDelete fetches each batch's ids in the same query that checks for emptiness, halving evaluations of the (potentially expensive) filtered dataset. Also renames the positional days_ago to a cutoff_age_in_days keyword. --- app/jobs/runtime/events_cleanup.rb | 2 +- app/models/runtime/app_usage_event.rb | 17 + app/models/services/service_usage_event.rb | 12 + .../app_usage_event_repository.rb | 12 +- .../service_usage_event_repository.rb | 3 +- lib/database/batch_delete.rb | 12 +- lib/database/old_record_cleanup.rb | 121 +++++- .../runtime/app_usage_events_cleanup_spec.rb | 2 +- .../service_usage_events_cleanup_spec.rb | 2 +- .../lib/database/old_record_cleanup_spec.rb | 387 +++++++++++++++++- 10 files changed, 537 insertions(+), 33 deletions(-) diff --git a/app/jobs/runtime/events_cleanup.rb b/app/jobs/runtime/events_cleanup.rb index a1841576395..873c6a7f319 100644 --- a/app/jobs/runtime/events_cleanup.rb +++ b/app/jobs/runtime/events_cleanup.rb @@ -9,7 +9,7 @@ def initialize(cutoff_age_in_days) end def perform - Database::OldRecordCleanup.new(Event, cutoff_age_in_days).delete + Database::OldRecordCleanup.new(Event, cutoff_age_in_days:).delete end def job_name_in_configuration diff --git a/app/models/runtime/app_usage_event.rb b/app/models/runtime/app_usage_event.rb 
index 32913f8cdda..e8c28ba7e74 100644 --- a/app/models/runtime/app_usage_event.rb +++ b/app/models/runtime/app_usage_event.rb @@ -9,6 +9,23 @@ class AppUsageEvent < Sequel::Model :buildpack_guid, :buildpack_name, :package_state, :previous_package_state, :parent_app_guid, :parent_app_name, :process_type, :task_name, :task_guid + + def self.usage_lifecycles + [ + { + beginning_states: [ProcessModel::STARTED, Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE], + ending_state: ProcessModel::STOPPED, + guid_column: :app_guid + }, + { + beginning_states: [Repositories::AppUsageEventRepository::TASK_STARTED_EVENT_STATE, + Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE], + ending_state: Repositories::AppUsageEventRepository::TASK_STOPPED_EVENT_STATE, + guid_column: :task_guid + } + ].freeze + end + AppUsageEvent.dataset_module do def supports_window_functions? false diff --git a/app/models/services/service_usage_event.rb b/app/models/services/service_usage_event.rb index f5e694169f9..c09557870d1 100644 --- a/app/models/services/service_usage_event.rb +++ b/app/models/services/service_usage_event.rb @@ -7,5 +7,17 @@ class ServiceUsageEvent < Sequel::Model :service_plan_guid, :service_plan_name, :service_guid, :service_label, :service_broker_name, :service_broker_guid + + def self.usage_lifecycles + [ + { + beginning_states: [Repositories::ServiceUsageEventRepository::CREATED_EVENT_STATE, + Repositories::ServiceUsageEventRepository::UPDATED_EVENT_STATE, + Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE], + ending_state: Repositories::ServiceUsageEventRepository::DELETED_EVENT_STATE, + guid_column: :service_instance_guid + } + ].freeze + end end end diff --git a/app/repositories/app_usage_event_repository.rb b/app/repositories/app_usage_event_repository.rb index 51d751a8cd1..bce322d530a 100644 --- a/app/repositories/app_usage_event_repository.rb +++ b/app/repositories/app_usage_event_repository.rb @@ -4,6 +4,16 @@ module 
VCAP::CloudController module Repositories class AppUsageEventRepository + WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze + TASK_STARTED_EVENT_STATE = 'TASK_STARTED'.freeze + TASK_STOPPED_EVENT_STATE = 'TASK_STOPPED'.freeze + # Task baselines get their own state (rather than reusing WAS_RUNNING) + # because task events share the app_usage_events table with app events but + # carry an empty app_guid: a WAS_RUNNING row keyed by app_guid '' would be + # correlated with every other task's baseline by the app lifecycle's + # cleanup and swept by the app backfill's stale-row sweep. + TASK_WAS_RUNNING_EVENT_STATE = 'TASK_WAS_RUNNING'.freeze + def find(guid) AppUsageEvent.find(guid:) end @@ -152,7 +162,7 @@ def purge_and_reseed_started_apps! end def delete_events_older_than(cutoff_age_in_days) - Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete + Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete end private diff --git a/app/repositories/service_usage_event_repository.rb b/app/repositories/service_usage_event_repository.rb index 35fbef5a1f0..baad88a7772 100644 --- a/app/repositories/service_usage_event_repository.rb +++ b/app/repositories/service_usage_event_repository.rb @@ -7,6 +7,7 @@ class ServiceUsageEventRepository DELETED_EVENT_STATE = 'DELETED'.freeze CREATED_EVENT_STATE = 'CREATED'.freeze UPDATED_EVENT_STATE = 'UPDATED'.freeze + WAS_RUNNING_EVENT_STATE = 'WAS_RUNNING'.freeze def find(guid) ServiceUsageEvent.find(guid:) @@ -92,7 +93,7 @@ def purge_and_reseed_service_instances! 
end def delete_events_older_than(cutoff_age_in_days) - Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days, keep_at_least_one_record: true).delete + Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true).delete end end end diff --git a/lib/database/batch_delete.rb b/lib/database/batch_delete.rb index 9fcc4b58fea..377eff097c9 100644 --- a/lib/database/batch_delete.rb +++ b/lib/database/batch_delete.rb @@ -11,10 +11,12 @@ def delete total_count = 0 loop do - set = dataset.limit(amount) - break if set.empty? + # Fetch the batch's ids in the same query that checks for emptiness, so the + # (potentially expensive) filtered dataset is evaluated once per batch. + ids = dataset.limit(amount).select_map(:id) + break if ids.empty? - total_count += delete_batch(set) + total_count += delete_batch(ids) end total_count @@ -22,8 +24,8 @@ def delete private - def delete_batch(set) - dataset.model.where(id: set.select_map(:id)).delete + def delete_batch(ids) + dataset.model.where(id: ids).delete end end end diff --git a/lib/database/old_record_cleanup.rb b/lib/database/old_record_cleanup.rb index 44227d84507..a8b1aa236da 100644 --- a/lib/database/old_record_cleanup.rb +++ b/lib/database/old_record_cleanup.rb @@ -3,25 +3,33 @@ module Database class OldRecordCleanup class NoCurrentTimestampError < StandardError; end - attr_reader :model, :days_ago, :keep_at_least_one_record + attr_reader :model, :cutoff_age_in_days, :keep_at_least_one_record, :keep_running_records - def initialize(model, days_ago, keep_at_least_one_record: false) + def initialize(model, cutoff_age_in_days:, keep_at_least_one_record: false, keep_running_records: false) @model = model - @days_ago = days_ago + @cutoff_age_in_days = cutoff_age_in_days @keep_at_least_one_record = keep_at_least_one_record + @keep_running_records = keep_running_records end + # keep_running_records and keep_at_least_one_record 
compose: a still-running + # resource (a beginning-state event with no later ending-state event for the + # same resource) is always retained, and keep_at_least_one_record additionally + # protects the single newest row so the table is never fully emptied for + # clients that poll the most recent event. def delete - cutoff_date = current_timestamp_from_database - days_ago.to_i.days - + cutoff_date = current_timestamp_from_database - cutoff_age_in_days.to_i.days old_records = model.dataset.where(Sequel.lit('created_at < ?', cutoff_date)) - if keep_at_least_one_record - last_record = model.order(:id).last - old_records = old_records.where(Sequel.lit('id < ?', last_record.id)) if last_record - end - logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows") - Database::BatchDelete.new(old_records, 1000).delete + if keep_running_records + raise ArgumentError.new("keep_running_records requires #{model} to define .usage_lifecycles") unless model.respond_to?(:usage_lifecycles) + + delete_keeping_running_records(old_records) + else + old_records = exclude_newest_record(old_records) + logger.info("Cleaning up #{old_records.count} #{model.table_name} table rows") + Database::BatchDelete.new(old_records, 1000).delete + end end private @@ -35,5 +43,96 @@ def current_timestamp_from_database def logger @logger ||= Steno.logger('cc.old_record_cleanup') end + + # Deletes old records while retaining a usable billing baseline for + # still-running resources. + # + # For each lifecycle of the model, a beginning-state row (e.g. + # STARTED/CREATED/WAS_RUNNING) is prunable when: + # * a later ending-state row (e.g. STOPPED/DELETED) is also old -- the run is + # over; or + # * it is a superseded baseline: an earlier beginning of the same run and a + # later beginning both exist (and are old). 
Consumers only need the first + # beginning of the current run (the true start time) and the latest one (the + # current footprint); the in-between rows written by scaling/updating a + # running resource carry no baseline information. + # + # The deletes are ordered deliberately: prunable beginning rows are removed + # FIRST, while the rows that make them prunable still exist, so each beginning + # stays prunable until it is itself deleted. Only then are the ending rows (and + # any other, non-lifecycle states) removed. Reversing the order could strand a + # beginning row whose paired ending was deleted in an earlier batch. + def delete_keeping_running_records(old_records) + lifecycles = model.usage_lifecycles + prunable_beginnings = lifecycles.map { |lifecycle| prunable_beginnings_dataset(old_records, lifecycle) } + + # Everything that is not a beginning-state row of some lifecycle (ending rows + # plus any other, non-lifecycle states) is unconditionally prunable. + all_beginning_states = lifecycles.flat_map { |lifecycle| lifecycle.fetch(:beginning_states) } + unconditional_records = exclude_newest_record(old_records.exclude(state: all_beginning_states)) + + deleted_count = prunable_beginnings.sum { |dataset| Database::BatchDelete.new(dataset, 1000).delete } + deleted_count += Database::BatchDelete.new(unconditional_records, 1000).delete + + logger.info("Cleaned up #{deleted_count} #{model.table_name} table rows") + end + + # Builds the dataset of old beginning-state rows that are prunable for one + # lifecycle. All correlations use the (state, guid, id) lifecycle index; higher + # id implies later creation throughout. The probes only consider OLD rows: a + # superseded beginning is kept until the row superseding it is itself old, + # which keeps the pruning decision stable for consumers reading within the + # retention window. 
+ def prunable_beginnings_dataset(old_records, lifecycle) + beginning_states = lifecycle.fetch(:beginning_states) + ending_state = lifecycle.fetch(:ending_state) + guid_column = lifecycle.fetch(:guid_column) + + old_beginnings = old_records.where(state: beginning_states) + old_endings = old_records.where(state: ending_state) + initial_records = old_beginnings.from_self(alias: :initial_records) + + # The run is over: an ending row for the same resource was created later. + matching_ending = old_endings.from_self(alias: :final_records). + where(Sequel[:final_records][guid_column] => Sequel[:initial_records][guid_column]). + where { Sequel[:final_records][:id] > Sequel[:initial_records][:id] }. + select(1).exists + + # Not the run's true start: an earlier beginning of the same run exists, + # i.e. one with no ending event between the two. + intervening_ending = old_endings.from_self(alias: :intervening_endings). + where(Sequel[:intervening_endings][guid_column] => Sequel[:earlier_beginnings][guid_column]). + where { Sequel[:intervening_endings][:id] > Sequel[:earlier_beginnings][:id] }. + where { Sequel[:intervening_endings][:id] < Sequel[:initial_records][:id] }. + select(1).exists + earlier_beginning_in_same_run = old_beginnings.from_self(alias: :earlier_beginnings). + where(Sequel[:earlier_beginnings][guid_column] => Sequel[:initial_records][guid_column]). + where { Sequel[:earlier_beginnings][:id] < Sequel[:initial_records][:id] }. + where(Sequel.~(intervening_ending)). + select(1).exists + + # Not the latest baseline: a later beginning for the same resource exists. + later_beginning = old_beginnings.from_self(alias: :later_beginnings). + where(Sequel[:later_beginnings][guid_column] => Sequel[:initial_records][guid_column]). + where { Sequel[:later_beginnings][:id] > Sequel[:initial_records][:id] }. 
+ select(1).exists + + superseded_baseline = Sequel.&(earlier_beginning_in_same_run, later_beginning) + exclude_newest_record(initial_records.where(Sequel.|(matching_ending, superseded_baseline))) + end + + # When keep_at_least_one_record is set, never delete the single newest row so + # the table always retains at least one record. + def exclude_newest_record(records) + return records unless keep_at_least_one_record && newest_record_id + + records.where(Sequel.lit('id < ?', newest_record_id)) + end + + def newest_record_id + return @newest_record_id if defined?(@newest_record_id) + + @newest_record_id = model.order(:id).last&.id + end end end diff --git a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb index 8018ee0ee69..ce4f1df9956 100644 --- a/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb +++ b/spec/unit/jobs/runtime/app_usage_events_cleanup_spec.rb @@ -5,7 +5,7 @@ module Jobs::Runtime RSpec.describe AppUsageEventsCleanup, job_context: :worker do let(:cutoff_age_in_days) { 30 } let(:logger) { double(Steno::Logger, info: nil) } - let!(:event_before_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago) } + let!(:event_before_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago, state: 'STOPPED') } let!(:event_after_threshold) { AppUsageEvent.make(created_at: (cutoff_age_in_days - 1).days.ago) } subject(:job) do diff --git a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb index 181ad89e18d..da11beb39b5 100644 --- a/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb +++ b/spec/unit/jobs/services/service_usage_events_cleanup_spec.rb @@ -5,7 +5,7 @@ module Jobs::Services RSpec.describe ServiceUsageEventsCleanup, job_context: :worker do let(:cutoff_age_in_days) { 30 } let(:logger) { double(Steno::Logger, info: nil) } - let!(:event_before_threshold) { 
ServiceUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago) } + let!(:event_before_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days + 1).days.ago, state: 'DELETED') } let!(:event_after_threshold) { ServiceUsageEvent.make(created_at: (cutoff_age_in_days - 1).days.ago) } subject(:job) do diff --git a/spec/unit/lib/database/old_record_cleanup_spec.rb b/spec/unit/lib/database/old_record_cleanup_spec.rb index 7b2258bbaec..167ec52e792 100644 --- a/spec/unit/lib/database/old_record_cleanup_spec.rb +++ b/spec/unit/lib/database/old_record_cleanup_spec.rb @@ -1,6 +1,148 @@ require 'spec_helper' require 'database/old_record_cleanup' +# Lifecycle-aware cleanup behavior that is identical for app and service usage +# events. Expects the including context to define :model, :beginning_state, +# :ending_state and :guid_column. +RSpec.shared_examples 'usage event lifecycle cleanup' do + def make_event(state, guid, created_at:) + model.make(state: state, created_at: created_at, guid_column => guid) + end + + def run_cleanup(**opts) + Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: true, **opts).delete + end + + it 'keeps an old beginning record when there is no corresponding ending record' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect(model.count).to eq(1) + end + + it 'keeps an old beginning record when the ending record is fresh' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago) + fresh_ending = make_event(ending_state, 'guid1', created_at: 1.day.ago + 1.minute) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect(fresh_ending.reload).to be_present + end + + it 'keeps an old beginning record when the ending record was inserted first' do + old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago) + old_beginning = make_event(beginning_state, 'guid1', 
created_at: 2.days.ago) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'uses insertion order rather than created_at to pair beginnings with endings' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 2.days.ago) + # Earlier timestamp but higher id: the resource is no longer running. + old_ending = make_event(ending_state, 'guid1', created_at: 3.days.ago) + + run_cleanup + + expect { old_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'deletes all records of completed runs spanning multiple cycles' do + cycle1_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago) + cycle1_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago) + cycle2_beginning = make_event(beginning_state, 'guid1', created_at: 8.days.ago) + cycle2_ending = make_event(ending_state, 'guid1', created_at: 7.days.ago) + + run_cleanup + + expect { cycle1_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { cycle1_ending.reload }.to raise_error(Sequel::NoExistingObject) + expect { cycle2_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { cycle2_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'deletes an old ending record that has no beginning record' do + orphan_ending = make_event(ending_state, 'guid1', created_at: 10.days.ago) + + run_cleanup + + expect { orphan_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keeps an old WAS_RUNNING record when there is no corresponding ending record' do + was_running = make_event('WAS_RUNNING', 'guid1', created_at: 2.days.ago) + + run_cleanup + + expect(was_running.reload).to be_present + expect(model.count).to eq(1) + end + + it 'deletes an old WAS_RUNNING record when a later old ending record exists' do + was_running = 
make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago) + old_ending = make_event(ending_state, 'guid1', created_at: 4.days.ago) + + run_cleanup + + expect { was_running.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_ending.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keeps both the first beginning and a later WAS_RUNNING record of a running resource' do + old_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago) + was_running = make_event('WAS_RUNNING', 'guid1', created_at: 5.days.ago) + + run_cleanup + + expect(old_beginning.reload).to be_present + expect(was_running.reload).to be_present + end + + it 'prunes superseded baselines of a running resource, keeping the first beginning (true start) and the latest one (current footprint)' do + first = make_event(beginning_state, 'guid1', created_at: 5.days.ago) + middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago) + latest = make_event(beginning_state, 'guid1', created_at: 3.days.ago) + + run_cleanup + + expect(first.reload).to be_present + expect { middle.reload }.to raise_error(Sequel::NoExistingObject) + expect(latest.reload).to be_present + end + + it 'does not prune a superseded baseline until the beginning that supersedes it is itself old' do + first = make_event(beginning_state, 'guid1', created_at: 5.days.ago) + middle = make_event(beginning_state, 'guid1', created_at: 4.days.ago) + fresh_latest = make_event(beginning_state, 'guid1', created_at: 1.day.ago + 1.minute) + + run_cleanup + + expect(first.reload).to be_present + expect(middle.reload).to be_present + expect(fresh_latest.reload).to be_present + end + + it 'treats the first beginning after an ended run as the true start, not as a superseded baseline' do + ended_run_beginning = make_event(beginning_state, 'guid1', created_at: 10.days.ago) + ended_run_ending = make_event(ending_state, 'guid1', created_at: 9.days.ago) + current_run_first = make_event(beginning_state, 'guid1', 
created_at: 8.days.ago) + current_run_latest = make_event(beginning_state, 'guid1', created_at: 7.days.ago) + + run_cleanup + + expect { ended_run_beginning.reload }.to raise_error(Sequel::NoExistingObject) + expect { ended_run_ending.reload }.to raise_error(Sequel::NoExistingObject) + expect(current_run_first.reload).to be_present + expect(current_run_latest.reload).to be_present + end +end + RSpec.describe Database::OldRecordCleanup do describe '#delete' do let!(:stale_event1) { VCAP::CloudController::Event.make(created_at: 1.day.ago - 1.minute) } @@ -9,7 +151,7 @@ let!(:fresh_event) { VCAP::CloudController::Event.make(created_at: 1.day.ago + 1.minute) } it 'deletes records older than specified days' do - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1) + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1) expect do record_cleanup.delete @@ -20,23 +162,14 @@ expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject) end - context "when there are no records at all but you're trying to keep at least one" do - it "doesn't keep one because there aren't any to keep" do - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, 1, keep_at_least_one_record: true) - - expect { record_cleanup.delete }.not_to raise_error - expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0) - end - end - it 'only retrieves the current timestamp from the database once' do expect(VCAP::CloudController::Event.db).to receive(:fetch).with('SELECT CURRENT_TIMESTAMP as now').once.and_call_original - record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, 1) + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1) record_cleanup.delete end it 'keeps the last row when :keep_at_least_one_record is true even if it is older than the cutoff date' do - record_cleanup = 
Database::OldRecordCleanup.new(VCAP::CloudController::Event, 0, keep_at_least_one_record: true) + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 0, keep_at_least_one_record: true) expect do record_cleanup.delete @@ -46,5 +179,235 @@ expect { stale_event1.reload }.to raise_error(Sequel::NoExistingObject) expect { stale_event2.reload }.to raise_error(Sequel::NoExistingObject) end + + context "when there are no records at all but you're trying to keep at least one" do + it "doesn't keep one because there aren't any to keep" do + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::ServiceUsageEvent, cutoff_age_in_days: 1, keep_at_least_one_record: true) + + expect { record_cleanup.delete }.not_to raise_error + expect(VCAP::CloudController::ServiceUsageEvent.count).to eq(0) + end + end + + context 'when keep_running_records is requested for a model without usage lifecycles' do + it 'raises rather than silently deleting the records of running resources' do + record_cleanup = Database::OldRecordCleanup.new(VCAP::CloudController::Event, cutoff_age_in_days: 1, keep_running_records: true) + + expect { record_cleanup.delete }.to raise_error(ArgumentError, /usage_lifecycles/) + end + end + + describe 'keeping running AppUsageEvent records' do + let(:model) { VCAP::CloudController::AppUsageEvent } + let(:beginning_state) { 'STARTED' } + let(:ending_state) { 'STOPPED' } + let(:guid_column) { :app_guid } + + include_examples 'usage event lifecycle cleanup' + + describe 'task lifecycle' do + it 'keeps the TASK_STARTED record of a still-running task' do + task_started = model.make(created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + + run_cleanup + + expect(task_started.reload).to be_present + end + + it 'deletes the TASK_STARTED and TASK_STOPPED records of a completed task' do + task_started = model.make(created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', 
app_guid: 'app1') + task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1') + + run_cleanup + + expect { task_started.reload }.to raise_error(Sequel::NoExistingObject) + expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keeps the TASK_STARTED record when the TASK_STOPPED record is fresh' do + task_started = model.make(created_at: 2.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + fresh_task_stopped = model.make(created_at: 1.day.ago + 1.minute, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: 'app1') + + run_cleanup + + expect(task_started.reload).to be_present + expect(fresh_task_stopped.reload).to be_present + end + + it 'correlates task records by task_guid, not by app_guid' do + running_task_started = model.make(created_at: 5.days.ago, state: 'TASK_STARTED', task_guid: 'task1', app_guid: 'app1') + other_task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task2', app_guid: 'app1') + + run_cleanup + + expect(running_task_started.reload).to be_present + expect { other_task_stopped.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keeps the TASK_WAS_RUNNING baseline of a still-running task' do + baseline = model.make(created_at: 2.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '') + + run_cleanup + + expect(baseline.reload).to be_present + end + + it 'deletes the TASK_WAS_RUNNING baseline once a later TASK_STOPPED record is also old' do + baseline = model.make(created_at: 5.days.ago, state: 'TASK_WAS_RUNNING', task_guid: 'task1', app_guid: '') + task_stopped = model.make(created_at: 4.days.ago, state: 'TASK_STOPPED', task_guid: 'task1', app_guid: '') + + run_cleanup + + expect { baseline.reload }.to raise_error(Sequel::NoExistingObject) + expect { task_stopped.reload }.to raise_error(Sequel::NoExistingObject) + end + + # Task events carry an empty app_guid, so if task baselines 
shared the + # WAS_RUNNING state they would all correlate with each other through the + # app lifecycle (keyed by app_guid) and be wrongly pruned as superseded + # baselines of one phantom app. The distinct TASK_WAS_RUNNING state keeps + # the app lifecycle blind to them. + it 'does not prune the baselines of distinct running tasks as superseded baselines of one phantom app' do + baselines = %w[task1 task2 task3].each_with_index.map do |task_guid, i| + model.make(created_at: (5 - i).days.ago, state: 'TASK_WAS_RUNNING', task_guid: task_guid, app_guid: '') + end + + run_cleanup + + baselines.each { |baseline| expect(baseline.reload).to be_present } + end + end + + it 'deletes records with non-lifecycle states' do + buildpack_event1 = model.make(created_at: 3.days.ago, state: 'BUILDPACK_SET', app_guid: 'app1') + buildpack_event2 = model.make(created_at: 2.days.ago, state: 'BUILDPACK_SET', app_guid: 'app2') + + run_cleanup + + expect { buildpack_event1.reload }.to raise_error(Sequel::NoExistingObject) + expect { buildpack_event2.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'deletes old records with a corresponding stop record even if app_guid is an empty string' do + empty_guid_start = model.make(created_at: 5.days.ago, state: 'STARTED', app_guid: '') + different_empty_start = model.make(created_at: 4.days.ago, state: 'STARTED', app_guid: '') + empty_guid_stop = model.make(created_at: 3.days.ago, state: 'STOPPED', app_guid: '') + + run_cleanup + + # Both STARTs with an empty-string guid have a STOP with an empty-string guid after them. 
+ expect { empty_guid_start.reload }.to raise_error(Sequel::NoExistingObject) + expect { different_empty_start.reload }.to raise_error(Sequel::NoExistingObject) + expect { empty_guid_stop.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'works when cutoff_age_in_days is 0' do + old_start = model.make(created_at: 1.second.ago, state: 'STARTED', app_guid: 'running-app') + + Database::OldRecordCleanup.new(model, cutoff_age_in_days: 0, keep_running_records: true).delete + + expect(old_start.reload).to be_present + end + + it 'does not error if the table is empty' do + model.dataset.delete + + expect { run_cleanup }.not_to raise_error + end + + it 'deletes all old records when keep_running_records is false' do + old_start = model.make(created_at: 5.days.ago, state: 'STARTED', app_guid: 'app1') + old_stop = model.make(created_at: 4.days.ago, state: 'STOPPED', app_guid: 'app1') + old_running_start = model.make(created_at: 3.days.ago, state: 'STARTED', app_guid: 'running-app') + + Database::OldRecordCleanup.new(model, cutoff_age_in_days: 1, keep_running_records: false).delete + + expect { old_start.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_stop.reload }.to raise_error(Sequel::NoExistingObject) + expect { old_running_start.reload }.to raise_error(Sequel::NoExistingObject) + end + + it 'keep_at_least_one_record preserves the last record while still pruning its paired START' do + old_start = model.make(created_at: 10.days.ago, state: 'STARTED', app_guid: 'app1') + last_stop = model.make(created_at: 9.days.ago, state: 'STOPPED', app_guid: 'app1') + + run_cleanup(keep_at_least_one_record: true) + + expect { old_start.reload }.to raise_error(Sequel::NoExistingObject) # paired with STOP, prunable + expect(last_stop.reload).to be_present # kept by keep_at_least_one_record + end + + it 'prunes a paired set larger than the delete batch size while keeping running records' do + old = 5.days.ago + + # 1,500 completed START/STOP pairs (3,000 rows) -> 
well above the 1,000-row + # batch size, so the delete spans multiple batches across the two passes. + paired_rows = [] + 1_500.times do |i| + guid = "paired-#{i}" + common = { app_name: guid, space_guid: 'sp', space_name: 'sp', org_guid: 'o', + instance_count: 1, memory_in_mb_per_instance: 1, created_at: old } + paired_rows << common.merge(guid: "start-#{i}", state: 'STARTED', app_guid: guid) + paired_rows << common.merge(guid: "stop-#{i}", state: 'STOPPED', app_guid: guid) + end + model.dataset.multi_insert(paired_rows) + + # Still-running apps (START with no later STOP) that must survive cleanup. + running = Array.new(50) do |i| + model.make(created_at: old, state: 'STARTED', app_guid: "running-#{i}") + end + + run_cleanup + + # Every completed pair is gone; only the running records remain. + running.each { |event| expect(event.reload).to be_present } + expect(model.count).to eq(running.size) + end + end + + describe 'keeping running ServiceUsageEvent records' do + let(:model) { VCAP::CloudController::ServiceUsageEvent } + let(:beginning_state) { 'CREATED' } + let(:ending_state) { 'DELETED' } + let(:guid_column) { :service_instance_guid } + + include_examples 'usage event lifecycle cleanup' + + describe 'UPDATED records' do + it 'keeps the CREATED record and the latest UPDATED record while the service instance exists, pruning superseded UPDATED records' do + stale_created = model.make(created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1') + superseded_update = model.make(created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + latest_update = model.make(created_at: 6.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + + run_cleanup + + expect(stale_created.reload).to be_present + expect { superseded_update.reload }.to raise_error(Sequel::NoExistingObject) + expect(latest_update.reload).to be_present + end + + it 'keeps UPDATED records when the corresponding delete record is fresh' do + stale_updated = 
model.make(created_at: 2.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + fresh_delete = model.make(created_at: 1.day.ago + 1.minute, state: 'DELETED', service_instance_guid: 'guid1') + + run_cleanup + + expect(stale_updated.reload).to be_present + expect(fresh_delete.reload).to be_present + end + + it 'deletes UPDATED records when there is a corresponding old delete record' do + stale_created = model.make(created_at: 10.days.ago, state: 'CREATED', service_instance_guid: 'guid1') + stale_updated = model.make(created_at: 8.days.ago, state: 'UPDATED', service_instance_guid: 'guid1') + stale_delete = model.make(created_at: 6.days.ago, state: 'DELETED', service_instance_guid: 'guid1') + + run_cleanup + + expect { stale_created.reload }.to raise_error(Sequel::NoExistingObject) + expect { stale_updated.reload }.to raise_error(Sequel::NoExistingObject) + expect { stale_delete.reload }.to raise_error(Sequel::NoExistingObject) + end + end + end end end From 8508a76c62264066ecd08cb8709ebb9bb8bff108 Mon Sep 17 00:00:00 2001 From: David Riddle Date: Wed, 10 Jun 2026 11:49:56 -0500 Subject: [PATCH 2/4] Add lifecycle index to usage event tables Add a composite [state, guid column, id] index on app_usage_events and service_usage_events to support the keep-running cleanup's correlated lookups of related lifecycle events and the backfill's existence checks. Created concurrently on Postgres. The task lifecycle's correlated lookups (keyed by task_guid) are served by the existing app_usage_events_task_guid_index -- a task has only a handful of events, so probing by task_guid alone stays cheap and no [state, task_guid, id] index is needed.
--- ...000_add_lifecycle_index_to_usage_events.rb | 68 +++++++++++++++++++ ...dd_lifecycle_index_to_usage_events_spec.rb | 44 ++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb create mode 100644 spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb diff --git a/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb new file mode 100644 index 00000000000..728394f11c0 --- /dev/null +++ b/db/migrations/20260601120000_add_lifecycle_index_to_usage_events.rb @@ -0,0 +1,68 @@ +Sequel.migration do + no_transaction # to use the 'concurrently' option + + up do + if database_type == :postgres + VCAP::Migration.with_concurrent_timeout(self) do + add_index :app_usage_events, %i[state app_guid id], + name: :app_usage_events_lifecycle_index, + if_not_exists: true, + concurrently: true + + add_index :service_usage_events, %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index, + if_not_exists: true, + concurrently: true + end + + elsif database_type == :mysql + alter_table :app_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + add_index %i[state app_guid id], name: :app_usage_events_lifecycle_index unless @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index) + # rubocop:enable Sequel/ConcurrentIndex + end + + alter_table :service_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + unless @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index) + add_index %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index + end + # rubocop:enable Sequel/ConcurrentIndex + end + end + end + + down do + if database_type == :postgres + VCAP::Migration.with_concurrent_timeout(self) do + drop_index :app_usage_events, %i[state app_guid id], + name: :app_usage_events_lifecycle_index, + if_exists: 
true, + concurrently: true + + drop_index :service_usage_events, %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index, + if_exists: true, + concurrently: true + end + end + + if database_type == :mysql + alter_table :app_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + drop_index %i[state app_guid id], name: :app_usage_events_lifecycle_index if @db.indexes(:app_usage_events).include?(:app_usage_events_lifecycle_index) + # rubocop:enable Sequel/ConcurrentIndex + end + + alter_table :service_usage_events do + # rubocop:disable Sequel/ConcurrentIndex + if @db.indexes(:service_usage_events).include?(:service_usage_events_lifecycle_index) + drop_index %i[state service_instance_guid id], + name: :service_usage_events_lifecycle_index + end + # rubocop:enable Sequel/ConcurrentIndex + end + end + end +end diff --git a/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb new file mode 100644 index 00000000000..3180fbf9456 --- /dev/null +++ b/spec/migrations/20260601120000_add_lifecycle_index_to_usage_events_spec.rb @@ -0,0 +1,44 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to add the lifecycle index to the usage event tables', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260601120000_add_lifecycle_index_to_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + it 'adds the lifecycle index to both usage event tables (idempotently) and removes it on revert' do + # Before migration: the lifecycle indexes should not exist. 
+ expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index) + + # Up migration adds both indexes with the expected column order. + expect { run_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index) + expect(db.indexes(:app_usage_events)[:app_usage_events_lifecycle_index][:columns]).to eq(%i[state app_guid id]) + expect(db.indexes(:service_usage_events)[:service_usage_events_lifecycle_index][:columns]).to eq(%i[state service_instance_guid id]) + + # Up migration is idempotent: running again does not fail. + expect { run_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).to include(:service_usage_events_lifecycle_index) + + # Down migration removes both indexes. + expect { revert_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index) + + # Down migration is idempotent: running again does not fail. 
+ expect { revert_migration }.not_to raise_error + expect(db.indexes(:app_usage_events)).not_to include(:app_usage_events_lifecycle_index) + expect(db.indexes(:service_usage_events)).not_to include(:service_usage_events_lifecycle_index) + end +end From bfa08b5805b22dc90000a656759b0541f7394c8f Mon Sep 17 00:00:00 2001 From: David Riddle Date: Wed, 10 Jun 2026 11:50:11 -0500 Subject: [PATCH 3/4] Backfill WAS_RUNNING events for running apps, tasks, and service instances Seed a synthetic WAS_RUNNING usage event for every currently-running app process, a TASK_WAS_RUNNING event for every currently-running task, and a WAS_RUNNING event for every existing service instance, so billing consumers can bootstrap a complete baseline even after the original STARTED/TASK_STARTED/CREATED events have been pruned. The backfill is a batched, idempotent VCAP::WasRunningBackfill helper invoked from thin no_transaction migrations (mirrors the bigint-migration pattern): each batch keysets over the started processes / running tasks / service instances by id and runs in its own READ COMMITTED transaction, so no statement risks the migration timeout and MySQL's INSERT..SELECT takes no shared next-key locks on the scanned source rows while the API serves traffic. The app backfill scopes the package/droplet aggregates to each batch's apps (index-backed) to keep package_state fidelity without scanning the whole tables, and COALESCEs nullable legacy process/app/task columns so a single NULL row cannot abort a deploy. Because the API stays live during migrations, a batch can race a concurrent stop/delete and insert a baseline row that no later ending event would ever prune; a post-seed sweep removes WAS_RUNNING/TASK_WAS_RUNNING rows whose resource is no longer running/present. 
A skip_was_running_backfill config flag lets operators opt out (checked by the migrations, not the helper, since the migrations are recorded as applied either way); 'rake db:was_running_backfill' re-runs the seeding later for operators who skipped. Rollback deletes are batched too. Document the WAS_RUNNING/TASK_WAS_RUNNING states and their created_at (migration-time) semantics on the V3 resources, and list the new states in the legacy V2 usage-event docs because V2 reads the same event rows. --- ...20100_seed_was_running_app_usage_events.rb | 18 ++ ...0_seed_was_running_service_usage_events.rb | 18 ++ ...0300_seed_was_running_task_usage_events.rb | 18 ++ .../list_all_app_usage_events.html | 2 + .../list_service_usage_events.html | 1 + .../resources/app_usage_events/_object.md.erb | 11 + .../service_usage_events/_object.md.erb | 10 + .../config_schemas/api_schema.rb | 1 + .../config_schemas/migrate_schema.rb | 1 + lib/database/was_running_backfill.rb | 305 ++++++++++++++++++ lib/tasks/db.rake | 17 + .../documentation/app_usage_event_api_spec.rb | 2 +- .../service_usage_events_api_spec.rb | 2 +- ..._seed_was_running_app_usage_events_spec.rb | 140 ++++++++ ...d_was_running_service_usage_events_spec.rb | 124 +++++++ ...seed_was_running_task_usage_events_spec.rb | 117 +++++++ .../bigint_migration_step1_shared_context.rb | 3 + .../bigint_migration_step3_shared_context.rb | 6 + .../lib/database/was_running_backfill_spec.rb | 247 ++++++++++++++ 19 files changed, 1041 insertions(+), 2 deletions(-) create mode 100644 db/migrations/20260601120100_seed_was_running_app_usage_events.rb create mode 100644 db/migrations/20260601120200_seed_was_running_service_usage_events.rb create mode 100644 db/migrations/20260601120300_seed_was_running_task_usage_events.rb create mode 100644 lib/database/was_running_backfill.rb create mode 100644 spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb create mode 100644 
spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb create mode 100644 spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb create mode 100644 spec/unit/lib/database/was_running_backfill_spec.rb diff --git a/db/migrations/20260601120100_seed_was_running_app_usage_events.rb b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb new file mode 100644 index 00000000000..3749f1c3967 --- /dev/null +++ b/db/migrations/20260601120100_seed_was_running_app_usage_events.rb @@ -0,0 +1,18 @@ +require 'database/was_running_backfill' + +Sequel.migration do + no_transaction # backfill manages its own per-batch transactions + + up do + logger = Steno.logger('cc.backfill.was_running') + if VCAP::WasRunningBackfill.skip? + VCAP::WasRunningBackfill.log_skip(logger, 'app') + else + VCAP::WasRunningBackfill.seed_app_usage_events(self, logger) + end + end + + down do + VCAP::WasRunningBackfill.delete_app_usage_events(self) + end +end diff --git a/db/migrations/20260601120200_seed_was_running_service_usage_events.rb b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb new file mode 100644 index 00000000000..ead0eb7aea1 --- /dev/null +++ b/db/migrations/20260601120200_seed_was_running_service_usage_events.rb @@ -0,0 +1,18 @@ +require 'database/was_running_backfill' + +Sequel.migration do + no_transaction # backfill manages its own per-batch transactions + + up do + logger = Steno.logger('cc.backfill.was_running') + if VCAP::WasRunningBackfill.skip? 
+ VCAP::WasRunningBackfill.log_skip(logger, 'service') + else + VCAP::WasRunningBackfill.seed_service_usage_events(self, logger) + end + end + + down do + VCAP::WasRunningBackfill.delete_service_usage_events(self) + end +end diff --git a/db/migrations/20260601120300_seed_was_running_task_usage_events.rb b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb new file mode 100644 index 00000000000..2e38e888312 --- /dev/null +++ b/db/migrations/20260601120300_seed_was_running_task_usage_events.rb @@ -0,0 +1,18 @@ +require 'database/was_running_backfill' + +Sequel.migration do + no_transaction # backfill manages its own per-batch transactions + + up do + logger = Steno.logger('cc.backfill.was_running') + if VCAP::WasRunningBackfill.skip? + VCAP::WasRunningBackfill.log_skip(logger, 'task') + else + VCAP::WasRunningBackfill.seed_task_usage_events(self, logger) + end + end + + down do + VCAP::WasRunningBackfill.delete_task_usage_events(self) + end +end diff --git a/docs/v2/app_usage_events/list_all_app_usage_events.html b/docs/v2/app_usage_events/list_all_app_usage_events.html index 471dd0e8586..583bce89d12 100644 --- a/docs/v2/app_usage_events/list_all_app_usage_events.html +++ b/docs/v2/app_usage_events/list_all_app_usage_events.html @@ -631,9 +631,11 @@

Body

  • STARTED
  • STOPPED
+ • WAS_RUNNING
  • BUILDPACK_SET
  • TASK_STARTED
  • TASK_STOPPED
+ • TASK_WAS_RUNNING
diff --git a/docs/v2/service_usage_events/list_service_usage_events.html b/docs/v2/service_usage_events/list_service_usage_events.html index 501999684c8..5f1b68b393a 100644 --- a/docs/v2/service_usage_events/list_service_usage_events.html +++ b/docs/v2/service_usage_events/list_service_usage_events.html @@ -290,6 +290,7 @@

Body

  • CREATED
  • DELETED
  • UPDATED
+ • WAS_RUNNING
  • diff --git a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb index 36afafaf3d1..515c6c84385 100644 --- a/docs/v3/source/includes/resources/app_usage_events/_object.md.erb +++ b/docs/v3/source/includes/resources/app_usage_events/_object.md.erb @@ -30,3 +30,14 @@ Name | Type | Description **instance_count.current** | _integer_ or `null` | Current instance count of the app that this event pertains to, if applicable **instance_count.previous** | _integer_ or `null` | Previous instance count of the app that this event pertains to, if applicable **links** | [_links object_](#links) | Links to related resources + +#### WAS_RUNNING and TASK_WAS_RUNNING events + +`WAS_RUNNING` and `TASK_WAS_RUNNING` are synthetic values for `state.current` recorded once per running process (`WAS_RUNNING`) and once per running task (`TASK_WAS_RUNNING`) by a one-time data migration when the keep-running cleanup feature was introduced. They mark every process and task that was already running at the time of the upgrade so that billing consumers can bootstrap from a complete baseline even if the original `STARTED`/`TASK_STARTED` events have been pruned. + +**Consumer interpretation** (read `WAS_RUNNING`/`STARTED` as `TASK_WAS_RUNNING`/`TASK_STARTED` for task events, which are keyed by `task.guid`): + +* If you have not previously recorded a `STARTED` event for this resource, treat `WAS_RUNNING` as equivalent to `STARTED`. +* If you have already recorded `STARTED` (or an earlier `WAS_RUNNING`) for this resource, treat as a redundant baseline confirmation and ignore. +* `created_at` reflects when the backfill migration ran, **not** when the app or task actually started. Treat `WAS_RUNNING` as a baseline marker that the resource was already running as of that timestamp, not as the true start of the running interval. +* `state.previous` on a `WAS_RUNNING` event is always `null`. 
Subsequent real events for the same resource will continue to report their actual prior process state in `state.previous` (typically `STARTED`). If you perform chain validation, treat `WAS_RUNNING` as equivalent to `STARTED` for the purpose of validating the next event's `state.previous`. diff --git a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb index f904e1ff083..36c785d0a91 100644 --- a/docs/v3/source/includes/resources/service_usage_events/_object.md.erb +++ b/docs/v3/source/includes/resources/service_usage_events/_object.md.erb @@ -26,3 +26,13 @@ Name | Type | Description **service_broker.guid** | _string_ or `null` | Unique identifier of the service broker that this event pertains to, if applicable **service_broker.name** | _string_ or `null` | Name of the service broker that this event pertains to, if applicable **links** | [_links object_](#links) | Links to related resources + +#### WAS_RUNNING events + +`WAS_RUNNING` is a synthetic value for `state` recorded once per existing service instance by a one-time data migration when the keep-running cleanup feature was introduced. It marks every service instance that existed at the time of the upgrade so that billing consumers can bootstrap from a complete baseline of service instances even if the original `CREATED` events have been pruned. + +**Consumer interpretation:** + +* If you have not previously recorded a `CREATED` event for this service instance, treat `WAS_RUNNING` as equivalent to `CREATED`. +* If you have already recorded `CREATED` (or an earlier `WAS_RUNNING`) for this instance, treat as a redundant baseline confirmation and ignore. +* `created_at` reflects when the backfill migration ran, **not** when the service instance was created. Treat `WAS_RUNNING` as a baseline marker that the instance already existed as of that timestamp. 
diff --git a/lib/cloud_controller/config_schemas/api_schema.rb b/lib/cloud_controller/config_schemas/api_schema.rb index ac952146dc3..7c9b685478c 100644 --- a/lib/cloud_controller/config_schemas/api_schema.rb +++ b/lib/cloud_controller/config_schemas/api_schema.rb @@ -109,6 +109,7 @@ class ApiSchema < VCAP::Config optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer, optional(:migration_psql_worker_memory_kb) => Integer, optional(:skip_bigint_id_migration) => bool, + optional(:skip_was_running_backfill) => bool, db: { optional(:database) => Hash, # db connection hash for sequel max_connections: Integer, # max connections in the connection pool diff --git a/lib/cloud_controller/config_schemas/migrate_schema.rb b/lib/cloud_controller/config_schemas/migrate_schema.rb index 971ff0999aa..eff4f6e2d14 100644 --- a/lib/cloud_controller/config_schemas/migrate_schema.rb +++ b/lib/cloud_controller/config_schemas/migrate_schema.rb @@ -10,6 +10,7 @@ class MigrateSchema < VCAP::Config optional(:migration_psql_concurrent_statement_timeout_in_seconds) => Integer, optional(:migration_psql_worker_memory_kb) => Integer, optional(:skip_bigint_id_migration) => bool, + optional(:skip_was_running_backfill) => bool, db: { optional(:database) => Hash, # db connection hash for sequel diff --git a/lib/database/was_running_backfill.rb b/lib/database/was_running_backfill.rb new file mode 100644 index 00000000000..653f6ab4211 --- /dev/null +++ b/lib/database/was_running_backfill.rb @@ -0,0 +1,305 @@ +# Backfills synthetic WAS_RUNNING usage events (TASK_WAS_RUNNING for tasks) for +# resources that were already running/existing when the keep-running cleanup +# feature was introduced. +# +# Run from thin Sequel migrations as a batched, idempotent, resumable operation +# (mirrors VCAP::BigintMigration): each batch is keyset-paginated by source-table +# id and runs in its own transaction, well under the migration statement timeout. 
+# Re-running is safe via the NOT EXISTS guard. Uses only raw Sequel — no Cloud +# Controller models/repositories — because the schema, not the app code, is the +# contract at migration time. +# rubocop:disable Metrics/ModuleLength +module VCAP::WasRunningBackfill + WAS_RUNNING = 'WAS_RUNNING'.freeze + # Task baselines use a distinct state because task events share the + # app_usage_events table with app events but carry an empty app_guid -- see + # Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE. + TASK_WAS_RUNNING = 'TASK_WAS_RUNNING'.freeze + DEFAULT_BATCH_SIZE = 1000 + + class << self + # Operators can opt out (e.g. very large foundations, or downstream usage-event + # consumers that are not yet ready for the WAS_RUNNING state). Mirrors + # skip_bigint_id_migration. The flag is checked by the seed migrations rather + # than here, so the 'db:was_running_backfill' rake task can still seed the + # baseline after a skipped migration has been recorded as applied. + def skip? 
+ VCAP::CloudController::Config.config&.get(:skip_was_running_backfill) || false + rescue VCAP::CloudController::Config::InvalidConfigPath + false + end + + def log_skip(logger, kind) + logger.info("skipping WAS_RUNNING #{kind} usage event backfill (skip_was_running_backfill is set); " \ + "run 'rake db:was_running_backfill' to seed the baseline later") + end + + def seed_app_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE) + uuid_fn = uuid_function(db) + each_batch(db[:processes].where(state: 'STARTED'), batch_size) do |low, high| + db.run(app_usage_events_insert_sql(uuid_fn, low, high)) + logger.info("backfilled WAS_RUNNING app usage events up to process id #{high}") + end + sweep_stale_app_usage_events(db, logger, batch_size) + end + + def seed_service_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE) + uuid_fn = uuid_function(db) + each_batch(db[:service_instances], batch_size) do |low, high| + db.run(service_usage_events_insert_sql(uuid_fn, low, high)) + logger.info("backfilled WAS_RUNNING service usage events up to service_instance id #{high}") + end + sweep_stale_service_usage_events(db, logger, batch_size) + end + + def seed_task_usage_events(db, logger, batch_size: DEFAULT_BATCH_SIZE) + uuid_fn = uuid_function(db) + each_batch(db[:tasks].where(state: 'RUNNING'), batch_size) do |low, high| + db.run(task_usage_events_insert_sql(uuid_fn, low, high)) + logger.info("backfilled TASK_WAS_RUNNING app usage events up to task id #{high}") + end + sweep_stale_task_usage_events(db, logger, batch_size) + end + + def delete_app_usage_events(db, batch_size: DEFAULT_BATCH_SIZE) + delete_was_running(db, :app_usage_events, batch_size, state: WAS_RUNNING) + end + + def delete_service_usage_events(db, batch_size: DEFAULT_BATCH_SIZE) + delete_was_running(db, :service_usage_events, batch_size, state: WAS_RUNNING) + end + + def delete_task_usage_events(db, batch_size: DEFAULT_BATCH_SIZE) + delete_was_running(db, :app_usage_events, batch_size, state: 
TASK_WAS_RUNNING) + end + + private + + # The latest-package / latest-droplet aggregates are scoped to the batch's apps + # so they never scan the whole packages/droplets tables (the cost that would + # otherwise blow the migration statement timeout). + # + # The COALESCEs are defensive: processes.memory/instances and apps.name are + # nullable columns whose defaults are normally backfilled by the model layer, + # but the target event columns are NOT NULL — one legacy NULL row written + # outside the models must not abort the whole migration. + # + # previous_state is NULL, not the current state. purge_and_reseed_started_apps! + # writes previous_state == state (STARTED/STARTED) to mean "no change, current + # snapshot" — safe only because its state is the real STARTED. Our state is the + # synthetic WAS_RUNNING, so STARTED here would invent a STARTED->WAS_RUNNING + # transition, and WAS_RUNNING would imply an earlier WAS_RUNNING event that + # never actually happened. + def app_usage_events_insert_sql(uuid_fn, low, high) + batch_apps = "SELECT app_guid FROM processes WHERE id > #{low} AND id <= #{high} AND state = 'STARTED'" + <<~SQL.squish + INSERT INTO app_usage_events ( + guid, created_at, + state, previous_state, + instance_count, previous_instance_count, + memory_in_mb_per_instance, previous_memory_in_mb_per_instance, + app_guid, app_name, + parent_app_guid, parent_app_name, + process_type, + space_guid, space_name, org_guid, + buildpack_guid, buildpack_name, + package_state, previous_package_state + ) + SELECT + #{uuid_fn}, CURRENT_TIMESTAMP, + '#{WAS_RUNNING}', NULL, + COALESCE(p.instances, 0), COALESCE(p.instances, 0), + COALESCE(p.memory, 0), COALESCE(p.memory, 0), + p.guid, COALESCE(parent_app.name, ''), + parent_app.guid, COALESCE(parent_app.name, ''), + p.type, + spaces.guid, spaces.name, organizations.guid, + desired_droplet.buildpack_receipt_buildpack_guid, desired_droplet.buildpack_receipt_buildpack, + CASE + WHEN latest_droplet.state = 'FAILED' 
THEN 'FAILED' + WHEN latest_droplet.state = 'STAGED' AND latest_droplet.guid = parent_app.droplet_guid THEN 'STAGED' + WHEN latest_package.state = 'FAILED' THEN 'FAILED' + ELSE 'PENDING' + END, + 'UNKNOWN' + FROM processes p + INNER JOIN apps parent_app ON parent_app.guid = p.app_guid + INNER JOIN spaces ON spaces.guid = parent_app.space_guid + INNER JOIN organizations ON organizations.id = spaces.organization_id + LEFT JOIN droplets desired_droplet ON desired_droplet.guid = parent_app.droplet_guid + LEFT JOIN ( + SELECT pkg.guid, pkg.app_guid, pkg.state FROM packages pkg + INNER JOIN ( + SELECT app_guid, MAX(id) AS max_id FROM packages + WHERE app_guid IN (#{batch_apps}) GROUP BY app_guid + ) lp_ids ON lp_ids.app_guid = pkg.app_guid AND lp_ids.max_id = pkg.id + ) latest_package ON latest_package.app_guid = parent_app.guid + LEFT JOIN ( + SELECT d.guid, d.package_guid, d.state FROM droplets d + INNER JOIN ( + SELECT package_guid, MAX(id) AS max_id FROM droplets + WHERE package_guid IN (SELECT guid FROM packages WHERE app_guid IN (#{batch_apps})) + GROUP BY package_guid + ) ld_ids ON ld_ids.package_guid = d.package_guid AND ld_ids.max_id = d.id + ) latest_droplet ON latest_droplet.package_guid = latest_package.guid + WHERE p.id > #{low} AND p.id <= #{high} + AND p.state = 'STARTED' + AND NOT EXISTS ( + SELECT 1 FROM app_usage_events WHERE state = '#{WAS_RUNNING}' AND app_guid = p.guid + ) + SQL + end + + def service_usage_events_insert_sql(uuid_fn, low, high) + <<~SQL.squish + INSERT INTO service_usage_events ( + guid, created_at, state, + service_instance_guid, service_instance_name, service_instance_type, + service_plan_guid, service_plan_name, + service_guid, service_label, + service_broker_name, service_broker_guid, + space_guid, space_name, org_guid + ) + SELECT + #{uuid_fn}, CURRENT_TIMESTAMP, '#{WAS_RUNNING}', + service_instances.guid, service_instances.name, + CASE WHEN service_instances.is_gateway_service THEN 'managed_service_instance' ELSE 
'user_provided_service_instance' END, + service_plans.guid, service_plans.name, + services.guid, services.label, + service_brokers.name, service_brokers.guid, + spaces.guid, spaces.name, organizations.guid + FROM service_instances + INNER JOIN spaces ON spaces.id = service_instances.space_id + INNER JOIN organizations ON organizations.id = spaces.organization_id + LEFT OUTER JOIN service_plans ON service_plans.id = service_instances.service_plan_id + LEFT OUTER JOIN services ON services.id = service_plans.service_id + LEFT OUTER JOIN service_brokers ON service_brokers.id = services.service_broker_id + WHERE service_instances.id > #{low} AND service_instances.id <= #{high} + AND NOT EXISTS ( + SELECT 1 FROM service_usage_events WHERE state = '#{WAS_RUNNING}' AND service_instance_guid = service_instances.guid + ) + SQL + end + + # Mirrors AppUsageEventRepository#create_from_task: task events carry an + # empty app_guid/app_name and are keyed by task_guid instead. The COALESCE + # on memory_in_mb is defensive -- it is a nullable legacy column whose + # default is normally backfilled by the model layer. + # + # previous_state is NULL for the same reason as the app baseline (see + # app_usage_events_insert_sql): TASK_WAS_RUNNING is synthetic, so RUNNING here + # would invent a RUNNING->TASK_WAS_RUNNING transition that never happened. 
+ def task_usage_events_insert_sql(uuid_fn, low, high) + <<~SQL.squish + INSERT INTO app_usage_events ( + guid, created_at, + state, previous_state, + instance_count, previous_instance_count, + memory_in_mb_per_instance, previous_memory_in_mb_per_instance, + app_guid, app_name, + parent_app_guid, parent_app_name, + space_guid, space_name, org_guid, + package_state, previous_package_state, + task_guid, task_name + ) + SELECT + #{uuid_fn}, CURRENT_TIMESTAMP, + '#{TASK_WAS_RUNNING}', NULL, + 1, 1, + COALESCE(t.memory_in_mb, 0), COALESCE(t.memory_in_mb, 0), + '', '', + parent_app.guid, COALESCE(parent_app.name, ''), + spaces.guid, spaces.name, organizations.guid, + 'STAGED', 'STAGED', + t.guid, t.name + FROM tasks t + INNER JOIN apps parent_app ON parent_app.guid = t.app_guid + INNER JOIN spaces ON spaces.guid = parent_app.space_guid + INNER JOIN organizations ON organizations.id = spaces.organization_id + WHERE t.id > #{low} AND t.id <= #{high} + AND t.state = 'RUNNING' + AND NOT EXISTS ( + SELECT 1 FROM app_usage_events WHERE state = '#{TASK_WAS_RUNNING}' AND task_guid = t.guid + ) + SQL + end + + # The API stays live while the backfill runs, so a batch's INSERT..SELECT can + # race a concurrent stop/delete: the statement's snapshot still sees the + # resource as running and inserts a WAS_RUNNING row with a HIGHER id than the + # concurrent STOPPED/DELETED event. No later ending event would ever arrive + # for that resource, so the keep-running cleanup would retain the row forever + # and consumers would read the stopped resource as still running. Sweep such + # rows afterwards in id-keyed batches (like the rollback deletes) -- the + # predicate is re-checked against current resource state each batch, and no + # single DELETE (or its lock hold) grows large enough to risk the timeout. + def sweep_stale_app_usage_events(db, logger, batch_size) + stale = db[:app_usage_events]. + where(state: WAS_RUNNING). 
+ where(Sequel.lit("NOT EXISTS (SELECT 1 FROM processes WHERE processes.guid = app_usage_events.app_guid AND processes.state = 'STARTED')")) + deleted = batch_delete(db, :app_usage_events, stale, batch_size) + logger.info("swept #{deleted} stale WAS_RUNNING app usage events") if deleted.positive? + end + + def sweep_stale_service_usage_events(db, logger, batch_size) + stale = db[:service_usage_events]. + where(state: WAS_RUNNING). + where(Sequel.lit('NOT EXISTS (SELECT 1 FROM service_instances WHERE service_instances.guid = service_usage_events.service_instance_guid)')) + deleted = batch_delete(db, :service_usage_events, stale, batch_size) + logger.info("swept #{deleted} stale WAS_RUNNING service usage events") if deleted.positive? + end + + def sweep_stale_task_usage_events(db, logger, batch_size) + stale = db[:app_usage_events]. + where(state: TASK_WAS_RUNNING). + where(Sequel.lit("NOT EXISTS (SELECT 1 FROM tasks WHERE tasks.guid = app_usage_events.task_guid AND tasks.state = 'RUNNING')")) + deleted = batch_delete(db, :app_usage_events, stale, batch_size) + logger.info("swept #{deleted} stale TASK_WAS_RUNNING app usage events") if deleted.positive? + end + + # Keyset-paginate over a source dataset by id, yielding the (exclusive-low, + # inclusive-high) id bounds of each batch inside its own transaction. + def each_batch(source, batch_size) + cursor = 0 + loop do + high = source.where(Sequel.lit('id > ?', cursor)).order(:id).limit(batch_size).max(:id) + break if high.nil? + + # READ COMMITTED keeps MySQL's INSERT..SELECT from taking shared next-key + # locks on every scanned source row while the API serves traffic (safe: + # CF MySQL releases run with binlog_format=ROW). On Postgres it is the + # default isolation level anyway. 
+ source.db.transaction(isolation: :committed) { yield(cursor, high) } + cursor = high + end + end + + # Delete every row matching `dataset` in batches of `batch_size` ids so + # neither the statement nor its lock hold ever grows large enough to risk the + # migration statement timeout. Returns the number of rows deleted. Re-selecting + # the dataset each pass is what lets the sweeps re-check current resource state. + def batch_delete(db, table, dataset, batch_size) + deleted = 0 + loop do + ids = dataset.limit(batch_size).select_map(:id) + break if ids.empty? + + deleted += db[table].where(id: ids).delete + end + deleted + end + + def delete_was_running(db, table, batch_size, state:) + batch_delete(db, table, db[table].where(state: state), batch_size) + end + + def uuid_function(db) + case db.database_type + when :postgres then 'get_uuid()' + when :mysql then 'UUID()' + else raise "unsupported database: #{db.database_type}" + end + end + end +end +# rubocop:enable Metrics/ModuleLength diff --git a/lib/tasks/db.rake b/lib/tasks/db.rake index 8cbdb229c9b..2530577e626 100644 --- a/lib/tasks/db.rake +++ b/lib/tasks/db.rake @@ -192,6 +192,23 @@ namespace :db do VCAP::BigintMigration.backfill(logger, db, args.table.to_sym, batch_size: args.batch_size.to_i, iterations: args.iterations.to_i) end + desc 'Seed WAS_RUNNING usage events for running apps, running tasks, and existing service instances ' \ + '(e.g. 
after the seed migrations were skipped via skip_was_running_backfill)' + task :was_running_backfill, %i[batch_size] => :environment do |_t, args| + args.with_defaults(batch_size: 1000) + + RakeConfig.context = :migrate + + require 'database/was_running_backfill' + logging_output + logger = Steno.logger('cc.db.was_running_backfill') + RakeConfig.config.load_db_encryption_key + db = VCAP::CloudController::DB.connect(RakeConfig.config.get(:db), logger) + VCAP::WasRunningBackfill.seed_app_usage_events(db, logger, batch_size: args.batch_size.to_i) + VCAP::WasRunningBackfill.seed_task_usage_events(db, logger, batch_size: args.batch_size.to_i) + VCAP::WasRunningBackfill.seed_service_usage_events(db, logger, batch_size: args.batch_size.to_i) + end + namespace :dev do desc 'Migrate the database set in spec/support/bootstrap/db_config' task migrate: :environment do diff --git a/spec/api/documentation/app_usage_event_api_spec.rb b/spec/api/documentation/app_usage_event_api_spec.rb index 4c7ff0c07a9..f8e45e421e0 100644 --- a/spec/api/documentation/app_usage_event_api_spec.rb +++ b/spec/api/documentation/app_usage_event_api_spec.rb @@ -47,7 +47,7 @@ "The desired state of the app or 'BUILDPACK_SET' when buildpack info has been set.", required: false, readonly: true, - valid_values: %w[STARTED STOPPED BUILDPACK_SET TASK_STARTED TASK_STOPPED] + valid_values: %w[STARTED STOPPED WAS_RUNNING BUILDPACK_SET TASK_STARTED TASK_STOPPED TASK_WAS_RUNNING] field :task_guid, 'The GUID of the task if one exists.', required: false, readonly: true, experimental: true field :task_name, 'The NAME of the task if one exists.', required: false, readonly: true, experimental: true diff --git a/spec/api/documentation/service_usage_events_api_spec.rb b/spec/api/documentation/service_usage_events_api_spec.rb index 4d3e5dc3cda..94c707d71b3 100644 --- a/spec/api/documentation/service_usage_events_api_spec.rb +++ b/spec/api/documentation/service_usage_events_api_spec.rb @@ -13,7 +13,7 @@ get 
'/v2/service_usage_events' do field :guid, 'The guid of the event.', required: false - field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED] + field :state, 'The desired state of the service.', required: false, readonly: true, valid_values: %w[CREATED DELETED UPDATED WAS_RUNNING] field :org_guid, 'The GUID of the organization.', required: false, readonly: true field :space_guid, 'The GUID of the space.', required: false, readonly: true field :space_name, 'The name of the space.', required: false, readonly: true diff --git a/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb new file mode 100644 index 00000000000..877cc3f04de --- /dev/null +++ b/spec/migrations/20260601120100_seed_was_running_app_usage_events_spec.rb @@ -0,0 +1,140 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to seed WAS_RUNNING events for currently-running app processes', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260601120100_seed_was_running_app_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + # Builds an org/space scaffold and returns the space guid that apps can reference. 
+ def seed_space(suffix) + quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true, + total_services: 10, memory_limit: 1024, total_routes: 10) + org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id) + db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id) + "space-#{suffix}" + end + + def seed_app_event(suffix, state:, app_guid:) + db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state, + instance_count: 1, memory_in_mb_per_instance: 1, app_guid: app_guid, app_name: "app-#{suffix}", + space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}") + end + + describe 'up migration' do + it 'seeds WAS_RUNNING rows only for started processes, skips stopped ones, and preserves existing rows' do + space_guid = seed_space('main') + + # A running process with no droplet -> package_state PENDING + db[:apps].insert(guid: 'app-pending', name: 'pending-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-pending', app_guid: 'app-pending', state: 'STARTED', instances: 3, memory: 512, type: 'web') + + # A running process whose app points at a STAGED droplet -> package_state STAGED. + # Insert order avoids the apps.droplet_guid <-> droplets <-> packages.app_guid FK cycle. 
+ db[:apps].insert(guid: 'app-staged', name: 'staged-app', space_guid: space_guid) + db[:packages].insert(guid: 'pkg-staged', app_guid: 'app-staged', state: 'READY') + db[:droplets].insert(guid: 'drop-staged', package_guid: 'pkg-staged', state: 'STAGED') + db[:apps].where(guid: 'app-staged').update(droplet_guid: 'drop-staged') + db[:processes].insert(guid: 'proc-staged', app_guid: 'app-staged', state: 'STARTED', instances: 1, memory: 256, type: 'web') + + # A running process whose latest droplet FAILED -> package_state FAILED + db[:apps].insert(guid: 'app-faildrop', name: 'faildrop-app', space_guid: space_guid) + db[:packages].insert(guid: 'pkg-faildrop', app_guid: 'app-faildrop', state: 'READY') + db[:droplets].insert(guid: 'drop-faildrop', package_guid: 'pkg-faildrop', state: 'FAILED') + db[:apps].where(guid: 'app-faildrop').update(droplet_guid: 'drop-faildrop') + db[:processes].insert(guid: 'proc-faildrop', app_guid: 'app-faildrop', state: 'STARTED', instances: 1, memory: 128, type: 'web') + + # A running process whose latest package FAILED (and has no droplet) -> package_state FAILED + db[:apps].insert(guid: 'app-failpkg', name: 'failpkg-app', space_guid: space_guid) + db[:packages].insert(guid: 'pkg-failpkg', app_guid: 'app-failpkg', state: 'FAILED') + db[:processes].insert(guid: 'proc-failpkg', app_guid: 'app-failpkg', state: 'STARTED', instances: 1, memory: 128, type: 'web') + + # A stopped process -> no WAS_RUNNING row + db[:apps].insert(guid: 'app-stopped', name: 'stopped-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-stopped', app_guid: 'app-stopped', state: 'STOPPED', instances: 1, memory: 128, type: 'web') + + # A running process that already has a WAS_RUNNING row -> not duplicated + db[:apps].insert(guid: 'app-existing', name: 'existing-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-existing', app_guid: 'app-existing', state: 'STARTED', instances: 1, memory: 128, type: 'web') + seed_app_event('existing', state: 
'WAS_RUNNING', app_guid: 'proc-existing') + + # An unrelated pre-existing row that must be preserved (no truncate) + preexisting_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid') + + run_migration + + was_running = db[:app_usage_events].where(state: 'WAS_RUNNING') + # One row each for proc-pending, proc-staged, proc-faildrop and proc-failpkg, plus the + # pre-seeded proc-existing row (not duplicated). + expect(was_running.count).to eq(5) + expect(was_running.where(app_guid: 'proc-stopped').count).to eq(0) + expect(was_running.where(app_guid: 'proc-existing').count).to eq(1) + expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1) + + pending_row = was_running.where(app_guid: 'proc-pending').first + expect(pending_row[:guid]).to be_present + expect(pending_row[:previous_state]).to be_nil + expect(pending_row[:app_name]).to eq('pending-app') + expect(pending_row[:parent_app_guid]).to eq('app-pending') + expect(pending_row[:parent_app_name]).to eq('pending-app') + expect(pending_row[:process_type]).to eq('web') + expect(pending_row[:space_guid]).to eq(space_guid) + expect(pending_row[:space_name]).to eq('space-main') + expect(pending_row[:org_guid]).to eq('org-main') + expect(pending_row[:instance_count]).to eq(3) + expect(pending_row[:previous_instance_count]).to eq(3) + expect(pending_row[:memory_in_mb_per_instance]).to eq(512) + expect(pending_row[:previous_memory_in_mb_per_instance]).to eq(512) + expect(pending_row[:package_state]).to eq('PENDING') + expect(pending_row[:previous_package_state]).to eq('UNKNOWN') + + expect(was_running.where(app_guid: 'proc-staged').first[:package_state]).to eq('STAGED') + expect(was_running.where(app_guid: 'proc-faildrop').first[:package_state]).to eq('FAILED') + expect(was_running.where(app_guid: 'proc-failpkg').first[:package_state]).to eq('FAILED') + + # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in + # spec/unit/lib/database/was_running_backfill_spec.rb, 
where seed_app_usage_events + # runs twice; the migrator does not re-apply an already-recorded migration. + end + + context 'when skip_was_running_backfill is set' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true) + end + + it 'does not seed any WAS_RUNNING rows' do + space_guid = seed_space('main') + db[:apps].insert(guid: 'app-skip', name: 'skip-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-skip', app_guid: 'app-skip', state: 'STARTED', instances: 1, memory: 128, type: 'web') + + run_migration + + expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + end + end + end + + describe 'down migration' do + it 'removes only the WAS_RUNNING rows' do + space_guid = seed_space('main') + db[:apps].insert(guid: 'app-down', name: 'down-app', space_guid: space_guid) + db[:processes].insert(guid: 'proc-down', app_guid: 'app-down', state: 'STARTED', instances: 1, memory: 128, type: 'web') + unrelated_id = seed_app_event('unrelated', state: 'STARTED', app_guid: 'some-other-guid') + + run_migration + expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(1) + + revert_migration + expect(db[:app_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1) + end + end +end diff --git a/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb new file mode 100644 index 00000000000..7d0f79a296b --- /dev/null +++ b/spec/migrations/20260601120200_seed_was_running_service_usage_events_spec.rb @@ -0,0 +1,124 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to seed WAS_RUNNING events for existing service instances', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { 
'20260601120200_seed_was_running_service_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + # Builds an org/space scaffold and returns the space id that instances can reference. + def seed_space(suffix) + quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true, + total_services: 10, memory_limit: 1024, total_routes: 10) + org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id) + db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id) + end + + # Builds a broker -> service -> plan chain and returns the plan id. + def seed_plan(suffix) + broker_id = db[:service_brokers].insert(guid: "broker-#{suffix}", name: "broker-#{suffix}", broker_url: 'http://example.com', auth_password: 'pw') + service_id = db[:services].insert(guid: "service-#{suffix}", label: "service-#{suffix}", description: 'desc', bindable: true, service_broker_id: broker_id) + db[:service_plans].insert(guid: "plan-#{suffix}", name: "plan-#{suffix}", description: 'desc', free: true, service_id: service_id, unique_id: "plan-unique-#{suffix}") + end + + def seed_service_event(suffix, state:, service_instance_guid:) + db[:service_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state, + org_guid: 'org-main', space_guid: 'space-main', space_name: 'space-main', + service_instance_guid: service_instance_guid, service_instance_name: "instance-#{suffix}", + service_instance_type: 'managed_service_instance') + end + + describe 'up migration' do + it 'seeds one WAS_RUNNING row per instance with the correct type and preserves existing rows' do + space_id 
= seed_space('main') + plan_id = seed_plan('main') + + # A managed instance -> managed_service_instance type with full broker chain. + db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + # A user-provided instance -> user_provided_service_instance type with NULL plan/service/broker. + db[:service_instances].insert(guid: 'upsi-guid', name: 'upsi', space_id: space_id, is_gateway_service: false) + + # A managed instance that already has a WAS_RUNNING row -> not duplicated. + db[:service_instances].insert(guid: 'existing-guid', name: 'existing', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + seed_service_event('existing', state: 'WAS_RUNNING', service_instance_guid: 'existing-guid') + + # An unrelated pre-existing row that must be preserved (no truncate). + preexisting_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'managed-guid') + + run_migration + + was_running = db[:service_usage_events].where(state: 'WAS_RUNNING') + # One row each for managed-guid and upsi-guid, plus the pre-seeded existing-guid row (not duplicated). 
+ expect(was_running.count).to eq(3) + expect(was_running.where(service_instance_guid: 'existing-guid').count).to eq(1) + expect(db[:service_usage_events].where(id: preexisting_id).count).to eq(1) + + managed_row = was_running.where(service_instance_guid: 'managed-guid').first + expect(managed_row[:guid]).to be_present + expect(managed_row[:service_instance_name]).to eq('my-instance') + expect(managed_row[:service_instance_type]).to eq('managed_service_instance') + expect(managed_row[:service_plan_guid]).to eq('plan-main') + expect(managed_row[:service_plan_name]).to eq('plan-main') + expect(managed_row[:service_guid]).to eq('service-main') + expect(managed_row[:service_label]).to eq('service-main') + expect(managed_row[:service_broker_name]).to eq('broker-main') + expect(managed_row[:service_broker_guid]).to eq('broker-main') + expect(managed_row[:space_guid]).to eq('space-main') + expect(managed_row[:space_name]).to eq('space-main') + expect(managed_row[:org_guid]).to eq('org-main') + + upsi_row = was_running.where(service_instance_guid: 'upsi-guid').first + expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance') + expect(upsi_row[:service_plan_guid]).to be_nil + expect(upsi_row[:service_plan_name]).to be_nil + expect(upsi_row[:service_guid]).to be_nil + expect(upsi_row[:service_label]).to be_nil + expect(upsi_row[:service_broker_name]).to be_nil + expect(upsi_row[:service_broker_guid]).to be_nil + + # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in + # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_service_usage_events + # runs twice; the migrator does not re-apply an already-recorded migration. 
+ end + + context 'when skip_was_running_backfill is set' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true) + end + + it 'does not seed any WAS_RUNNING rows' do + space_id = seed_space('main') + plan_id = seed_plan('main') + db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + + run_migration + + expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + end + end + end + + describe 'down migration' do + it 'removes only the WAS_RUNNING rows' do + space_id = seed_space('main') + plan_id = seed_plan('main') + db[:service_instances].insert(guid: 'managed-guid', name: 'my-instance', space_id: space_id, is_gateway_service: true, service_plan_id: plan_id) + unrelated_id = seed_service_event('unrelated', state: 'CREATED', service_instance_guid: 'managed-guid') + + run_migration + expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(1) + + revert_migration + expect(db[:service_usage_events].where(state: 'WAS_RUNNING').count).to eq(0) + expect(db[:service_usage_events].where(id: unrelated_id).count).to eq(1) + end + end +end diff --git a/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb new file mode 100644 index 00000000000..e701c883c8e --- /dev/null +++ b/spec/migrations/20260601120300_seed_was_running_task_usage_events_spec.rb @@ -0,0 +1,117 @@ +require 'spec_helper' +require 'migrations/helpers/migration_shared_context' + +RSpec.describe 'migration to seed TASK_WAS_RUNNING events for currently-running tasks', isolation: :truncation, type: :migration do + include_context 'migration' do + let(:migration_filename) { '20260601120300_seed_was_running_task_usage_events.rb' } + end + + let(:run_migration) do + Sequel::Migrator.run(db, migrations_path, target: 
current_migration_index, allow_missing_migration_files: true) + end + + let(:revert_migration) do + Sequel::Migrator.run(db, migrations_path, target: current_migration_index - 1, allow_missing_migration_files: true) + end + + # Builds an org/space/app scaffold and returns the app guid that tasks can reference. + def seed_app(suffix) + quota_id = db[:quota_definitions].insert(guid: "quota-#{suffix}", name: "quota-#{suffix}", non_basic_services_allowed: true, + total_services: 10, memory_limit: 1024, total_routes: 10) + org_id = db[:organizations].insert(guid: "org-#{suffix}", name: "org-#{suffix}", quota_definition_id: quota_id) + db[:spaces].insert(guid: "space-#{suffix}", name: "space-#{suffix}", organization_id: org_id) + db[:apps].insert(guid: "app-#{suffix}", name: "app-#{suffix}", space_guid: "space-#{suffix}") + "app-#{suffix}" + end + + def seed_task(suffix, state:, app_guid:, memory_in_mb: 256) + db[:tasks].insert(guid: "task-#{suffix}", name: "task-#{suffix}", command: 'bundle exec rake', state: state, + app_guid: app_guid, droplet_guid: "droplet-#{suffix}", memory_in_mb: memory_in_mb) + end + + def seed_task_event(suffix, state:, task_guid:) + db[:app_usage_events].insert(guid: "event-#{suffix}", created_at: Time.now.utc, state: state, + instance_count: 1, memory_in_mb_per_instance: 1, app_guid: '', app_name: '', + space_guid: "space-#{suffix}", space_name: "space-#{suffix}", org_guid: "org-#{suffix}", + task_guid: task_guid, task_name: "task-#{suffix}") + end + + describe 'up migration' do + it 'seeds TASK_WAS_RUNNING rows only for running tasks, skips finished ones, and preserves existing rows' do + app_guid = seed_app('main') + + seed_task('running', state: 'RUNNING', app_guid: app_guid, memory_in_mb: 512) + seed_task('succeeded', state: 'SUCCEEDED', app_guid: app_guid) + seed_task('pending', state: 'PENDING', app_guid: app_guid) + + # A running task that already has a TASK_WAS_RUNNING row -> not duplicated + seed_task('existing', state: 'RUNNING', 
app_guid: app_guid) + seed_task_event('existing', state: 'TASK_WAS_RUNNING', task_guid: 'task-existing') + + # An unrelated pre-existing row that must be preserved (no truncate) + preexisting_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task') + + run_migration + + task_was_running = db[:app_usage_events].where(state: 'TASK_WAS_RUNNING') + # One row for task-running, plus the pre-seeded task-existing row (not duplicated). + expect(task_was_running.count).to eq(2) + expect(task_was_running.where(task_guid: 'task-succeeded').count).to eq(0) + expect(task_was_running.where(task_guid: 'task-pending').count).to eq(0) + expect(task_was_running.where(task_guid: 'task-existing').count).to eq(1) + expect(db[:app_usage_events].where(id: preexisting_id).count).to eq(1) + + row = task_was_running.where(task_guid: 'task-running').first + expect(row[:guid]).to be_present + expect(row[:previous_state]).to be_nil + expect(row[:task_name]).to eq('task-running') + expect(row[:app_guid]).to eq('') + expect(row[:app_name]).to eq('') + expect(row[:parent_app_guid]).to eq(app_guid) + expect(row[:parent_app_name]).to eq('app-main') + expect(row[:space_guid]).to eq('space-main') + expect(row[:space_name]).to eq('space-main') + expect(row[:org_guid]).to eq('org-main') + expect(row[:instance_count]).to eq(1) + expect(row[:previous_instance_count]).to eq(1) + expect(row[:memory_in_mb_per_instance]).to eq(512) + expect(row[:previous_memory_in_mb_per_instance]).to eq(512) + expect(row[:package_state]).to eq('STAGED') + expect(row[:previous_package_state]).to eq('STAGED') + + # Idempotency of the seeding itself (the NOT EXISTS guard) is covered in + # spec/unit/lib/database/was_running_backfill_spec.rb, where seed_task_usage_events + # runs twice; the migrator does not re-apply an already-recorded migration. 
+ end + + context 'when skip_was_running_backfill is set' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(true) + end + + it 'does not seed any TASK_WAS_RUNNING rows' do + app_guid = seed_app('main') + seed_task('skip', state: 'RUNNING', app_guid: app_guid) + + run_migration + + expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0) + end + end + end + + describe 'down migration' do + it 'removes only the TASK_WAS_RUNNING rows' do + app_guid = seed_app('main') + seed_task('down', state: 'RUNNING', app_guid: app_guid) + unrelated_id = seed_task_event('unrelated', state: 'TASK_STARTED', task_guid: 'some-other-task') + + run_migration + expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(1) + + revert_migration + expect(db[:app_usage_events].where(state: 'TASK_WAS_RUNNING').count).to eq(0) + expect(db[:app_usage_events].where(id: unrelated_id).count).to eq(1) + end + end +end diff --git a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb index 81b48f484a7..3b34a4af58b 100644 --- a/spec/migrations/helpers/bigint_migration_step1_shared_context.rb +++ b/spec/migrations/helpers/bigint_migration_step1_shared_context.rb @@ -10,6 +10,9 @@ let(:skip_bigint_id_migration) { nil } before do + # Default so config reads from other migrations during the shared after-hook's forward run + # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers. 
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration) allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300) end diff --git a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb index 2378c849909..5f9832d02bd 100644 --- a/spec/migrations/helpers/bigint_migration_step3_shared_context.rb +++ b/spec/migrations/helpers/bigint_migration_step3_shared_context.rb @@ -14,6 +14,9 @@ let(:logger) { double(:logger, info: nil) } before do + # Default so config reads from other migrations during the shared after-hook's forward run + # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers. + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration) allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300) end @@ -119,6 +122,9 @@ let(:logger) { double(:logger, info: nil) } before do + # Default so config reads from other migrations during the shared after-hook's forward run + # (e.g. the WAS_RUNNING backfill's skip? check) don't trip these strict argument matchers. 
+ allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).and_call_original allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_bigint_id_migration).and_return(skip_bigint_id_migration) allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:migration_psql_concurrent_statement_timeout_in_seconds).and_return(300) end diff --git a/spec/unit/lib/database/was_running_backfill_spec.rb b/spec/unit/lib/database/was_running_backfill_spec.rb new file mode 100644 index 00000000000..baa016b0295 --- /dev/null +++ b/spec/unit/lib/database/was_running_backfill_spec.rb @@ -0,0 +1,247 @@ +require 'spec_helper' +require 'database/was_running_backfill' + +RSpec.describe VCAP::WasRunningBackfill do + let(:db) { Sequel::Model.db } + let(:logger) { double(Steno::Logger, info: nil) } + + def was_running + db[:service_usage_events].where(state: 'WAS_RUNNING') + end + + def app_was_running + db[:app_usage_events].where(state: 'WAS_RUNNING') + end + + def task_was_running + db[:app_usage_events].where(state: 'TASK_WAS_RUNNING') + end + + describe 'WAS_RUNNING' do + # The backfill is raw SQL (no CC code), so it can't reference the repository + # constants directly. This guard catches any drift between the literal the + # backfill writes and the state value the repositories/cleanup recognise. + it 'matches the state value used by the usage event repositories' do + expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::WAS_RUNNING_EVENT_STATE) + expect(described_class::WAS_RUNNING).to eq(VCAP::CloudController::Repositories::ServiceUsageEventRepository::WAS_RUNNING_EVENT_STATE) + expect(described_class::TASK_WAS_RUNNING).to eq(VCAP::CloudController::Repositories::AppUsageEventRepository::TASK_WAS_RUNNING_EVENT_STATE) + end + end + + describe '.skip?' 
do + let(:skip_was_running_backfill) { nil } + + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_return(skip_was_running_backfill) + end + + context 'when skip_was_running_backfill is false' do + let(:skip_was_running_backfill) { false } + + it 'returns false' do + expect(described_class.skip?).to be(false) + end + end + + context 'when skip_was_running_backfill is true' do + let(:skip_was_running_backfill) { true } + + it 'returns true' do + expect(described_class.skip?).to be(true) + end + end + + context 'when skip_was_running_backfill is nil' do + let(:skip_was_running_backfill) { nil } + + it 'returns false' do + expect(described_class.skip?).to be(false) + end + end + + context 'when reading the config raises InvalidConfigPath' do + before do + allow_any_instance_of(VCAP::CloudController::Config).to receive(:get).with(:skip_was_running_backfill).and_raise(VCAP::CloudController::Config::InvalidConfigPath) + end + + it 'returns false rather than aborting the migration' do + expect(described_class.skip?).to be(false) + end + end + end + + describe '.seed_app_usage_events' do + it 'seeds one WAS_RUNNING row per started process across batches, idempotently, skipping stopped processes' do + started1 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + started2 = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED') + + # batch_size: 1 forces the keyset loop to iterate once per process. 
+ described_class.seed_app_usage_events(db, logger, batch_size: 1) + + expect(app_was_running.select_map(:app_guid)).to contain_exactly(started1.guid, started2.guid) + expect { described_class.seed_app_usage_events(db, logger, batch_size: 1) }.not_to change(app_was_running, :count) + end + + it 'seeds a separate row per process when an app has multiple started processes' do + app = VCAP::CloudController::AppModel.make + web = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'web', state: 'STARTED') + worker = VCAP::CloudController::ProcessModelFactory.make(app: app, type: 'worker', state: 'STARTED') + + described_class.seed_app_usage_events(db, logger, batch_size: 1) + + scope = app_was_running.where(parent_app_guid: app.guid) + expect(scope.select_map(:app_guid)).to contain_exactly(web.guid, worker.guid) + expect(scope.select_map(:process_type)).to contain_exactly('web', 'worker') + end + + it 'tolerates legacy NULLs in nullable process and app columns' do + process = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + # Bypass the model layer, which would backfill these defaults. + db[:processes].where(guid: process.guid).update(memory: nil, instances: nil) + + described_class.seed_app_usage_events(db, logger) + + row = app_was_running.first(app_guid: process.guid) + expect(row[:memory_in_mb_per_instance]).to eq(0) + expect(row[:instance_count]).to eq(0) + end + + it 'sweeps WAS_RUNNING rows whose process is not running, e.g. 
apps stopped concurrently with the backfill' do + running = VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + stopped = VCAP::CloudController::ProcessModelFactory.make(state: 'STOPPED') + VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: stopped.guid) + VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: 'no-such-process') + + described_class.seed_app_usage_events(db, logger) + + expect(app_was_running.select_map(:app_guid)).to contain_exactly(running.guid) + end + end + + describe '.delete_app_usage_events' do + it 'batch-deletes only WAS_RUNNING rows' do + VCAP::CloudController::ProcessModelFactory.make(state: 'STARTED') + described_class.seed_app_usage_events(db, logger, batch_size: 1) + started = VCAP::CloudController::AppUsageEvent.make(state: 'STARTED') + + described_class.delete_app_usage_events(db, batch_size: 1) + + expect(app_was_running.count).to eq(0) + expect(db[:app_usage_events].where(guid: started.guid).count).to eq(1) + end + end + + describe '.seed_task_usage_events' do + it 'seeds one TASK_WAS_RUNNING row per running task across batches, idempotently, skipping completed tasks' do + running1 = VCAP::CloudController::TaskModel.make(state: 'RUNNING', memory_in_mb: 256) + running2 = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + VCAP::CloudController::TaskModel.make(state: 'SUCCEEDED') + + # batch_size: 1 forces the keyset loop to iterate once per task. 
+ described_class.seed_task_usage_events(db, logger, batch_size: 1) + + expect(task_was_running.select_map(:task_guid)).to contain_exactly(running1.guid, running2.guid) + + row = task_was_running.first(task_guid: running1.guid) + expect(row[:previous_state]).to be_nil + expect(row[:task_name]).to eq(running1.name) + expect(row[:app_guid]).to eq('') + expect(row[:app_name]).to eq('') + expect(row[:parent_app_guid]).to eq(running1.app.guid) + expect(row[:parent_app_name]).to eq(running1.app.name) + expect(row[:instance_count]).to eq(1) + expect(row[:memory_in_mb_per_instance]).to eq(256) + expect(row[:previous_memory_in_mb_per_instance]).to eq(256) + expect(row[:package_state]).to eq('STAGED') + expect(row[:previous_package_state]).to eq('STAGED') + expect(row[:space_guid]).to eq(running1.space.guid) + expect(row[:space_name]).to eq(running1.space.name) + expect(row[:org_guid]).to eq(running1.space.organization.guid) + + expect { described_class.seed_task_usage_events(db, logger, batch_size: 1) }.not_to change(task_was_running, :count) + end + + it 'tolerates a legacy NULL task memory' do + task = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + # Bypass the model layer, which would backfill the default. + db[:tasks].where(guid: task.guid).update(memory_in_mb: nil) + + described_class.seed_task_usage_events(db, logger) + + row = task_was_running.first(task_guid: task.guid) + expect(row[:memory_in_mb_per_instance]).to eq(0) + end + + it 'sweeps TASK_WAS_RUNNING rows whose task is no longer running, e.g. 
tasks completed concurrently with the backfill' do + running = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + completed = VCAP::CloudController::TaskModel.make(state: 'SUCCEEDED') + VCAP::CloudController::AppUsageEvent.make(state: 'TASK_WAS_RUNNING', task_guid: completed.guid) + VCAP::CloudController::AppUsageEvent.make(state: 'TASK_WAS_RUNNING', task_guid: 'no-such-task') + + described_class.seed_task_usage_events(db, logger) + + expect(task_was_running.select_map(:task_guid)).to contain_exactly(running.guid) + end + end + + describe '.delete_task_usage_events' do + it 'batch-deletes only TASK_WAS_RUNNING rows' do + task = VCAP::CloudController::TaskModel.make(state: 'RUNNING') + described_class.seed_task_usage_events(db, logger, batch_size: 1) + task_started = VCAP::CloudController::AppUsageEvent.make(state: 'TASK_STARTED', task_guid: task.guid) + app_baseline = VCAP::CloudController::AppUsageEvent.make(state: 'WAS_RUNNING', app_guid: 'some-process') + + described_class.delete_task_usage_events(db, batch_size: 1) + + expect(task_was_running.count).to eq(0) + expect(db[:app_usage_events].where(guid: task_started.guid).count).to eq(1) + expect(db[:app_usage_events].where(guid: app_baseline.guid).count).to eq(1) + end + end + + describe '.seed_service_usage_events' do + it 'seeds one WAS_RUNNING row per instance across multiple batches, with the right type, idempotently' do + managed = VCAP::CloudController::ManagedServiceInstance.make + upsi = VCAP::CloudController::UserProvidedServiceInstance.make + + # batch_size: 1 forces the keyset loop to iterate once per instance. 
+ described_class.seed_service_usage_events(db, logger, batch_size: 1) + + expect(was_running.select_map(:service_instance_guid)).to contain_exactly(managed.guid, upsi.guid) + + managed_row = was_running.first(service_instance_guid: managed.guid) + expect(managed_row[:service_instance_type]).to eq('managed_service_instance') + expect(managed_row[:service_plan_guid]).to eq(managed.service_plan.guid) + expect(managed_row[:service_broker_name]).to eq(managed.service_plan.service.service_broker.name) + + upsi_row = was_running.first(service_instance_guid: upsi.guid) + expect(upsi_row[:service_instance_type]).to eq('user_provided_service_instance') + expect(upsi_row[:service_plan_guid]).to be_nil + + expect { described_class.seed_service_usage_events(db, logger, batch_size: 1) }.not_to change(was_running, :count) + end + + it 'sweeps WAS_RUNNING rows whose service instance no longer exists, e.g. instances deleted concurrently with the backfill' do + instance = VCAP::CloudController::ManagedServiceInstance.make + VCAP::CloudController::ServiceUsageEvent.make(state: 'WAS_RUNNING', service_instance_guid: 'no-such-instance') + + described_class.seed_service_usage_events(db, logger) + + expect(was_running.select_map(:service_instance_guid)).to contain_exactly(instance.guid) + end + end + + describe '.delete_service_usage_events' do + it 'batch-deletes only WAS_RUNNING rows' do + instance = VCAP::CloudController::ManagedServiceInstance.make + described_class.seed_service_usage_events(db, logger, batch_size: 1) + created = VCAP::CloudController::ServiceUsageEvent.make(state: 'CREATED', service_instance_guid: instance.guid) + + described_class.delete_service_usage_events(db, batch_size: 1) + + expect(was_running.count).to eq(0) + expect(db[:service_usage_events].where(guid: created.guid).count).to eq(1) + end + end +end From 15657b017222e1888b78100593057376d9c775bc Mon Sep 17 00:00:00 2001 From: David Riddle Date: Fri, 12 Jun 2026 16:43:30 -0500 Subject: [PATCH 4/4] Emit 
TASK_STOPPED usage events based on recorded start evidence create_stop_event_if_needed skipped the TASK_STOPPED event whenever the TASK_STARTED event was absent -- so a task whose start event had been pruned never got a stop event on completion, and a billing consumer that recorded the start would bill the task forever. Emit the stop when either piece of recorded start evidence exists: the TASK_STARTED event, or the TASK_WAS_RUNNING baseline seeded by the backfill for tasks that were already running when the keep-running cleanup was introduced. Between the two, a legitimately started task always has one -- the cleanup no longer prunes the start event of a running task, and the backfill covers tasks that had already lost theirs. When neither exists (e.g. a task canceled before it ever ran), no consumer ever saw the task start, so a stop event would be unmatched noise. --- app/models/runtime/task_model.rb | 13 ++++-- spec/unit/models/runtime/task_model_spec.rb | 46 +++++++++++++++++++++ 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/app/models/runtime/task_model.rb b/app/models/runtime/task_model.rb index bad22c00eef..576513c5f63 100644 --- a/app/models/runtime/task_model.rb +++ b/app/models/runtime/task_model.rb @@ -137,9 +137,16 @@ def create_start_event def create_stop_event_if_needed app_usage_repo = Repositories::AppUsageEventRepository.new - start_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED') - existing_stop_event = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED') - return if start_event.nil? || existing_stop_event.present? + return if app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STOPPED').present? + + # Record the stop only when there is recorded evidence that the task + # started: the TASK_STARTED event, or the TASK_WAS_RUNNING baseline seeded + # for tasks that were already running when the keep-running cleanup was + # introduced. 
Without either, no consumer ever saw the task start, so a + # stop event would be unmatched noise. + started = app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_STARTED') || + app_usage_repo.find_by_task_and_state(task: self, state: 'TASK_WAS_RUNNING') + return if started.nil? create_stop_event end diff --git a/spec/unit/models/runtime/task_model_spec.rb b/spec/unit/models/runtime/task_model_spec.rb index fe1e5790c45..b7c07975404 100644 --- a/spec/unit/models/runtime/task_model_spec.rb +++ b/spec/unit/models/runtime/task_model_spec.rb @@ -28,6 +28,30 @@ module VCAP::CloudController expect(event.task_guid).to eq(task.guid) expect(event.parent_app_guid).to eq(task.app.guid) end + + context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do + let!(:start_event) { AppUsageEvent.make(task_guid: task.guid, state: 'TASK_WAS_RUNNING') } + + it 'still creates a TASK_STOPPED event' do + task.update(state: TaskModel::SUCCEEDED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).not_to be_nil + expect(event.task_guid).to eq(task.guid) + expect(event.parent_app_guid).to eq(task.app.guid) + end + end + + context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do + let!(:start_event) { nil } + + it 'does not create a TASK_STOPPED event no consumer could match to a start' do + task.update(state: TaskModel::SUCCEEDED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).to be_nil + end + end end context 'when the task is moving to the FAILED_STATE' do @@ -43,6 +67,28 @@ module VCAP::CloudController expect(event.task_guid).to eq(task.guid) expect(event.parent_app_guid).to eq(task.app.guid) end + + context 'when the TASK_STARTED event has been pruned and a TASK_WAS_RUNNING baseline exists' do + let!(:start_event) { AppUsageEvent.make(task_guid: task.guid, state: 'TASK_WAS_RUNNING') } + + it 'still creates a 
TASK_STOPPED event' do + task.update(state: TaskModel::FAILED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).not_to be_nil + end + end + + context 'when there is neither a TASK_STARTED event nor a TASK_WAS_RUNNING baseline' do + let!(:start_event) { nil } + + it 'does not create a TASK_STOPPED event no consumer could match to a start' do + task.update(state: TaskModel::FAILED_STATE) + + event = AppUsageEvent.find(task_guid: task.guid, state: 'TASK_STOPPED') + expect(event).to be_nil + end + end end context 'when the task is moving from the PENDING state' do