Batch data processing comes up constantly in projects. Oracle introduced BULK COLLECT to cut the overhead of row-by-row LOOP processing and improve query performance and efficiency, and the same applies in DM. Note that when it is used, every INTO target must be a collection.
BULK COLLECT is a bulk aggregation mechanism: it fetches multiple rows, and multiple columns, into collection types in a single operation.
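To see where the savings come from, here is a minimal sketch contrasting row-by-row fetching with a single bulk fetch (the emp table and its name column are hypothetical placeholders):
declare
    type name_list is table of emp.name%type;
    names name_list;
    v_name emp.name%type;
    cursor c is select name from emp;
begin
    -- row-by-row: one fetch call, and one engine round trip, per row
    open c;
    loop
        fetch c into v_name;
        exit when c%notfound;
    end loop;
    close c;
    -- bulk collect: the whole result set lands in the collection in one fetch
    select name bulk collect into names from emp;
    print('rows fetched: '||names.count);
end;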
Before walking through how BULK COLLECT is used, a quick note on the test environment.
The test data is a LIST-partitioned table with ten million rows:
CREATE TABLE "T4"
(
"ID" int primary key,
"PLAN_NATURE" varchar(4),
"RISK_LEVEL" varchar(4),
"PLAN_BEGIN_TIME" timestamp,
"PLAN_END_TIME" timestamp,
"BUREAU_CODE" varchar(10)
)
PARTITION BY LIST("BUREAU_CODE")
(
PARTITION "P_0300" VALUES('301') ,
PARTITION "P_0301" VALUES('302') ,
PARTITION "P_0302" VALUES('303') ,
PARTITION "P_0303" VALUES('304') ,
PARTITION "P_0304" VALUES('305') ,
PARTITION "P_0305" VALUES('306') ,
PARTITION "P_0306" VALUES('307') ,
PARTITION "P_0307" VALUES('308') ,
PARTITION "P_0308" VALUES('309') ,
PARTITION "P_0309" VALUES('310') ,
PARTITION "P_0310" VALUES('311') ,
PARTITION "P_0311" VALUES('312') ,
PARTITION "P_0312" VALUES('313') ,
PARTITION "P_0313" VALUES('314') ,
PARTITION "P_0314" VALUES('315') ,
PARTITION "P_0315" VALUES('316') ,
PARTITION "P_0316" VALUES('317') ,
PARTITION "P_0317" VALUES('318') ,
PARTITION "P_0318" VALUES('319') ,
PARTITION "P_0319" VALUES('320') ,
PARTITION "P_0320" VALUES('321') ,
PARTITION "P_DEFAULT" VALUES(DEFAULT)
) ;
insert into t4(ID,PLAN_NATURE,RISK_LEVEL,BUREAU_CODE) select level,to_char(round(dbms_random.value(0,2),0)),to_char(round(dbms_random.value(1,3),0)),to_char(round(dbms_random.value(0301,0302),0)) from dual connect by level<=3000000;
commit;
insert into t4(ID,PLAN_NATURE,RISK_LEVEL,BUREAU_CODE) select level+3000000,to_char(round(dbms_random.value(0,2),0)),to_char(round(dbms_random.value(1,3),0)),to_char(round(dbms_random.value(0303,0310),0)) from dual connect by level<=4000000;
commit;
insert into t4(ID,PLAN_NATURE,RISK_LEVEL,BUREAU_CODE) select level+7000000,to_char(round(dbms_random.value(0,2),0)),to_char(round(dbms_random.value(1,6),0)),to_char(round(dbms_random.value(0311,0315),0)) from dual connect by level<=3000000;
commit;
insert into t4(ID,PLAN_NATURE,RISK_LEVEL,BUREAU_CODE) select level+10000000,to_char(round(dbms_random.value(0,2),0)),to_char(round(dbms_random.value(4,6),0)),to_char(round(dbms_random.value(0316,0321),0)) from dual connect by level<=5000;
commit;
update t4 set PLAN_BEGIN_TIME=(
SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20220101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual),PLAN_END_TIME=(SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20220101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual) where id<=2000000;
commit;
update t4 set PLAN_BEGIN_TIME=(
SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20190101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual),PLAN_END_TIME=(SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20190101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual) where id>=2000000 and id<4000000;
commit;
update t4 set PLAN_BEGIN_TIME=(
SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20180101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual),PLAN_END_TIME=(SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20180101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual) where id>=4000000 and id<6000000;
commit;
update t4 set PLAN_BEGIN_TIME=(
SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20220101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual),PLAN_END_TIME=(SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20220101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual) where id>=6000000 and id<8000000;
commit;
update t4 set PLAN_BEGIN_TIME=(
SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20220101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual),PLAN_END_TIME=(SELECT to_date(TRUNC(DBMS_RANDOM.VALUE(
to_number(to_char(to_date('20220101','yyyymmdd'),'J')),
to_number(to_char(to_date('20220801','yyyymmdd')+1,'J')))),'J')+
DBMS_RANDOM.VALUE(1,3600)/3600
prize_time
FROM dual) where id>=8000000 and id<=10005000;
commit;
DBMS_STATS.GATHER_TABLE_STATS(USER,'T4',null,100);
SELECT INTO fetches the query's select-list items; with BULK COLLECT those items are fetched in bulk into a collection:
select <select-list> bulk collect into <collection> from <table>
declare
    type test_list is table of t4.bureau_code%type;
    tests test_list;
begin
    select top 100 bureau_code bulk collect into tests from t4;
    -- bulk collect loads the whole result set into tests in one go
    for i in tests.first..tests.last
    loop
        print('tests(i)='||tests(i));
    end loop;
end;
FETCH ... BULK COLLECT INTO is used when fetching from a cursor. When no LIMIT is given, BULK COLLECT returns all remaining rows to the collection in a single fetch.
fetch <cursor> bulk collect into <collection> [limit n]
declare
    type test_list is table of t4%rowtype;
    tests test_list;
    cursor cur_test is select top 200 * from t4;
begin
    open cur_test;
    fetch cur_test bulk collect into tests;
    for i in tests.first..tests.last
    loop
        print('tests(i)='||tests(i).bureau_code);
    end loop;
    close cur_test;
end;
Use LIMIT to bound how many rows a single fetch returns:
declare
    type test_list is table of t4%rowtype;
    tests test_list;
    cursor cur_test is select top 10000 * from t4;
begin
    open cur_test;
    fetch cur_test bulk collect into tests limit 1000; -- fetch only 1000 rows
    for i in tests.first..tests.last
    loop
        print('tests(i)='||tests(i).bureau_code);
    end loop;
    close cur_test;
end;
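One thing to watch when LIMIT is used inside a loop: the loop needs an explicit exit test, because the last fetch returns fewer rows than the limit, possibly zero. A minimal sketch of the canonical batch-fetch pattern (the batch size of 1000 is arbitrary):
declare
    type test_list is table of t4%rowtype;
    tests test_list;
    cursor cur_test is select * from t4;
begin
    open cur_test;
    loop
        fetch cur_test bulk collect into tests limit 1000;
        exit when tests.count = 0; -- stop once a fetch returns no rows
        for i in tests.first..tests.last
        loop
            null; -- process tests(i) here
        end loop;
    end loop;
    close cur_test;
end;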
RETURNING ... INTO is used together with DML statements; with BULK COLLECT it returns every affected row in bulk.
insert/update/delete ... returning exp1 [,exp2,exp3…] bulk collect into <collection>
Create a test table:
create table test as select * from t4 where 1=2;
Note that a partitioned table must have row movement enabled before the partition-key column can be updated:
alter table t4 enable row movement;
declare
    type test_id is table of t4.id%type;
    testid test_id;
    type test_pn is table of t4.plan_nature%type;
    testpl test_pn;
    type test_rl is table of t4.risk_level%type;
    testrl test_rl;
    type test_pbt is table of t4.plan_begin_time%type;
    testpbt test_pbt;
    type test_pet is table of t4.plan_end_time%type;
    testpet test_pet;
    type test_bc is table of t4.bureau_code%type;
    testbc test_bc;
begin
    update t4 set bureau_code='311' where bureau_code='303'
        returning id,plan_nature,risk_level,plan_begin_time,plan_end_time,bureau_code
        bulk collect into testid,testpl,testrl,testpbt,testpet,testbc;
    commit;
    for i in testid.first..testid.last
    loop
        insert into test values(testid(i),testpl(i),testrl(i),testpbt(i),testpet(i),testbc(i));
    end loop;
    commit; -- commit once after the loop rather than per row
end;
Next, build control groups to compare efficiency.
create table test1 as select * from t4 where 1=2;
declare
    type test_list is table of t4%rowtype;
    tests test_list;
    cursor cur_test is select * from t4;
begin
    open cur_test;
    loop
        fetch cur_test bulk collect into tests limit 10000;
        exit when tests.count=0; -- without this test the loop never terminates
        for i in tests.first..tests.last
        loop
            insert into test1 values(tests(i).id,tests(i).plan_nature,tests(i).risk_level,tests(i).plan_begin_time,tests(i).plan_end_time,tests(i).bureau_code);
        end loop;
        commit; -- commit per batch, not per row
    end loop;
    close cur_test;
end;
create table test2 as select * from t4 where 1=2;
declare
    cursor cur_test is select * from t4;
    a int;
begin
    a:=0;
    for c_test in cur_test
    loop
        insert into test2 values(c_test.id,c_test.plan_nature,c_test.risk_level,c_test.plan_begin_time,c_test.plan_end_time,c_test.bureau_code);
        a:=a+1;
        if a=1000 then -- commit every 1000 rows
            commit;
            a:=0;
        end if;
    end loop;
    commit;
end;
create table test3 as select * from t4 where 1=2;
declare
    type test_list is table of t4%rowtype;
    tests test_list;
    cursor cur_test is select * from t4;
begin
    open cur_test;
    loop
        fetch cur_test bulk collect into tests limit 10000;
        exit when tests.count=0;
        forall i in tests.first..tests.last
            insert into test3 values(tests(i).id,tests(i).plan_nature,tests(i).risk_level,tests(i).plan_begin_time,tests(i).plan_end_time,tests(i).bureau_code);
        commit;
    end loop;
    close cur_test;
end;
The test results show that BULK COLLECT on its own does not improve performance here; the BULK COLLECT + FORALL combination gives the clearest speedup, because FORALL binds the whole collection and executes the DML once per batch instead of once per row.
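The gap comes from the execution model, not the data access: in the per-row version each INSERT is a separate statement execution, while FORALL executes one INSERT bound to the whole collection. A minimal side-by-side sketch (fetching just three rows to keep it small; test1 and test3 are the tables created above):
declare
    type test_list is table of t4%rowtype;
    tests test_list;
begin
    select top 3 * bulk collect into tests from t4;
    -- per-row: the loop switches into the SQL engine once per element
    for i in tests.first..tests.last
    loop
        insert into test1 values(tests(i).id,tests(i).plan_nature,tests(i).risk_level,tests(i).plan_begin_time,tests(i).plan_end_time,tests(i).bureau_code);
    end loop;
    -- forall: the collection is bound once and the INSERT executes as a single statement
    forall i in tests.first..tests.last
        insert into test3 values(tests(i).id,tests(i).plan_nature,tests(i).risk_level,tests(i).plan_begin_time,tests(i).plan_end_time,tests(i).bureau_code);
end;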
In the actual project, hot tables must be summarized every night, and the job is frequently interrupted because those tables are so heavily used. The database is a DSC cluster. To simulate the scenario, run the BULK COLLECT + FORALL batch job while the hot table is under concurrent query load.
JMeter is configured to run 100 concurrent queries against the hot table.
The bulk-insert script is as follows (t1 is a second table with the same structure as t4; together the cursor reads about twenty million rows):
create table test4 as select * from t4 where 1=2;
declare
    type test_list is table of t4%rowtype;
    tests test_list;
    cursor cur_test is select * from t4
        union all select * from t1;
begin
    open cur_test;
    loop
        fetch cur_test bulk collect into tests limit 5000;
        exit when tests.count=0;
        forall i in tests.first..tests.last
            insert into test4 values(tests(i).id,tests(i).plan_nature,tests(i).risk_level,tests(i).plan_begin_time,tests(i).plan_end_time,tests(i).bureau_code);
        commit;
    end loop;
    close cur_test;
end;
top output during the run (screenshot):
Execution time for the twenty million rows:
used time: 00:01:04.026
Memory usage was roughly 6 GB.
In this test, batching with BULK COLLECT + FORALL delivered a clear gain in performance and execution efficiency. Note, however, that BULK COLLECT stages the fetched data in memory, so available memory must be taken into account: if the data cannot fit and the work spills to disk I/O, efficiency drops sharply and batching can end up worse than not batching at all.