
Loading Data to an Amazon Redshift Data Warehouse

The Pipeline stages your data in Hevo’s S3 bucket, from where it is loaded into your Amazon Redshift Destination.

This section describes the queries for loading data into an Amazon Redshift data warehouse. It assumes that you are familiar with Hevo’s process for Loading Data to a Data Warehouse.

Note: The queries listed here have been simplified to facilitate understanding of the overall data deduplication and loading process.


Loading Data without Primary Keys

If primary keys are not present in the Destination tables, Hevo directly appends the data into the target tables in the Destination warehouse using the following steps:

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table. If they are not, create them.

  3. Copy the data directly into the target table:

    COPY <target_table> FROM '<s3://path/to/manifest-file>'...;
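
The append-only flow above can be tried locally with a rough simulation. The sketch below is an assumption-laden stand-in, not Hevo's implementation: it uses Python's built-in sqlite3 instead of Amazon Redshift, replaces COPY with plain INSERT statements, always creates both meta columns, and the names `orders`, `ensure_meta_columns`, and `append_events` are hypothetical.

```python
import sqlite3
import time

# Meta columns the target table must carry (step 2). Types are an assumption.
META_COLUMNS = {"__hevo__ingested_at": "INTEGER", "__hevo__loaded_at": "INTEGER"}


def ensure_meta_columns(conn, table):
    """Add any meta column that is missing from `table`."""
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    for name, sql_type in META_COLUMNS.items():
        if name not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {name} {sql_type}")


def append_events(conn, table, events):
    """Stamp each event and append it; INSERT stands in for COPY here."""
    ensure_meta_columns(conn, table)
    now_ms = int(time.time() * 1000)
    for event in events:
        row = {**event, "__hevo__ingested_at": now_ms, "__hevo__loaded_at": now_ms}
        columns = ", ".join(row)
        placeholders = ", ".join("?" for _ in row)
        conn.execute(
            f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
            list(row.values()),
        )


conn = sqlite3.connect(":memory:")
# Hypothetical target table without a primary key.
conn.execute("CREATE TABLE orders (order_id INTEGER, status TEXT)")
append_events(
    conn,
    "orders",
    [{"order_id": 1, "status": "new"}, {"order_id": 1, "status": "new"}],
)
row_count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
column_names = [row[1] for row in conn.execute("PRAGMA table_info(orders)")]
```

Because no primary key is involved, both copies of the same Event end up in the table; nothing is deduplicated in this path.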
    

Loading Data with Primary Keys

If primary keys are defined in the Destination tables, Hevo performs the following steps to deduplicate the data and load it to the data warehouse:

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table. If they are not, create them.

  3. Create a temporary staging table in the Destination with the same schema as the Destination table. The ingested data is loaded to this table and all the steps for its deduplication are performed on this table.

    CREATE TEMP TABLE <stage_table> (LIKE <target_table>);
    
  4. Add the Hevo-reserved meta columns to the staging table:

    ALTER TABLE <stage_table> ADD COLUMN __he__msg_seq_id BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __hevo__consumption_id BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __hevo__marked_deleted BOOLEAN DEFAULT NULL;
    
    
  5. Copy the ingested data along with the new columns into the staging table:

    COPY <stage_table> FROM '<s3://path/to/manifest-file>'...;
    
  6. Ensure that the __hevo__marked_deleted column is present if the data is streamed from a Source that captures deleted Events, for example, MySQL BinLog.

  7. Identify and remove duplicates:

    1. Get the count of duplicate Events:

      SELECT COUNT(*) AS duplicate_count
      FROM (SELECT <PK1, PK2, ...PKn>
            FROM <stage_table>
            GROUP BY <PK1, PK2, ...PKn>
            HAVING COUNT(*) >= 2) AS duplicates;
      
    2. If duplicate data exists, delete the duplicates from the staging table on the basis of __hevo__ingested_at, __he__msg_seq_id, and __hevo__consumption_id, in that order. Each query keeps only the rows that hold the highest value of the column for their primary key:

      DELETE FROM <stage_table>
      WHERE (<PK1, PK2, ...PKn>, __hevo__ingested_at)
          NOT IN (SELECT <PK1, PK2, ...PKn>, MAX(__hevo__ingested_at)
                  FROM <stage_table>
                  GROUP BY <PK1, PK2, ...PKn>);
      
      DELETE FROM <stage_table>
      WHERE (<PK1, PK2, ...PKn>, __he__msg_seq_id)
          NOT IN (SELECT <PK1, PK2, ...PKn>, MAX(__he__msg_seq_id)
                  FROM <stage_table>
                  GROUP BY <PK1, PK2, ...PKn>);
      
      DELETE FROM <stage_table>
      WHERE (<PK1, PK2, ...PKn>, __hevo__consumption_id)
          NOT IN (SELECT <PK1, PK2, ...PKn>, MAX(__hevo__consumption_id)
                  FROM <stage_table>
                  GROUP BY <PK1, PK2, ...PKn>);
      
  8. Remove stale data from the staging table, that is, Events whose __hevo__ingested_at timestamp is earlier than that of the matching Event in the Destination table:

    DELETE FROM <stage_table> AS S
        USING <target_table> AS T
    WHERE T.PK1 = S.PK1
      AND T.PK2 = S.PK2
      ...
      AND T.PKn = S.PKn
      AND T.__hevo__ingested_at IS NOT NULL
      AND T.__hevo__ingested_at > S.__hevo__ingested_at;
    
  9. Clean up the staging table for loading by dropping the Hevo-reserved columns that are no longer needed:

    ALTER TABLE <stage_table> DROP COLUMN __he__msg_seq_id;
    ALTER TABLE <stage_table> DROP COLUMN __hevo__consumption_id;
    
  10. Update, delete, and insert the eligible Events using the following queries.

    Note: After deduplication of data, if the latest record is a delete record, the __hevo__marked_deleted column is set to True for it in the Destination table.

    1. Check if the staging table contains any deleted Events:

      SELECT COUNT(*) AS deleted_count
      FROM <stage_table>
      WHERE __hevo__marked_deleted = TRUE;
      
    2. Mark the deleted rows in the target table:

      UPDATE <target_table> AS T
        SET __hevo__marked_deleted = TRUE,
            __hevo__ingested_at = S.__hevo__ingested_at,
            __hevo__loaded_at = <loaded_at>
      FROM <stage_table> AS S
      WHERE S.__hevo__marked_deleted = TRUE
          AND S.PK1 = T.PK1
          AND S.PK2 = T.PK2
          ...
          AND S.PKn = T.PKn;
      
    3. Remove the deleted rows from the staging table:

      DELETE FROM <stage_table>
      WHERE __hevo__marked_deleted = TRUE;
      
    4. Delete records from the target table that are also present in the staging table:

      DELETE FROM <target_table> AS T
          USING <stage_table> AS S 
      WHERE S.PK1 = T.PK1
          AND S.PK2 = T.PK2
          ...
          AND S.PKn = T.PKn;
      
    5. Insert the remaining staging rows into the target table:

      INSERT INTO <target_table> (field_1,
                                  field_2,
                                  ...
                                  field_n,
                                  __hevo__loaded_at)
      (SELECT field_1,
              field_2,
              ...
              field_n,
              <loaded_at>
      FROM <stage_table>);
      
  11. Drop the staging table:

    DROP TABLE <stage_table>;
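
The deduplication part of the steps above (steps 3 to 8) can be sketched as a small local simulation. This is only an illustration under stated assumptions: it runs on Python's built-in sqlite3 rather than Amazon Redshift, uses a hypothetical `orders` table with a single primary key `order_id`, replaces COPY with INSERT, emulates `CREATE TEMP TABLE ... (LIKE ...)` with an empty `CREATE TABLE ... AS SELECT`, and rewrites the `DELETE ... USING` of step 8 as a correlated `EXISTS`, because SQLite supports neither syntax.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical target table that already holds two Events.
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT,
                         __hevo__ingested_at INTEGER);
    INSERT INTO orders VALUES (1, 'shipped', 300), (2, 'new', 100);

    -- Steps 3-4: empty staging copy of the target plus the meta columns.
    CREATE TEMP TABLE stage_orders AS SELECT * FROM orders WHERE 0;
    ALTER TABLE stage_orders ADD COLUMN __he__msg_seq_id INTEGER DEFAULT 0;
    ALTER TABLE stage_orders ADD COLUMN __hevo__consumption_id INTEGER DEFAULT 0;
""")

# Step 5: load the ingested batch (INSERT stands in for COPY).
# Columns: order_id, status, ingested_at, msg_seq_id, consumption_id.
batch = [
    (1, "a", 200, 1, 1),    # older than the target row for order 1
    (2, "old", 120, 5, 1),  # loses on __hevo__ingested_at
    (2, "b", 150, 1, 1),    # loses on __he__msg_seq_id
    (2, "c", 150, 2, 1),
    (3, "d", 400, 1, 1),    # loses on __hevo__consumption_id
    (3, "e", 400, 1, 2),
]
conn.executemany("INSERT INTO stage_orders VALUES (?, ?, ?, ?, ?)", batch)

# Step 7.1: count the primary keys that occur more than once.
duplicate_count = conn.execute("""
    SELECT COUNT(*) FROM (SELECT order_id FROM stage_orders
                          GROUP BY order_id HAVING COUNT(*) >= 2)
""").fetchone()[0]

# Step 7.2: per key, keep only the rows with the highest value of each
# ordering column, applying the columns one after another.
if duplicate_count:
    for column in ("__hevo__ingested_at", "__he__msg_seq_id",
                   "__hevo__consumption_id"):
        conn.execute(f"""
            DELETE FROM stage_orders
            WHERE (order_id, {column}) NOT IN (
                SELECT order_id, MAX({column})
                FROM stage_orders GROUP BY order_id)
        """)

# Step 8: drop staged rows that are older than the matching target row.
conn.execute("""
    DELETE FROM stage_orders
    WHERE EXISTS (SELECT 1 FROM orders AS T
                  WHERE T.order_id = stage_orders.order_id
                    AND T.__hevo__ingested_at IS NOT NULL
                    AND T.__hevo__ingested_at > stage_orders.__hevo__ingested_at)
""")

remaining = conn.execute(
    "SELECT order_id, status FROM stage_orders ORDER BY order_id"
).fetchall()
```

In this sample batch, one row per primary key survives step 7, and the staged row for order 1 is then discarded in step 8 because the target already holds a newer version. Rows that tie on all three ordering columns would not be separated by these queries.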
    

This completes the loading of data to the Amazon Redshift data warehouse.
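
As an illustration of the final update, delete, and insert phase (step 10 and step 11), the sketch below replays those queries on a tiny data set. It is a simulation under assumptions, not Hevo's code: it uses Python's built-in sqlite3 instead of Amazon Redshift, a hypothetical `orders` table keyed on `order_id`, a fixed hypothetical `LOADED_AT` value in place of `<loaded_at>`, integer 1 for the boolean flag, and correlated subqueries in place of `UPDATE ... FROM` and `DELETE ... USING`.

```python
import sqlite3

LOADED_AT = 999  # hypothetical load timestamp used for this run

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Hypothetical target table with two existing Events.
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, status TEXT,
                         __hevo__marked_deleted INTEGER,
                         __hevo__ingested_at INTEGER,
                         __hevo__loaded_at INTEGER);
    INSERT INTO orders VALUES (1, 'new', NULL, 100, 100),
                              (2, 'new', NULL, 100, 100);

    -- Staging table as it looks after deduplication and clean-up.
    CREATE TEMP TABLE stage_orders (order_id INTEGER, status TEXT,
                                    __hevo__marked_deleted INTEGER,
                                    __hevo__ingested_at INTEGER);
    INSERT INTO stage_orders VALUES (1, 'new', 1, 200),      -- delete Event
                                    (2, 'paid', NULL, 210),  -- update
                                    (3, 'new', NULL, 220);   -- new row
""")

# 10.1: check whether the batch contains deleted Events.
deleted_count = conn.execute(
    "SELECT COUNT(*) FROM stage_orders WHERE __hevo__marked_deleted = 1"
).fetchone()[0]

if deleted_count:
    # 10.2: flag the matching target rows as deleted instead of removing them.
    conn.execute("""
        UPDATE orders
        SET __hevo__marked_deleted = 1,
            __hevo__ingested_at = (SELECT S.__hevo__ingested_at
                                   FROM stage_orders AS S
                                   WHERE S.order_id = orders.order_id),
            __hevo__loaded_at = ?
        WHERE order_id IN (SELECT order_id FROM stage_orders
                           WHERE __hevo__marked_deleted = 1)
    """, (LOADED_AT,))
    # 10.3: the delete Events are handled, so drop them from staging.
    conn.execute("DELETE FROM stage_orders WHERE __hevo__marked_deleted = 1")

# 10.4: remove target rows that are about to be replaced.
conn.execute(
    "DELETE FROM orders WHERE order_id IN (SELECT order_id FROM stage_orders)"
)

# 10.5: insert the staged rows with the load timestamp.
conn.execute("""
    INSERT INTO orders (order_id, status, __hevo__marked_deleted,
                        __hevo__ingested_at, __hevo__loaded_at)
    SELECT order_id, status, __hevo__marked_deleted, __hevo__ingested_at, ?
    FROM stage_orders
""", (LOADED_AT,))

# 11: drop the staging table.
conn.execute("DROP TABLE stage_orders")

final_rows = conn.execute("SELECT * FROM orders ORDER BY order_id").fetchall()
```

In this run, order 1 stays in the target but is flagged as deleted, order 2 is replaced by its newer version, and order 3 is inserted as a new row.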




Last updated on May 30, 2023
