MySQL Forums

datawarehouse ETL rolls back after 5 GB of data transfer
Posted by: shyam ahuja
Date: May 12, 2010 10:01AM

Hi,

I am running an ETL procedure that extracts data from a staging table and inserts it into the data warehouse DB. I have around 10 GB of data in one table. After transferring about 5 GB, the transaction rolls back and everything is deleted from the table. Any feedback?

Here is the ETL procedure:
--Procedure: new_etl1

--DROP PROCEDURE IF EXISTS new_etl1;

DELIMITER $$
CREATE PROCEDURE `new_etl1`()
BEGIN
DECLARE ins1 varchar(15) default '';
DECLARE ins2 varchar(15) default '';
DECLARE ins3 varchar(15) default '';
DECLARE ins4 varchar(15) default '';
DECLARE ins5 varchar(15) default '';
DECLARE ins6 varchar(15) default '';
DECLARE ins7 varchar(15) default '';
DECLARE ins8 varchar(15) default '';
DECLARE ins9 varchar(15) default '';
DECLARE ins10 varchar(15) default '';
DECLARE j int;
DECLARE dw_max_id int ;
DECLARE stg_max_id int ;
DECLARE dw_max_id_1 int ;
DECLARE stg_max_id_1 int ;
DECLARE log1 varchar(15) default '';
DECLARE l_log numeric ;

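-- Single EXIT handler for all steps: on any exception, warning, or NOT FOUND
-- condition it reports the first table whose ins1..ins10 flag is still ''
-- and then leaves the procedure without reaching the final COMMIT.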
DECLARE EXIT HANDLER FOR SQLEXCEPTION,SQLWARNING,NOT FOUND
select concat
(
'Error Inserting Data into DWH Table ',
case
when ins1='' then 'Dw_log_presentation'
when ins2='' then 'Dw_log_smil_sequence'
when ins3='' then 'Dw_log_smil_context'
when ins4='' then 'Dw_theaterexhibitor '
when ins5='' then 'Dw_log_processor'
when ins6='' then 'Dw_log_day_control '
when ins7='' then 'Dw_Files '
when ins8='' then 'Dw_log_File'
when ins9='' then 'Dw_log_playlog'
when ins10='' then 'Dw_playlog'
END,
' Insertion Failed' );
SET AUTOCOMMIT=0;
START TRANSACTION;
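-- Current job instance id; recorded with every dw_logs entry below.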
select max(job_instance_no) into j from staging_db.job_details ;

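-- Each dimension load below follows the same pattern: insert rows from the
-- staging table that are not yet in the warehouse table, log ROW_COUNT() to
-- staging_db.dw_logs, then set the matching ins flag to 'Done'.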
insert into datawarehouse_db.dw_log_presentation
(Presentation_Id, Name)
select Presentation_Id,Name from staging_db.stg_log_presentation
where Presentation_Id not in
(select Presentation_Id from datawarehouse_db.dw_log_presentation);
SELECT ROW_COUNT() into log1;

if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_presentation','No Duplicate Entry Allowed');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_presentation','Completed');
end if;

SET ins1='Done';

insert into datawarehouse_db.dw_log_smil_sequence
(SEQUENCE_ID, NAME)
select SEQUENCE_ID, NAME from staging_db.stg_log_smil_sequence
where SEQUENCE_ID not in
(select SEQUENCE_ID from datawarehouse_db.dw_log_smil_sequence);

SELECT ROW_COUNT() into log1;

if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_smil_sequence ','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_smil_sequence','Completed');
end if;

SET ins2='Done';

insert into datawarehouse_db.dw_log_smil_context
(SMIL_ID, REF_ID)
select SMIL_ID, REF_ID from staging_db.stg_log_smil_context
where SMIL_ID not in
(select SMIL_ID from datawarehouse_db.dw_log_smil_context);

SELECT ROW_COUNT() into log1;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_smil_context','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_smil_context','Completed');
end if;
SET ins3='Done';
insert into datawarehouse_db.dw_theaterexhibitor
(PROCESSOR, THEATER, EXHIBITOR)
select PROCESSOR, THEATER, EXHIBITOR
from
staging_db.stg_theaterexhibitor
where PROCESSOR not in
(select PROCESSOR from datawarehouse_db.dw_theaterexhibitor );

SELECT ROW_COUNT() into log1;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_theaterexhibitor','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_theaterexhibitor','Completed');
end if;
SET ins4='Done';
insert into datawarehouse_db.dw_log_processor
(Processor_Id, Name)
select Processor_Id, Name
from
staging_db.stg_log_processor
where Processor_Id not in
(select Processor_Id from datawarehouse_db.dw_log_processor );
SELECT ROW_COUNT() into log1;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_processor','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_processor','Completed');
end if;
SET ins5='Done';
insert into datawarehouse_db.dw_log_day_control
(LDC_Id, Log_Date, Pending, Node_Id)
select LDC_Id, Log_Date, Pending, Node_Id
from
staging_db.stg_log_day_control
where LDC_Id not in
(select LDC_Id from datawarehouse_db.dw_log_day_control);
SELECT ROW_COUNT() into log1;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_day_control','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_day_control','Completed');
end if;
SET ins6='Done';
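-- Copy stg_log_playlog into dw_log_playlog in batches of 10,000 Entry_Ids,
-- using a dynamic INSERT ... SELECT built with CONCAT and a prepared statement.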
select max(Entry_Id) into dw_max_id from datawarehouse_db.dw_log_playlog ;
select max(Entry_Id) into stg_max_id from staging_db.stg_log_playlog ;
if dw_max_id is null then
select MIN(Entry_Id) into dw_max_id from staging_db.stg_log_playlog ;
End if;
if stg_max_id is null then
set stg_max_id=0;
End if;
set log1 = 0 ;
while dw_max_id < stg_max_id
DO
SET @s =CONCAT("insert into datawarehouse_db.dw_log_playlog
(Entry_Id, File_Id, Processor_Id, Sequence_Id, SMIL_Id, Play_Time)
select Entry_Id, File_Id, Processor_Id, Sequence_Id, SMIL_Id, Play_Time
from staging_db.stg_log_playlog
where stg_log_playlog.Entry_Id > " , dw_max_id, " and stg_log_playlog.Entry_Id <= " ,dw_max_id+10000);
PREPARE stmt1 FROM @s;
EXECUTE stmt1;
SELECT ROW_COUNT() into l_log;
if l_log is not null then
set log1 = log1 + l_log ;
end if ;
DEALLOCATE PREPARE stmt1;
SET dw_max_id=dw_max_id+10000;

END WHILE;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_playlog','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_playlog','Completed');
end if;
SET ins7='Done';
insert into datawarehouse_db.dw_files
(LONG_NAME, SHORT_NAME, MOVIE, STUDIO)
select LONG_NAME, SHORT_NAME, MOVIE, STUDIO from staging_db.stg_files
where LONG_NAME not in
(select LONG_NAME from datawarehouse_db.dw_files );
SELECT ROW_COUNT() into log1;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_files ','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_files ','Completed');
end if;
SET ins8='Done';
insert ignore into datawarehouse_db.dw_log_file
(File_Id, Presentation_Id, File_Name)
select File_Id, Presentation_Id, File_Name
from
staging_db.stg_log_file
where File_Id not in
(select File_Id from datawarehouse_db.dw_log_file);
SELECT ROW_COUNT() into log1;
if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_log_file ','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_log_file ','Completed');
end if;
SET ins9='Done';

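-- Build the denormalized dw_playlog fact rows by joining the staging tables,
-- again in batches of 10,000 Entry_Ids via a prepared statement.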
select max(Entry_Id) into dw_max_id_1 from datawarehouse_db.dw_playlog;
select max(Entry_Id) into stg_max_id_1 from staging_db.stg_log_playlog;
if dw_max_id_1 is null then
select MIN(Entry_Id) into dw_max_id_1 from staging_db.stg_log_playlog ;
End if;
if stg_max_id_1 is null then
set stg_max_id_1=0;
End if;
set log1 = 0 ;
while dw_max_id_1 < stg_max_id_1
DO
SET @s =CONCAT("insert into datawarehouse_db.dw_playlog
(Entry_Id, Play_Time, File_Name, Processor_Name, THEATER, EXHIBITOR, Presentation_Name, Presentation_Id, File_id, processor_id, short_name, movie, studio, Node_id, Log_date )
SELECT stg_log_playlog.Entry_Id
,stg_log_playlog.Play_Time
, stg_log_file.File_Name
, stg_log_processor.Name
, stg_theaterexhibitor.THEATER
, stg_theaterexhibitor.EXHIBITOR
, stg_log_presentation.Name
,stg_log_presentation.Presentation_Id
,stg_log_file.File_Id
,stg_log_processor.Processor_Id
,stg_files.SHORT_NAME
,stg_files.MOVIE
,stg_files.STUDIO
,stg_log_day_control.Node_Id
,stg_log_day_control.Log_Date
FROM
((stg_log_file stg_log_file
INNER JOIN
stg_log_presentation stg_log_presentation
ON (stg_log_file.Presentation_Id = stg_log_presentation.Presentation_Id))
INNER JOIN
stg_log_playlog stg_log_playlog
ON (stg_log_playlog.File_Id = stg_log_file.File_Id))
INNER JOIN
stg_log_processor stg_log_processor
ON (stg_log_playlog.Processor_Id = stg_log_processor.Processor_Id)
left JOIN stg_theaterexhibitor stg_theaterexhibitor
ON (stg_log_processor.Name=stg_theaterexhibitor.PROCESSOR)
left JOIN stg_files stg_files
ON (stg_log_file.File_Name=stg_files.LONG_NAME)
left JOIN stg_log_day_control stg_log_day_control
ON (stg_log_file.File_Id=stg_log_day_control.LDC_Id)
where (stg_log_playlog.File_Id = stg_log_file.File_Id)
AND (stg_log_file.Presentation_Id = stg_log_presentation.Presentation_Id)
AND (stg_log_playlog.Processor_Id = stg_log_processor.Processor_Id)
AND stg_log_playlog.Entry_Id > " , dw_max_id_1, " and stg_log_playlog.Entry_Id <= " ,dw_max_id_1+10000);
PREPARE stmt1 FROM @s;
EXECUTE stmt1;
SELECT ROW_COUNT() into l_log;
if l_log is not null then
set log1 = log1 + l_log ;
end if ;
DEALLOCATE PREPARE stmt1;
SET dw_max_id_1=dw_max_id_1+10000;
END WHILE;

if log1 is null or log1=0
then
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_Status) values(j,log1,'Dw_playlog ','No Duplicate Entry Allowed ');
else
insert into staging_db.dw_logs(job_id,num_rows,Table_Name,Updated_status) values(j,log1,'Dw_playlog','Completed');
end if;

SET ins10='Done';

COMMIT;

END$$

DELIMITER ;

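For anyone trying to reproduce this, here is a minimal way to invoke the procedure and inspect the per-step log entries (a sketch only; note that the dw_logs rows are written inside the same transaction, so a rollback may remove them as well):

-- Run the load; on failure the EXIT handler above SELECTs a message
-- naming the first table whose insert did not complete.
CALL new_etl1();

-- See what each step logged for the most recent job.
SELECT job_id, Table_Name, num_rows, Updated_Status
FROM staging_db.dw_logs
ORDER BY job_id DESC;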