You’ll want to set up a cron job that prunes jobs at a regular cadence. Your screenshot shows 20,000+ completed (succeeded or failed) jobs. In my experience with GCP Batch, anything around 1,300 jobs or more makes the UI (and gcloud) laggy.
I have a cron job in Cloud Scheduler that triggers a Cloud Run Job every 6 hours to sweep one GCP Project’s Batch jobs and delete anything older than X days (depends on the project)
Here’s essentially the code I use:
import logging
import os
from queue import Queue
import threading
import time
import arrow
import click
from google.api_core.retry import Retry
from google.cloud import batch_v1 as batch
# Module-level logger; configure handlers/levels in the runtime environment.
logger = logging.getLogger(__name__)
# Max page size for ListJobs; also reused as the progress-logging interval.
PAGE_SIZE = 1000
# NOTE: reads env vars at import time — raises KeyError if GCP_PROJECT_ID
# or GCP_REGION is unset, so the module cannot be imported without them.
DEFAULT_PROJECT_REGION = f'projects/{os.environ["GCP_PROJECT_ID"]}/locations/{os.environ["GCP_REGION"]}'
# Only jobs in a terminal state are eligible for deletion by default.
DEFAULT_SEARCH_FILTER = 'status.state="SUCCEEDED" OR status.state="FAILED"'
DEFAULT_OLDER_THAN_DAYS = 7
DEFAULT_NUM_WORKERS = 8
def delete_job(the_queue, dry_run=False):
    """Worker loop: pull job names off ``the_queue`` and delete them via the Batch API.

    Runs until it receives a ``None`` sentinel. Deletes are issued as
    asynchronous long-running operations and drained in batches of 100 so
    the number of in-flight operations stays bounded.

    Args:
        the_queue: ``queue.Queue`` of fully-qualified job names; ``None``
            signals this worker to drain outstanding operations and exit.
        dry_run: When True, log what would be deleted without calling the API.
    """
    client = batch.BatchServiceClient()
    operations = {}

    def drain_operations():
        # Block until every in-flight delete operation has finished.
        for job_name, operation in operations.items():
            try:
                operation.result()
                logger.info('Finished deleting job: %s', job_name)
            except Exception:
                # One failed delete must not kill the worker thread;
                # log it and keep going so the sweep completes.
                logger.exception('Failed deleting job: %s', job_name)
        operations.clear()

    while True:
        try:
            job_name = the_queue.get()
            if job_name is None:
                drain_operations()
                break
            logger.info('Deleting job: %s', job_name)
            if not dry_run:
                try:
                    operations[job_name] = client.delete_job(name=job_name, retry=Retry())
                except Exception:
                    # e.g. NotFound if another sweep already removed it.
                    logger.exception('Failed to request deletion of job: %s', job_name)
            if len(operations) >= 100:
                drain_operations()
        finally:
            # Always mark the item done (even the sentinel) so queue.join()
            # in run() can return.
            the_queue.task_done()
def run(
    search_filter=DEFAULT_SEARCH_FILTER,
    older_than_days=DEFAULT_OLDER_THAN_DAYS,
    gcp_project_region=DEFAULT_PROJECT_REGION,
    num_workers=DEFAULT_NUM_WORKERS,
    dry_run=False,
):
    """Sweep a project/region's GCP Batch jobs and delete old completed ones.

    Lists jobs matching ``search_filter``, enqueues every job whose
    ``update_time`` is older than ``older_than_days`` days, and fans the
    deletions out to ``num_workers`` worker threads.

    Args:
        search_filter: Batch ListJobs filter expression selecting candidates.
        older_than_days: Age cutoff in days; the sign is ignored (abs is taken).
        gcp_project_region: ``projects/<id>/locations/<region>`` parent string.
        num_workers: Number of deletion worker threads.
        dry_run: When True, workers only log; nothing is deleted.
    """
    older_than = arrow.utcnow().shift(days=-1 * abs(older_than_days))
    logger.info('Deleting jobs from project region: %s', gcp_project_region)
    logger.info('Deleting jobs matching filter: %s', search_filter)
    logger.info('Deleting jobs older than: %s', older_than.isoformat())
    logger.info('Deleting jobs with # of workers: %s', num_workers)

    the_queue = Queue()
    workers = []
    for _ in range(num_workers):
        worker = threading.Thread(
            target=delete_job,
            daemon=True,
            args=(the_queue,),
            kwargs={'dry_run': dry_run},
        )
        worker.start()
        workers.append(worker)

    client = batch.BatchServiceClient()
    request = batch.ListJobsRequest(
        parent=gcp_project_region,
        filter=search_filter,
        page_size=PAGE_SIZE,
    )
    enqueued = 0
    # list_jobs returns a pager that transparently fetches subsequent pages.
    for job in client.list_jobs(request, retry=Retry()):
        if arrow.get(job.update_time.rfc3339()) < older_than:
            the_queue.put(job.name)
            enqueued += 1
            if enqueued % PAGE_SIZE == 0:
                logger.info(f'Enqueued another {PAGE_SIZE} jobs for deletion')

    # Coarse progress reporting while the workers chew through the backlog;
    # qsize() is approximate, so this is informational only.
    while True:
        count = the_queue.qsize()
        logger.info('Remaining jobs to delete: %s', count)
        if count < 10:
            break
        time.sleep(30)

    # One sentinel per worker, then wait for every item to be task_done()'d.
    for _ in workers:
        the_queue.put(None)
    the_queue.join()
    # Join the threads too, so we only report completion after every worker
    # has drained its in-flight operations and fully exited (daemon threads
    # would otherwise be killed abruptly at interpreter shutdown).
    for w in workers:
        w.join()
    logger.info('Finished deleting all jobs')
@click.command()
@click.option('--filter', 'search_filter', type=str, default=DEFAULT_SEARCH_FILTER)
@click.option('--older-than-days', 'older_than_days', type=int, default=DEFAULT_OLDER_THAN_DAYS, help='Delete jobs older than the number of days from today')
@click.option('--gcp-project-region', default=DEFAULT_PROJECT_REGION, help='GCP Project and Region in the following format: projects/GCP_PROJECT_ID/locations/GCP_REGION')
@click.option('--num-workers', default=DEFAULT_NUM_WORKERS)
@click.option('--dry-run', is_flag=True)
def command(search_filter, older_than_days, gcp_project_region, num_workers, dry_run):
    """CLI entry point: sweep and delete old completed GCP Batch jobs."""
    run(
        search_filter=search_filter,
        older_than_days=older_than_days,
        gcp_project_region=gcp_project_region,
        num_workers=num_workers,
        dry_run=dry_run,
    )


if __name__ == '__main__':
    command()