Setting Up and Using Snowpipe for Automated Data Loading in Snowflake

Setting Up Snowpipe: Streamlining automated data loading in Snowflake.
Published: May 29, 2024

Snowpipe is a serverless service from Snowflake that automatically loads data into Snowflake tables from files as soon as they become available in a stage. This tutorial will guide you through the foundational concepts, best practices, and step-by-step instructions for setting up a data pipeline using Snowpipe.

What is Snowpipe?

Snowpipe is a service provided by Snowflake that enables automatic data loading into Snowflake tables from files as they become available in a stage. It operates in micro-batches, ensuring data is available to users within minutes. Snowpipe leverages Snowflake-supplied compute resources and can be automated using cloud messaging, such as event notifications for cloud storage.

-- Example of a COPY statement for Snowpipe
CREATE OR REPLACE PIPE my_pipe
AUTO_INGEST = TRUE
AS COPY INTO my_table
FROM @my_stage
FILE_FORMAT = (TYPE = 'CSV');

This code defines a Snowpipe that automatically ingests data from a specified stage into a Snowflake table using a defined file format.

How does Snowpipe work?

Snowpipe loads data according to the COPY statement defined in a referenced pipe object, which identifies the source location of the data files and the target table. It supports both continuous, near-real-time ingestion and one-off backfills. Snowpipe tracks file-loading metadata for each pipe, keyed on the file path, so that each file is processed only once even if it remains in the stage.

  • Continuous Loading: Snowpipe loads data in small batches over time, ensuring that data is available to users within minutes.
  • Bulk Loading: Snowpipe can also backfill existing files all at once, which is useful for initial data loads or large data migrations (see the REFRESH sketch after this list).
  • Data Security: Snowpipe offers multiple approaches to data security, including end-to-end encryption with AES 256-bit encryption, column-level security, and row-level security.
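
A pipe created with AUTO_INGEST = TRUE only reacts to new event notifications, so files already sitting in the stage are not picked up automatically. They can be queued for loading with a manual refresh; a minimal sketch, assuming the pipe from the example above:

-- Queue any staged files the pipe has not yet processed
ALTER PIPE my_pipe REFRESH;

Note that REFRESH only queues files staged within the last seven days; older files must be loaded with a regular COPY INTO statement.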

How to Set Up a Data Pipeline Using Snowpipe

1. Create a Separate Database

Begin by creating a separate database to organize your data pipeline. This helps in managing and isolating the data specific to your pipeline.

CREATE DATABASE my_database;

This SQL command creates a new database named "my_database".

2. Create a Schema for Source Data

Next, create a schema within the database to hold your source data. Schemas help in organizing tables and other database objects.

CREATE SCHEMA my_schema;

This command creates a new schema named "my_schema" within the database.

3. Create a Table

Create a table to store the data that will be loaded by Snowpipe.

CREATE TABLE my_table (
id INT,
name STRING,
value FLOAT
);

This code creates a table named "my_table" with columns for id, name, and value.

4. Create a File Format

Define a file format that matches the structure of your data files.

CREATE FILE FORMAT my_csv_format
TYPE = 'CSV'
FIELD_OPTIONALLY_ENCLOSED_BY = '"';

This command creates a CSV file format with fields optionally enclosed by double quotes.
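
If your data files include a header row, the format can be extended with the SKIP_HEADER option. This variant is a sketch; adjust the option values to match your actual files:

-- Variant that skips one header line at the top of each file
CREATE OR REPLACE FILE FORMAT my_csv_format
TYPE = 'CSV'
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
SKIP_HEADER = 1;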

5. Create an External Stage

Create an external stage that points to an S3 location where your data files are stored.

CREATE STAGE my_stage
URL = 's3://my_bucket/data/'
CREDENTIALS = (AWS_KEY_ID = 'your_key_id' AWS_SECRET_KEY = 'your_secret_key');

This code creates an external stage named "my_stage" that points to an S3 bucket.
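
Embedding keys directly in the stage definition is convenient for a quick test, but Snowflake recommends a storage integration for production so that credentials never appear in SQL. A minimal sketch, in which the integration name, IAM role ARN, and account ID are placeholders to replace with your own:

-- Storage integration backed by an IAM role (ARN is a placeholder)
CREATE STORAGE INTEGRATION my_s3_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/my_snowflake_role'
STORAGE_ALLOWED_LOCATIONS = ('s3://my_bucket/data/');

-- Recreate the stage to authenticate through the integration instead of embedded keys
CREATE OR REPLACE STAGE my_stage
URL = 's3://my_bucket/data/'
STORAGE_INTEGRATION = my_s3_integration;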

6. Create the Snowpipe

Create a Snowpipe to automate data loading from the external stage into the table.

CREATE OR REPLACE PIPE my_pipe
AUTO_INGEST = TRUE
AS COPY INTO my_table
FROM @my_stage
FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');

This command creates a Snowpipe named "my_pipe" that automatically ingests data from the external stage into the table using the specified file format.
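
AUTO_INGEST = TRUE only takes effect once your S3 bucket publishes event notifications to the pipe's SQS queue. The queue ARN is exposed in the notification_channel column of the pipe metadata:

-- The notification_channel column contains the SQS queue ARN to use
-- as the target of the bucket's s3:ObjectCreated:* event notifications
SHOW PIPES LIKE 'my_pipe';

After you configure the bucket's event notification to point at that ARN, Snowpipe loads each new file within minutes of its arrival in the stage.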

7. Monitor Data Loads

Monitor the data loads to ensure that the data is being ingested correctly.

SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'my_table',
START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
));

This query retrieves the copy history for the past hour to monitor recent data loads into "my_table".
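
Alongside the copy history, you can check the pipe's own state with the SYSTEM$PIPE_STATUS function, which returns a JSON document including the execution state and the number of files pending ingestion:

-- Returns JSON with fields such as executionState and pendingFileCount
SELECT SYSTEM$PIPE_STATUS('my_pipe');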

Common Challenges and Solutions

While setting up and using Snowpipe, you may encounter some common challenges. Here are solutions to address them:

  • File Sizing: Aim for files of roughly 100–250 MB compressed to optimize processing. Aggregate smaller files and split larger ones as needed.
  • Semi-structured Data: Use the STRIP_NULL_VALUES file format option to remove null values from object and array elements, improving data quality (see the sketch after this list).
  • Security Configuration: Properly configure IAM policies and event notifications to ensure secure and automated data loading.
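
STRIP_NULL_VALUES applies to semi-structured formats such as JSON. A minimal sketch of a JSON file format with the option enabled (the format name is a placeholder):

-- Remove object keys and array elements whose value is null during loading
CREATE FILE FORMAT my_json_format
TYPE = 'JSON'
STRIP_NULL_VALUES = TRUE;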

How Does Secoda Integrate with Snowflake for Enhanced Data Discovery?

Secoda's integration with Snowflake enhances data discovery by connecting its data catalog directly to Snowflake. Users can search, index, and discover data more efficiently; automate data preparation and governance; analyze data within Snowflake; and visualize data lineage to map and track data flows from source to target. By leveraging these capabilities, teams can streamline their data management processes and gain deeper insight into their data assets.

  • Data Discovery: Secoda allows users to search, index, and discover data within Snowflake, making it easier to find relevant datasets and understand their structure and content.
  • Data Preparation and Governance: The integration automates data preparation and governance tasks, ensuring that data is clean, well-organized, and compliant with regulatory requirements.
  • Data Lineage Visualization: Secoda provides tools to visualize data lineage, helping users understand how data flows through their systems, from source to target, and identify any dependencies or transformations.

How to Integrate Secoda with Snowflake

1. Connect Secoda to Snowflake

Begin by connecting Secoda to your Snowflake instance. This involves configuring the necessary credentials and permissions to allow Secoda to access your Snowflake data.

-- Example of granting access to Secoda
GRANT USAGE ON DATABASE my_database TO ROLE secoda_role;
GRANT USAGE ON SCHEMA my_schema TO ROLE secoda_role;
GRANT SELECT ON ALL TABLES IN SCHEMA my_schema TO ROLE secoda_role;

This SQL code grants the necessary permissions to the "secoda_role" to access the database and schema in Snowflake.
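
Depending on your configuration, the role usually also needs a warehouse for running metadata queries, and a future grant keeps newly created tables visible without re-granting. Both statements below are a sketch of a typical setup; the warehouse name is a placeholder:

-- Allow the role to execute queries on a warehouse (name is a placeholder)
GRANT USAGE ON WAREHOUSE my_warehouse TO ROLE secoda_role;
-- Automatically extend SELECT to tables created in the schema later
GRANT SELECT ON FUTURE TABLES IN SCHEMA my_schema TO ROLE secoda_role;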

2. Configure Data Catalog

Next, configure the data catalog in Secoda to index and catalog the data stored in Snowflake. This involves setting up the data sources and defining the metadata that will be collected.

-- Example of configuring data source in Secoda
{
  "data_source": "snowflake",
  "database": "my_database",
  "schema": "my_schema",
  "role": "secoda_role",
  "warehouse": "my_warehouse"
}

This JSON configuration sets up Snowflake as a data source in Secoda, specifying the database, schema, role, and warehouse to be used.

3. Automate Data Preparation and Governance

Leverage Secoda's automation capabilities to streamline data preparation and governance tasks. This includes setting up workflows and rules to ensure data quality and compliance.

-- Example of a data preparation workflow
{
  "workflow": "data_preparation",
  "tasks": [
    {"task": "clean_data", "parameters": {"remove_nulls": true}},
    {"task": "validate_data", "parameters": {"schema": "my_schema"}}
  ]
}

This JSON configuration defines a data preparation workflow in Secoda, specifying tasks to clean and validate data.

4. Analyze Data with Snowflake

Use Secoda's integration with Snowflake to analyze data directly within Snowflake. This involves running queries and generating insights based on the indexed and cataloged data.

-- Example of a query to analyze data
SELECT name, COUNT(*) as count
FROM my_table
GROUP BY name
ORDER BY count DESC;

This SQL query analyzes data in Snowflake by counting the occurrences of each name in "my_table" and ordering the results by count.

5. Visualize Data Lineage

Utilize Secoda's tools to visualize data lineage, helping you understand the flow of data through your systems and identify any dependencies or transformations.

-- Example of a data lineage visualization configuration
{
  "lineage": {
    "source": "my_table",
    "target": "analysis_table",
    "transformations": ["aggregation", "filtering"]
  }
}

This JSON configuration sets up a data lineage visualization in Secoda, specifying the source and target tables and the transformations applied.

Snowflake Snowpipe Recap

In this tutorial, we covered the key concepts and steps for setting up and using Snowpipe for automated data loading in Snowflake. Here are the main takeaways:

  • Automated Data Loading: Snowpipe automates data loading from external stages into Snowflake tables, making data available in near real-time.
  • Data Security: Snowpipe supports end-to-end encryption, column-level security, and row-level security to protect sensitive data.
  • Best Practices: Follow best practices for file sizing, semi-structured data handling, and security configuration to optimize Snowpipe performance.
