I recently had a requirement to export data from a BigQuery table to AlloyDB. BigQuery is a powerful data warehouse, but querying it frequently can be costly, and if you need to run low-latency analytical queries with predictable access patterns, such as a dashboard built in Grafana, BigQuery can be limiting as well. In these cases, AlloyDB might be a better choice.

AlloyDB is Google Cloud's PostgreSQL-compatible HTAP database, and it includes an in-memory columnar engine for analytical workloads. The default connection limit is 1,000, but it can be scaled up to an impressive 240,000.

The problem I had is that my data was already in BigQuery, so I needed a way to export that data and insert it into AlloyDB. BigQuery can export data in Parquet format to Google Cloud Storage, but AlloyDB (being a Postgres-compatible database) doesn't natively support reading from Parquet files. This is where DuckDB comes to the rescue. DuckDB can not only read Parquet data from GCS, it can also attach to a PostgreSQL database and insert data into a table directly. Combine this with Airflow, and you can schedule the export and import on a regular basis.

Creating a table in AlloyDB

There are lots of things to consider when creating the table in AlloyDB, such as partitioning based on a timestamp using pg_partman, but I'm not going into those details here. What's important to keep in mind is the conversion from BigQuery data types to Postgres data types (via Parquet). This can be tricky. The BigQuery documentation has a conversion table for BigQuery to Parquet types. My table didn't have any complex data types (such as nested and repeated fields), but it did have STRING columns which needed to be converted to VARCHAR(n) in AlloyDB, where n is the maximum number of characters the column can store. So I had to do some exploration in BigQuery to find out the maximum length of each STRING column.
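For example, a query along these lines (the dataset, table and column names are just placeholders) returns the maximum character length of each STRING column, which you can then use as the n in VARCHAR(n):

SELECT
    MAX(LENGTH(string_col)) AS max_len_string_col,
    MAX(LENGTH(other_string_col)) AS max_len_other_string_col
FROM my_dataset.my_table;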

Be extra careful with FLOAT64 and NUMERIC columns in BigQuery. These will have to be converted to numeric (or decimal) columns in AlloyDB, and you don't want to lose any precision.

Creating the table in AlloyDB is straightforward. You can use a tool like psql or pgAdmin to run a CREATE TABLE statement:

CREATE TABLE IF NOT EXISTS schema.table
(
    string_col character varying(32) NOT NULL,
    ts timestamp without time zone NOT NULL,
    metric integer
);

Preparing the Airflow environment

We will be using DuckDB to read from Google Cloud Storage, so the Airflow workers need to have the correct Python packages installed: duckdb, fsspec and gcsfs. If you're on Cloud Composer, fsspec and gcsfs are already available, so you only need to add duckdb as a PyPI dependency.

Also, be sure to add the AlloyDB connection to the Airflow Connections. Create a new connection of type Postgres and fill in the details: host, schema (the database name), login, password and port.

Exporting the BigQuery table in Airflow

Exporting an entire table to Google Cloud Storage using Airflow can be done with the BigQueryToGCSOperator:

from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

bigquery_to_gcs = BigQueryToGCSOperator(
    task_id="bigquery_to_gcs",
    source_project_dataset_table=f"{DATASET_NAME}.{TABLE}",
    destination_cloud_storage_uris=[f"gs://{BUCKET_NAME}/{BUCKET_FILE}"],
    export_format="PARQUET"
)

Of course, the table shouldn't be too big or else DuckDB will run out of memory. If you run into this issue, you should extract the data out of BigQuery in increments. This can be done with the BigQueryInsertJobOperator, with which you can run an EXPORT DATA query that exports only a specific increment, as sketched below.
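As a rough sketch of that approach (the bucket path, dataset, table and the ts timestamp column are placeholders, and {{ ds }} is the logical date of the DAG run), an incremental export of one day per run could look like this:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Export a single day of data to a date-partitioned folder in GCS
export_increment = BigQueryInsertJobOperator(
    task_id="export_increment",
    configuration={
        "query": {
            "query": """
                EXPORT DATA OPTIONS (
                    uri = 'gs://my-bucket/my-table/{{ ds }}/*.parquet',
                    format = 'PARQUET',
                    overwrite = true
                ) AS
                SELECT *
                FROM my_dataset.my_table
                WHERE DATE(ts) = '{{ ds }}'
            """,
            "useLegacySql": False,
        }
    },
)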

Loading the Parquet data into AlloyDB using DuckDB

To do this, we will create a function which will be called by the PythonOperator:

from airflow.hooks.base import BaseHook
from airflow.operators.python import PythonOperator

def gcs_to_alloydb(conn_id, **kwargs):
    import duckdb
    from fsspec import filesystem

    # Look up the AlloyDB connection we added to the Airflow Connections
    conn_info = BaseHook.get_connection(conn_id)

    # In-memory DuckDB database with the postgres extension loaded
    conn = duckdb.connect(":memory:")
    conn.execute("INSTALL postgres")
    conn.execute("LOAD postgres")

    # Register a GCS filesystem (via fsspec/gcsfs) so DuckDB can read gcs:// paths
    fs = filesystem("gcs")
    conn.register_filesystem(fs)

    # Attach AlloyDB and insert the Parquet data directly into the target table;
    # the gcs:// path should point at the files exported by the bigquery_to_gcs task
    conn.execute(f"ATTACH '{conn_info.get_uri()}' AS alloydb (TYPE postgres)")
    conn.execute("INSERT INTO alloydb.schema.table SELECT * FROM read_parquet('gcs://bucket/path/to/*.parquet')")
    conn.close()

gcs_to_alloydb_task = PythonOperator(
    task_id="gcs_to_alloydb",
    python_callable=gcs_to_alloydb,
    op_args=["conn_id"],  # the id of the AlloyDB connection in Airflow
)


bigquery_to_gcs >> gcs_to_alloydb_task

Note: The above is just an example of how it could work. In reality, running this every day may cause duplicate data in your AlloyDB table. To prevent duplicates, you may want to empty the table, or delete the specific increment, before the INSERT statement.
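A minimal sketch of that idea, assuming the same attached alloydb database as above (these lines would replace the single INSERT inside gcs_to_alloydb, and the DELETE runs against AlloyDB through DuckDB's postgres extension):

    # Empty the target table first so reruns don't create duplicate rows;
    # for incremental loads, use a WHERE clause that matches the increment instead
    conn.execute("DELETE FROM alloydb.schema.table")
    conn.execute("INSERT INTO alloydb.schema.table SELECT * FROM read_parquet('gcs://bucket/path/to/*.parquet')")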

Also be aware that the above example might not be 100% safe from SQL injection, since the connection URI is interpolated directly into the ATTACH statement. But as long as you don't work with user input, the risk should be minimal.

Conclusion

DuckDB can be a powerful tool for basic ETL pipelines. While this use case might be very specific, the above example can be easily generalized. For example, you may have some Parquet data coming from a different source that you want to insert into a regular PostgreSQL database. DuckDB can also read CSV files and attach to MySQL databases.
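As a rough illustration of that flexibility (the connection parameters, table name and file name below are made up), attaching to MySQL and loading a CSV file in DuckDB looks very similar to the Parquet/Postgres example above:

INSTALL mysql;
LOAD mysql;

-- Attach a MySQL database and make it the default database
ATTACH 'host=localhost user=myuser password=mypassword database=mydb' AS mysqldb (TYPE mysql);
USE mysqldb;

-- Read a CSV file with schema auto-detection and insert it into a MySQL table
INSERT INTO some_table SELECT * FROM read_csv_auto('data.csv');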

I hope you found this article helpful!