Share

Loading Data to a Snowflake Data Warehouse

The Pipeline stages your data in Hevo’s S3 bucket, from where it is finally loaded to your Snowflake Destination.

This section describes the queries for loading data into a Snowflake data warehouse. It assumes that you are familiar with Hevo’s process for Loading Data to a Data Warehouse.

Note: The queries listed here have been simplified to facilitate understanding of the overall data deduplication and loading process.

Loading Data with Higher Precision and Scale

For a decimal, numeric data type, the total number of digits in a number is called precision, and the number of digits to the right of the decimal is called scale.

Snowflake supports a maximum precision and scale of 38 and 37, respectively. The precision does not affect storage because it is always fixed, whereas the scale can impact storage because the column has to be sized accordingly. Hence, while loading data to the Destination table, Hevo sets the precision of every number column to the maximum value of 38 and the scale of each column to the highest scale found in the Events. This is done across all Pipelines with Snowflake as their Destination.

Scenario 1: Source data with precision greater than 38.

Let us suppose you have a column in your Destination table with precision and scale of 38 and 5, respectively, and the Source data contains an Event with precision and scale of 43 and 5, respectively.

When Hevo tries to load this data, it results in an error since Snowflake does not support a precision greater than 38. You must manually reduce the precision to less than or equal to 38 in the Source to load the data to the Destination successfully.

Scenario 2: Source data with a scale higher than the current scale of the Destination column.

Let us suppose you have a column in your Destination table with precision and scale of 38 and 5, respectively, and the Source data contains an Event with precision and scale of 38 and 10, respectively.

When Hevo tries to load this data, it results in an error since Snowflake does not support resizing the scale from a lower value to a higher value. Read Alter Table. For such cases, from Release 1.82 onwards, Hevo tries to resize the scale of the column in your Destination table for all new and existing Pipelines with Snowflake as a Destination. You need to contact Hevo Support to enable this feature.

Note: Resizing the scale may incur extra costs at the Destination level due to its effect on storage.

After enabling the feature, Hevo tries to resize the scale using the following steps:

  1. Add a temporary column in the Destination table:

    ALTER TABLE <table_name> ADD COLUMN <temp_column_name> decimal(38,10);
    
  2. Copy the data from the original column to the temporary column:

    UPDATE <table_name> SET <temp_column_name> = <column_name>;
    
  3. Drop the original column:

    ALTER TABLE <table_name> DROP <column_name>;
    
  4. Rename the temporary column to the original column:

    ALTER TABLE <table_name> RENAME COLUMN <temp_column_name> TO <column_name>;
    

Loading Data without Primary Keys

