I am finding many repeated rows in BigQuery. As you can see below, I am reading several CSVs from a bucket, converting each one into a DataFrame, concatenating the DataFrames, and then loading the result into BigQuery. I don't think this process can introduce duplication anywhere. Can you confirm?
storage_client = storage.Client()
bigquery_client = bigquery.Client(project=project_id)

blobs = storage_client.list_blobs(bucket_name)
csv_files = [blob.name for blob in blobs if blob.name.endswith('.csv')]

# Load each CSV into a DataFrame and store them in a list
dataframes = []
for file_name in csv_files:
    df = load_csv_to_dataframe(storage_client, bucket_name, file_name)
    dataframes.append(df)

merged_df = pd.concat(dataframes, ignore_index=True)
df_to_bq(merged_df, table_id, bigquery_client, dataset_id)

def load_csv_to_dataframe(storage_client, bucket_name, file_name):
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(file_name)
    file_path = "gs://{}/{}".format(bucket_name, file_name)
    df = pd.read_csv(file_path, encoding='unicode_escape')
    df['file_name'] = file_name
    return df

def df_to_bq(df, table_name, client, dataset_name):
    dataset_ref = client.dataset(dataset_name)
    table_ref = dataset_ref.table(table_name)
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE  # Replace the existing table
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Wait for the job to complete
Duplicate rows in BigQuery can appear for several reasons, even when your process looks correct at first glance. Here are some potential causes and how to address them:
1. Duplicates in the Source CSV Files
Double-check Your CSVs: Carefully review your CSV files to ensure they don’t contain duplicate rows before loading them into BigQuery. Duplicates at this stage can propagate through your data pipeline.
Deduplicating CSVs: If duplicates are found within the CSVs, it’s crucial to clean them up, either before loading or as a preprocessing step within your Python code (see the sketch below).
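For instance, a minimal sketch that drops exact duplicate rows from each file as it is read, reusing the loop from your script:
for file_name in csv_files:
    df = load_csv_to_dataframe(storage_client, bucket_name, file_name)
    df = df.drop_duplicates()  # drop rows that are exact duplicates within this CSV
    dataframes.append(df)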
2. Write Disposition Settings
WRITE_TRUNCATE vs. WRITE_APPEND: Your script uses WRITE_TRUNCATE for write_disposition, which replaces the existing table on every run. This prevents duplicates from accumulating across script executions, but it does nothing about duplicates that already exist in your source data or that are introduced during processing.
Consider WRITE_APPEND only if your goal is to add new data to the existing table and you can guarantee that each run loads files that have not been loaded before; otherwise, re-running the script will re-insert rows that are already in the table.
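If you do decide to append rather than replace, the only change needed in df_to_bq is the write disposition; a minimal sketch:
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND  # add rows to the existing table instead of replacing it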
3. Issues with the df_to_bq Function
Review for Hidden Logic: Ensure there’s no unintended logic within your df_to_bq function that might lead to duplication. The function, as provided, should not inherently cause duplicates unless the input DataFrame already contains them.
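One quick way to rule out the load step is to compare row counts once df_to_bq has finished; a minimal sketch using the variable names from your script (project_id, dataset_id, table_id, merged_df):
table = bigquery_client.get_table("{}.{}.{}".format(project_id, dataset_id, table_id))
print("Rows in merged DataFrame:", len(merged_df))
print("Rows in BigQuery table:", table.num_rows)  # should match after a WRITE_TRUNCATE load
If the two counts match, the duplication is already present in merged_df rather than being introduced by the load.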
Debugging and Prevention Strategies
Inspect Intermediate Data: After merging DataFrames but before loading to BigQuery, inspect for duplicates:
merged_df = pd.concat(dataframes, ignore_index=True)
print("Number of duplicate rows:", merged_df.duplicated().sum())
Deduplicate Before BigQuery: If duplicates are found, remove them from the merged DataFrame before loading (see the sketch below).
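A minimal sketch, assuming that rows identical in every column except the file_name column your script adds should count as duplicates (adjust the subset to whatever defines a duplicate for your data):
# Treat rows as duplicates when they match on every column except file_name,
# i.e. the same record was exported into more than one CSV.
dedup_cols = [c for c in merged_df.columns if c != 'file_name']
merged_df = merged_df.drop_duplicates(subset=dedup_cols, keep='first')
df_to_bq(merged_df, table_id, bigquery_client, dataset_id)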
Use Unique Identifiers for Deduplication: If your dataset has a unique identifier (or a combination of columns that can serve as one), you can also deduplicate inside BigQuery after loading. Note that a window function such as ROW_NUMBER() cannot be used directly in a DELETE ... WHERE clause; the usual pattern is to rewrite the table, keeping only one row per key.
Example:
CREATE OR REPLACE TABLE `project.dataset.table_name` AS
SELECT * EXCEPT(row_num)
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id_column, column2 ORDER BY timestamp_column DESC) AS row_num
  FROM `project.dataset.table_name`
)
WHERE row_num = 1;
Additional Considerations
Alternative Deduplication Methods: For complex deduplication logic, BigQuery’s window functions (e.g., ROW_NUMBER()) provide powerful options.
Data Integrity: Decide exactly which columns define a duplicate in your dataset; for example, rows that are identical except for the added file_name column usually mean the same record appears in more than one CSV.
Performance and Cost: Deduplicating in pandas before loading is generally cheaper and faster than running deduplication queries in BigQuery afterwards.
Regular Data Validation: Implement regular checks (for example, comparing total row counts against distinct key counts) to catch duplicates early and maintain data quality.
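For example, a small check you could run after each load; id_column here is a hypothetical stand-in for whatever uniquely identifies a row in your data:
# Compare total rows against distinct key values; a gap indicates duplicates.
validation_sql = """
    SELECT COUNT(*) AS total_rows,
           COUNT(DISTINCT id_column) AS distinct_keys
    FROM `{}.{}.{}`
""".format(project_id, dataset_id, table_id)
for row in bigquery_client.query(validation_sql).result():
    if row.total_rows > row.distinct_keys:
        print("Duplicate rows detected:", row.total_rows - row.distinct_keys)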