Snow Pipe

What is Snowpipe :

Snowpipe is a database object in Snowflake.
The COPY command is just a SQL statement, so it cannot be created as an object on its own.
A Snowpipe object always wraps a COPY command.
One Snowpipe cannot contain multiple COPY commands.
Here are a few important options and properties involved when configuring a Snowpipe:
  • Name: The name of the Snowpipe.
  • Database and Schema: The database and schema where the Snowpipe will be created.
  • Auto Ingest: Specifies whether the Snowpipe should automatically ingest data when new files arrive in the external source.
  • Notification Channel: The queue (an SQS ARN on AWS) that Snowflake generates for the pipe; the bucket's event notifications are pointed at it. It is visible via SHOW PIPES or DESC PIPE.
  • Error Notifications: An optional error integration can be attached to the pipe to receive notifications when ingestion errors occur.
  • Comment: An optional comment associated with the Snowpipe.
  • The wrapped COPY command: The stage location (typically a named stage), the file format (inline or a named file format object), an optional SQL transformation, and the error-handling behaviour are all specified inside the COPY statement that the pipe wraps, not as separate pipe options.
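A minimal sketch of a pipe definition exercising these options (the database, schema, table, stage, and file format names here are illustrative assumptions, not objects from this post):

```sql
-- Sketch only: assumes mydb.staging, table customers, named stage my_stage,
-- and named file format my_csv_format already exist.
CREATE OR REPLACE PIPE mydb.staging.my_pipe
  AUTO_INGEST = TRUE
  COMMENT = 'Loads CSV files landing in the external stage'
AS
COPY INTO mydb.staging.customers
  FROM @mydb.staging.my_stage
  FILE_FORMAT = (FORMAT_NAME = my_csv_format)
  ON_ERROR = 'SKIP_FILE';

-- The notification channel is generated by Snowflake, not set by us:
SHOW PIPES LIKE 'my_pipe';  -- see the notification_channel column
```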
Recap on Copy Command :
There are a bunch of options in the COPY command as well:
We can run the COPY command in validation mode to check whether the files have any errors, without loading anything.
COPY is a SQL command in Snowflake, so no object is created for it.
The COPY command stores load history for 14 days, so loading the same file multiple times (duplication) can be avoided.
We can specify how to deal with errors when they are encountered. The ON_ERROR options are:
CONTINUE | SKIP_FILE | SKIP_FILE_<num> | 'SKIP_FILE_<num>%' | ABORT_STATEMENT
PURGE = TRUE will remove files from the stage after a successful load, so we can avoid storage costs.
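The recap above can be sketched as two COPY runs (the table, stage, and file format names are assumptions carried over from the pipe example later in this post):

```sql
-- 1. Validation only: report errors in the staged files without loading them.
COPY INTO emp_pipe_tbl
  FROM @emp_snowpipe_stage/
  FILE_FORMAT = (FORMAT_NAME = emp_csv_format)
  VALIDATION_MODE = RETURN_ERRORS;

-- 2. Actual load: skip any file containing errors, and delete files
--    from the stage once they are loaded successfully.
COPY INTO emp_pipe_tbl
  FROM @emp_snowpipe_stage/
  FILE_FORMAT = (FORMAT_NAME = emp_csv_format)
  ON_ERROR = 'SKIP_FILE'
  PURGE = TRUE;
```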
/*****************************************************************************/

We have already seen external tables in the other class. Now it is time to learn an additional topic on top of that, called Snowpipe.

Snow Pipe is a service provided by Snowflake that automates the ingestion of data from external sources, such as Amazon S3 or Azure Blob Storage, into Snowflake tables. 

Snowpipe is considered real time or near real time, as there is only a latency of a few seconds to a minute or so before the process of loading data completes.

Snowpipe can integrate and work with AWS, GCP, and Azure, and can also receive data from Kafka.

It is almost the same as the external stage process, with two additional steps:

External stage Process :

S3 =>  IAM Roles /Policies => Stage integration => File Format => Copy into Command=> table Load

In the case of Snowpipe we follow the same process, but we wrap the COPY INTO command in a Snowpipe object and attach it to the SQS service. The flow is as below.

Snow Pipe Process :

S3 => IAM Roles / Policies => SQS => Stage Integration => File Format => Snowpipe(COPY INTO command) => Table Load

A pipe object is always wrapped around a COPY INTO command.

Recap :

We have seen how to create an S3 bucket and integrate it with a Snowflake external stage object.

S3 => Storage Integration => Named External Stage => External Table

So, long story short, the files that are stored in S3 are seamlessly available in the named external stage.

Up to now we created an external table on top of it.

Now, instead of using an external table, we want to load that data into a regular table and make it an automated process.

The deviation starts after creating the stage object. Please refer to the External Table blog for better understanding.

Based on the discussion, here is what we have in hand:

S3 Bucket : s3://customers-snowpipe-integration-tiruven
IAM Policy : customers-snowpipe-integration-tiruven-policy
IAM Role : customers-snowpipe-integration-tiruven-role
storage integration Object : training_snowpipe_s3_integration (we can have any number of storage integration objects on one bucket.)
Snow Stage : emp_snowpipe_stage (we can have any number of stages on one storage integration object.)
File Format : emp_csv_format
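The objects listed above can be sketched in SQL as below (the AWS account ID in the role ARN is a placeholder, and the CSV settings are assumptions; adjust both to your setup):

```sql
CREATE OR REPLACE STORAGE INTEGRATION training_snowpipe_s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<account-id>:role/customers-snowpipe-integration-tiruven-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://customers-snowpipe-integration-tiruven/');

CREATE OR REPLACE FILE FORMAT emp_csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1;  -- assumption: files carry a header row

CREATE OR REPLACE STAGE emp_snowpipe_stage
  URL = 's3://customers-snowpipe-integration-tiruven/'
  STORAGE_INTEGRATION = training_snowpipe_s3_integration
  FILE_FORMAT = emp_csv_format;
```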

-- What we need to do to create a snow pipe :
1. Create a temp / transient table : emp_pipe_tbl
2. Create a pipe object :
CREATE OR REPLACE PIPE emp_snowpipe
AUTO_INGEST = TRUE  -- if FALSE, files are not ingested automatically on arrival
AS
copy into emp_pipe_tbl
  from @emp_snowpipe_stage/
  file_format = (format_name = emp_csv_format)
  on_error = 'CONTINUE';

ALTER PIPE emp_snowpipe REFRESH; -- queues files already sitting in the stage for loading on demand, e.g. while the pipe execution is paused or notifications are not yet set up

ALTER PIPE emp_snowpipe SET PIPE_EXECUTION_PAUSED = TRUE; -- TRUE will pause the pipe, FALSE will resume it.

show pipes; -- This will show you the details of all pipes you have access to in the database.
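A sketch of how the load can then be monitored (object names follow the pipe created above):

```sql
-- Current state of the pipe, returned as JSON
-- (executionState, pendingFileCount, notificationChannelName, ...).
SELECT SYSTEM$PIPE_STATUS('emp_snowpipe');

-- Files loaded into the target table during the last hour.
SELECT file_name, status, row_count, first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
       TABLE_NAME => 'EMP_PIPE_TBL',
       START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())));
```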

Note : A pipe object always wraps a COPY command.

Once the pipe is created, run the command below:

show pipes;

Copy the notification_channel information and configure it in the S3 bucket's event notifications:

  • In the bucket's properties, create or edit an event notification.
  • Among the destination options, select the radio button for "SQS queue".
  • Choose "Enter SQS queue ARN".
  • Paste the notification_channel information into the text box below it.
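Once the event notification is in place, a quick end-to-end check can look like this (a sketch; it assumes a test file has been uploaded to the bucket and uses the object names from above):

```sql
-- Optionally queue any files that landed before notifications were configured.
ALTER PIPE emp_snowpipe REFRESH;

-- executionState should be RUNNING; pendingFileCount drops as files load.
SELECT SYSTEM$PIPE_STATUS('emp_snowpipe');

-- The row count should grow within a minute or so of the upload.
SELECT COUNT(*) FROM emp_pipe_tbl;
```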