If primary keys are not present in the Destination tables, Hevo directly appends the data into the target tables in the Destination warehouse using the following steps:

  1. Apply the __hevo_ingested_at timestamp to each Event at the time of ingestion from Source. This column is retained in the Destination table also.

  2. Ensure __hevo_ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table, else, create them.

  3. Copy files into the target table:

    1. Create a temporary external stage (for example, temp_stage_name) in Snowflake, which is a reference to the location in Amazon S3 buckets of the data files to be loaded:

      CREATE TEMPORARY STAGE <temp_stage_name>
      URL = 's3://<bucket>[/<path>/]'
      CREDENTIALS = ( <AWS_CREDENTIALS> )
      ENCRYPTION = ( TYPE = 'AWS_CSE'
                     MASTER_KEY = '<string>' )
      FILE_FORMAT = ( TYPE = CSV
                      COMPRESSION = GZIP
                      FIELD_OPTIONALLY_ENCLOSED_BY = ''''
                      BINARY_FORMAT = BASE64 )
      COPY_OPTIONS = ( ON_ERROR = SKIP_FILE );
      
    2. Copy data into the target table:

      COPY INTO <target_table>
         (FIELD_1, FIELD_2,..., FIELD_n)
      FROM
         (SELECT T.FIELD_1, T.FIELD_2,..., T.FIELD_n
            FROM @<temp_stage_name> T)
      

Loading Data with Primary Keys

If the Destination tables provide primary keys, Hevo performs the following steps to deduplicate and load the data to the data warehouse.

  1. Apply the __hevo_ingested_at timestamp to each Event at the time of ingestion from Source. This column is retained in the Destination table also.

  2. Ensure __hevo_ingested_at, and if required, __hevo__loaded_at columns are present in the Destination table, else, create them.

  3. Create a temporary, flat staging table:

    CREATE TEMP TABLE <stage_table> LIKE <target_table>;
    
  4. Copy files to the staging table:

    1. Create a temporary external stage (for example, temp_stage_name) in Snowflake, which is a reference to the location in Amazon S3 buckets of the data files to be loaded:

      CREATE TEMPORARY STAGE <temp_stage_name>
      URL = 's3://<bucket>[/<path>/]'
      CREDENTIALS = ( <AWS_CREDENTIALS> )
      ENCRYPTION = ( TYPE = 'AWS_CSE'
                     MASTER_KEY = '<string>' )
      FILE_FORMAT = ( TYPE = CSV
                      COMPRESSION = GZIP
                      FIELD_OPTIONALLY_ENCLOSED_BY = ''''
                      BINARY_FORMAT = BASE64 )
      COPY_OPTIONS = ( ON_ERROR = SKIP_FILE );
      
    2. Copy data into the staging table:

      COPY INTO <stage_table>
         (FIELD_1, FIELD_2,..., FIELD_n)
      FROM
         (SELECT T.FIELD_1, T.FIELD_2,..., T.FIELD_n
            FROM @<temp_stage_name> T)
      
  5. Add the Hevo-reserved meta columns to the staging table. These are used to identify the latest Event in corner cases and special scenarios:

    ALTER TABLE <stage_table> ADD COLUMN __HE__MSG_SEQ_ID BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __HEVO__CONSUMPTION_ID BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __HEVO__MARKED_DELETED BOOLEAN DEFAULT NULL;
    
  6. Delete duplicate data from the staging table on basis of __hevo__ingested_at, __he__msg_seq_id, and __hevo__consumption_id:

    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HEVO__INGESTED_AT)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HEVO__INGESTED_AT)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HE__MSG_SEQ_ID)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HE__MSG_SEQ_ID)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HEVO__CONSUMPTION_ID)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HEVO__CONSUMPTION_ID)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
  7. Check if duplicates are still present:

    SELECT COUNT(*) AS DUPLICATE_COUNT
      FROM (SELECT <PK1, PK2, ...PKn>
              FROM <stage_table>
             GROUP BY <PK1, PK2, ...PKn> HAVING COUNT(*) >= 2);
    
  8. Load the data into the target table:

    Note: Snowflake uses a single Merge statement to handle all three operations of deletions, insertions, and updates to the Destination table.

    MERGE INTO <target_table> AS T
       USING <stage_table> AS S
         ON T.PK1 = S.PK1 AND
            T.PK2 = S.PK2 AND
            ...
            T.PKn = S.PKn
           WHEN MATCHED AND
                S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
                S.__HEVO__MARKED_DELETED IS NOT NULL AND
                S.__HEVO__MARKED_DELETED = true
             THEN UPDATE SET T.__HEVO__MARKED_DELETED = true,
                             T.__HEVO__INGESTED_AT = S.__HEVO__INGESTED_AT,
                             T.__HEVO__LOADED_AT = <loaded_at>
           WHEN MATCHED AND
                S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
                (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
             THEN UPDATE SET T.FIELD_1 = S.FIELD_1,
                             T.FIELD_2 = S.FIELD_2,
                             ...
                             T.FIELD_n = S.FIELD_n,
                             T.__HEVO__LOADED_AT = <loaded_at>
           WHEN NOT MATCHED AND
                (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
             THEN INSERT (FIELD_1,
                          FIELD_2,
                          ...
                          FIELD_n,
                          __HEVO__LOADED_AT)
                  VALUES (FIELD_1,
                          FIELD_2,
                          ...
                          FIELD_n,
                          <loaded_at>);
    
  9. Drop the staging table:

     DROP TABLE <stage_table>;
    

This completes the loading of data to the Snowflake data warehouse.


Last updated on Jun 05, 2023

Tell us what went wrong

Skip to the section