PostgreSQL. ltree.

Автор shurutov, ноября 23, 2014, 15:26:10

« назад - далее »

0 Пользователи и 7 гостей просматривают эту тему.

shurutov

А стало быть развлекся с ltree на PostgreSQL. Чтобы было. В качестве подопытных кроликов использовалась БД КЛАДР (умнее и оригинальнее ничего в голову не пришло :)). Было весело, однако.
Вот таким скриптом
Spoiler: KLADR upload • показать
[code]#!/bin/bash
# Get Kladr and upload it to PostgreSQL

NO7ZIP=65
NOARJ=66
NOXBASE=67
NOPGPASS=68
NODBACCESS=69
NODB=70
NOLTREE=71
NOPOSTGRES=72

OLDPATH=$(pwd)
Z7=$(which 7z)
ARJ=$(which arj)
DBFDUMP=$(which dbfdump)

[ -z $Z7 ] && {
   echo "Install 7zip before actions"
   exit $NO7ZIP
}

[ -z $ARJ ] && {
   echo "Install arj before actions"
   exit $NOARJ
}

[ -z $DBFDUMP ] && {
   echo "Install DBD-XBase before actions"
   exit $NOXBASE
}

[ -z `find ~/ -name ".pgpass" -perm 600` ] && {
   echo "You need ~/.pgpass with mode 600."
   echo "See documentation about this file in postgresql documentation."
   exit $NODBACCESS
}

[ -z `which psql` ] && {
   echo "You need install a postgresql client"
   exit $NOPOSTGRES
}

PGCMD="psql -h127.0.0.1 -w -Ukladr kladr"
$PGCMD -tc "select 'OK' as OK;" | sed -e '/^$/d' -e 's/\s\+//' || {
   echo "You need DB kladr whith owner kladr"
   echo "Access on DB kladr for user kladr should be in ~/.pgpass"
   exit $NODB
}

$PGCMD -tc "\dT"|cut -d' ' -f4|grep ltree > /dev/null 2>&1 || {
   $PGCMD -c "CREATE EXTENSION ltree;" || {
      echo "You need ltree extension"
      echo "Check documentation for set up it"
      exit $NOLTREE
   }
}

URLS="http://www.gnivc.ru/html/gnivcsoft/KLADR/4/osob4.0.doc
http://www.gnivc.ru/html/gnivcsoft/KLADR/Read_me.doc
http://www.gnivc.ru/html/gnivcsoft/KLADR/DOCUM.ARJ
http://www.gnivc.ru/html/gnivcsoft/KLADR/licenziya_kladr.doc
http://www.gnivc.ru/html/gnivcsoft/KLADR/Base.7z"

[ -d /var/tmp/kladr ] && rm -rf /var/tmp/kladr/* || mkdir /var/tmp/kladr
cd /var/tmp/kladr

for url in $URLS; do
   wget $url
done

[ -d tmp ] || mkdir tmp
cd tmp
7z e ../Base.7z

$PGCMD -f ~/prj/kladr/drop-tables.sql

for fold in *.DBF; do
   fnew=${fold,,}
   TABLE=${fnew/\.dbf/}
   fset=${fnew/dbf/set}
   fsql=${fset/\.set/\.sql}
   fcsv=${fset/\.set/\.csv}
   dbfdump ${fold} |iconv -fcp866 -tutf8 > ${fcsv}
   dbfdump --info ${fold}|sed -e '1,/Field info:/d'|sed -e '1d' > ${fset}
   echo "CREATE TABLE $TABLE (" >> $fsql
   tac $fset | awk '{print "   "$2,"varchar("$4"),"}'|sed -e '1s/,$//'|tac >> $fsql
   echo ");"  >> $fsql
   $PGCMD < $fsql
   $PGCMD -c "\copy $TABLE FROM '$fcsv' DELIMITER ':'"
   case $TABLE in
      "kladr"|"street"|"doma")
         echo "$TABLE - first"
         $PGCMD -c "alter table $TABLE add column hierarch ltree;"
         $PGCMD -c "update $TABLE set hierarch = ltree(case
            when substring(code from 3 for 9) = '000000000' then
               substring(code from 1 for 2)
            when substring(code from 6 for 6) = '000000' then
               substring(code from 1 for 2) || '.' || substring(code from 3 for 3)
            when substring(code from 9 for 3) = '000' then
               substring(code from 1 for 2) || '.' || substring(code from 3 for 3) || '.' || substring(code from 6 for 3)
            else substring(code from 1 for 2) || '.' || substring(code from 3 for 3) || '.' || substring(code from 6 for 3) || '.' ||  substring(code from 9 for 3) end);"
         $PGCMD -c "alter table $TABLE add column level int2;"
      ;;&
      "street"|"doma")
         echo "$TABLE - 2"
         $PGCMD -c "alter table $TABLE add column streetcode char(4);"
         $PGCMD -c "update $TABLE set streetcode=substring(code from 12 for 4);"
      ;;&
      "doma")
         echo "$TABLE - 3"
         $PGCMD -c "alter table $TABLE add column domcode char(4);"
         $PGCMD -c "update $TABLE set domcode=substring(code from 16 for 4);"
      ;;
   esac
done

$PGCMD -f ~/prj/kladr/alter-tables.sql

cd $OLDPATH
exit 0[/code]

Я готовил базу.
Парочка SQL-ей:
Spoiler: drop-tables.sql • показать
[code]DROP TABLE IF EXISTS flat;
DROP TABLE IF EXISTS doma_hist;
DROP TABLE IF EXISTS street_hist;
DROP TABLE IF EXISTS kladr_hist;
-- DROP TABLE IF EXISTS kladr_hierarch;
DROP TABLE IF EXISTS doma;
DROP TABLE IF EXISTS street;
DROP TABLE IF EXISTS kladr;
DROP TABLE IF EXISTS altnames;
DROP TABLE IF EXISTS socrbase;
[/code]

Я же умный! Я же АДМИН Баз Данных! :) Поэтому в первом варианте иерархия была вынесена в отдельную таблицу. Потом я решил не усложнять себе жизнь на данном этапе и добавил еще одно поле в оригинальную таблицу. А таблицу иерархий помножил на ноль.
Собственно, создание необходимых индексов и прочих хранимых функций/процедур:
Spoiler: alter-tables.sql • показать
[code]select 'alter table altnames' as alteraltnames;
alter table altnames alter column level type int2 using (level::int2);

select 'alter table socrbase' as altersocrbase;
alter table socrbase alter column kod_t_st type char(3);
alter table socrbase alter column level type int2 using (level::int2);
alter table socrbase add constraint socrbase_scname_level_pk primary key (scname,level);

select 'alter table kladr' as alterkladr;
alter table kladr alter column code type char(13);
alter table kladr add column parent_code char(13);
alter table kladr add constraint socrname_fk foreign key (socr,level) references socrbase (scname,level);
update kladr set level=nlevel(hierarch);

select 'create table kladr_hist' as createkladrhist;
create table kladr_hist (like kladr);
insert into kladr_hist (select * from kladr where substring(code from 12 for 2) != '00');
delete from kladr where substring(code from 12 for 2) != '00';
alter table kladr_hist add constraint socrname_fk foreign key (socr,level) references socrbase (scname,level);

select 'alter table kladr' as alterkladr;
alter table kladr add constraint kladr_hierarch_pk primary key (hierarch);
create index kladr_hierarch_gist on kladr USING gist (hierarch);
create unique index kladr_code_idx on kladr (substring(code from 1 for 11));

select 'create table street_hist' as createstreethist;
create table street_hist (like street);
insert into street_hist (
   select s.name,s.socr,s.code,s.index,s.gninmb,s.uno,s.ocatd,s.hierarch,s.level,s.streetcode
   from street as s left outer join kladr as k using (hierarch)
   where k.hierarch IS NULL
);
delete from street where code in (
   select s.code from street as s left outer join kladr as k using(hierarch) where k.hierarch IS NULL);
insert into street_hist (
   select * from street where substring(code from 16 for 2)!='00');
delete from street where substring(code from 16 for 2)!='00';
update street_hist set level='5';

select 'alter table street' as alterstreet;
alter table street alter column code type char(17);
update street set level='5';
alter table street add constraint socrname_fk foreign key (socr,level) references socrbase (scname,level);
alter table street add constraint street_hierarch_fk foreign key (hierarch) references kladr (hierarch);
alter table street add constraint street_hierarch_sc_pk primary key (hierarch, streetcode);
create index street_hierarch_gist on street USING gist (hierarch);
create index street_code_idx on street (substring(code from 1 for 15));

select 'alter table doma' as alterdoma;
alter table doma alter column code type char(19);
update doma set level='6';

select 'create table doma_hist' as createdomahist;
create table doma_hist (like doma);
insert into doma_hist
   select d.name,d.korp,d.socr,d.code,d.index,d.gninmb,d.uno,d.ocatd,d.hierarch,d.level,d.streetcode,d.domcode
   from doma as d left outer join kladr as k using (hierarch)
   where k.hierarch IS NULL;
delete from doma where code in (
   select d.code
   from doma as d left outer join kladr as k using (hierarch)
   where k.hierarch IS NULL
);
insert into doma_hist
   select d.name,d.korp,d.socr,d.code,d.index,d.gninmb,d.uno,d.ocatd,d.hierarch,d.level,d.streetcode,d.domcode
   from doma as d left outer join street as s using(hierarch,streetcode)
   where s.code is null;
delete from doma where code in (
   select d.code
   from doma as d left outer join street as s using(hierarch,streetcode)
   where s.code is null
);
update doma_hist set level='6';

select 'alter table doma' as alterdoma;
insert into street (name,socr,code,hierarch,streetcode,level)
   select distinct k.name,k.socr, substring(d.code from 1 for 15) || '00' as code, k.hierarch, d.streetcode, k.level
   from doma as d inner join kladr as k using (hierarch)
   where d.streetcode='0000';
alter table doma add constraint socrname_fk foreign key (socr,level) references socrbase (scname,level);
alter table doma add constraint doma_hierarch_sc_fk foreign key (hierarch, streetcode) references street (hierarch,streetcode);
alter table doma add constraint doma_hierarch_sc_dc_pk primary key (hierarch, streetcode, domcode);
create index doma_hierarch_gist on doma USING gist (hierarch);
create unique index doma_code_idx on doma (substring(code from 1 for 19));

-- Fill parent_code on kladr
-- get parent_code for hierarch
create or replace function get_parent_code(hrrh ltree, lvl int2, OUT pcode char(13)) as $$
declare phrrh ltree;
begin
   case lvl
      when 2 then
         phrrh := subpath(hrrh,0,1);
      when 3 then
         if subpath(hrrh,1,1) = '000' then
            phrrh := subpath(hrrh,0,1);
         else
            phrrh := subpath(hrrh,0,2);
         end if;
      when 4 then
         if subpath(hrrh,1,2) = '000.000' then
            phrrh := subpath(hrrh,0,1);
         elsif subpath(hrrh,2,1) = '000' then
            phrrh := subpath(hrrh,0,2);
         else
            phrrh := subpath(hrrh,0,3);
         end if;
      else
         phrrh := ''::ltree;
   end case;
   if lvl = 1 then
      pcode := '';
   else
      execute 'select code from kladr where hierarch = $1' into pcode using phrrh;
   end if;
end;
$$ LANGUAGE plpgsql;

create or replace function initkh() returns bigint as $$
declare
   rowcount bigint;
   pcode char(13);
   lvl int2;
   hrrh ltree;
   phrrh ltree;
begin
   rowcount := 0;
   for hrrh,lvl in select hierarch,level from kladr where level > 1 loop
      case lvl
         when 2 then
            phrrh := subpath(hrrh,0,1);
         when 3 then
            if subpath(hrrh,1,1) = '000' then
               phrrh := subpath(hrrh,0,1);
            else
               phrrh := subpath(hrrh,0,2);
            end if;
         when 4 then
            if subpath(hrrh,1,2) = '000.000' then
               phrrh := subpath(hrrh,0,1);
            elsif subpath(hrrh,2,1) = '000' then
               phrrh := subpath(hrrh,0,2);
            else
               phrrh := subpath(hrrh,0,3);
            end if;
      end case;
      rowcount := rowcount+1;
      execute 'select code from kladr where hierarch = $1' into pcode using phrrh;
      execute 'update kladr set parent_code = $1 where hierarch = $2' using pcode,hrrh;
   end loop;
   return rowcount;
end;
$$ LANGUAGE plpgsql;
-- It is no used
-- select 'create table kladr_hierarch' as createkladrhierarch;
-- create table kladr_hierarch (
--    parent_code char[13] NOT NULL,
--    parent_hierarch ltree NOT NULL,
--    obj_code char[13] NOT NULL,
--    obj_hierarch ltree NOT NULL,
--    obj_level int2 NOT NULL
-- );
-- insert into kladr_hierarch select '','',code,hierarch,level from kladr where level = 1;

-- Fisrt edition of filling of table kladr_hierarch
-- create or replace function inithierarch() returns bigint as $$
--    declare
--       pcode char(13);
--       phierarch ltree;
--       ocode char(13);
--       ohierarch ltree;
--       olevel int2;
--       rowcount bigint;
--       workrow record;
--       initstring cursor for select code,hierarch,level from kladr where level != 1;
--    begin
--       rowcount := 0;
--       for workrow in initstring loop
--          case workrow.level
--             when 2 then
--                phierarch := subpath(workrow.hierarch,0,1);
--             when 3 then
--                if subpath(workrow.hierarch,1,1) = '000' then
--                   phierarch := subpath(workrow.hierarch,0,1);
--                else
--                   phierarch := subpath(workrow.hierarch,0,2);
--                end if;
--             when 4 then
--                if subpath(workrow.hierarch,1,2) = '000.000' then
--                   phierarch := subpath(workrow.hierarch,0,1);
--                elsif subpath(workrow.hierarch,2,1) = '000' then
--                   phierarch := subpath(workrow.hierarch,0,2);
--                else
--                   phierarch := subpath(workrow.hierarch,0,3);
--                end if
--          end case;
--          rowcount := rowcount + 1;
--          execute select code from kladr where hierarch = phierarch into pcode;
--          ocode := workrow.code;
--          ohierarch := workrow.hierarch;
--          olevel := workrow.level;
--          insert into kladr_hierarch (parent_code,parent_hierarch,obj_code,obj_hierarch,obj_level)
--             values (pcode,phierarch,ocode,ohierarch,olevel);
--       end loop;
--       return rowcount;
--    end;
-- $$ LANGUAGE plpgsql;

-- Second edition of filling of table kladr_hierarch
-- create or replace function get_parent_data (hrrh ltree, lvl int2, OUT phrrh ltree, OUT pcode char(13)) as $$
-- begin
--    case lvl
--       when 2 then
--          phrrh := subpath(hrrh,0,1);
--       when 3 then
--          if subpath(hrrh,1,1) = '000' then
--             phrrh := subpath(hrrh,0,1);
--          else
--             phrrh := subpath(hrrh,0,2);
--          end if;
--       when 4 then
--          if subpath(hrrh,1,2) = '000.000' then
--             phrrh := subpath(hrrh,0,1);
--          elsif subpath(hrrh,2,1) = '000' then
--             phrrh := subpath(hrrh,0,2);
--          else
--             phrrh := subpath(hrrh,0,3);
--          end if;
--    end case;
--    execute 'select code from kladr where hierarch = $1' into pcode using phrrh;
-- end;
-- $$ LANGUAGE plpgsql;

-- create or replace function initkh() returns bigint as $$
-- declare
--    phrrh ltree;
--    ohrrh ltree;
--    pcode char(13);
--    ocode char(13);
--    olevel int2;
--    rowcount int8;
-- begin
--    rowcount := 0;
--    for ocode,ohrrh,olevel in select code,hierarch,level from kladr where level != 1 loop
--       execute 'select * from get_parent_data($1,$2)' into (phrrh,pcode) using ohrrh,olevel;
--       insert into kladr_hierarch (parent_code,parent_hierarch,obj_code,obj_hierarch,obj_level)
--          values (pcode,phrrh,ocode,ohrrh,olevel);
--       rowcount := rowcount + 1;
--    end loop;
--    return rowcount;
-- end
-- $$ language plpgsql;

-- there is a function regexp_replace...
[/code]

Один индекс, который по parent_code, code был создан после кувырканий с заполнением parent_code, поэтому я опущу описание его (индекса) создания.
Ну и поехали, посмотрим, что и как с ltree.
С уважением,
Шурутов Михаил

shurutov

Первоначальное заполнение таблицы.
Установка родителя для объектов федерального подчинения единая для всех вариантов заполнения.
kladr=> \timing
Секундомер включен.
kladr=> update kladr set parent_code = '' where level = 1;
UPDATE 86
Время: 134,192 мс


  • В лоб, без лишних ухищрений
kladr=> update kladr set parent_code = (select p.code from kladr as p where p.hierarch = text2ltree(regexp_replace(subpath(hierarch,0,level-1)::text,'(.000){1,}$', ''))) where level > 1;
UPDATE 189644
Время: 13350,516 мс
kladr=> select count(*) from kladr where parent_code is NULL;
count
--------
189598
(1 строка)

Время: 288,071 мс

Клево, однако... Чё за фигня? select в скобках нормально все возвращает. Надо чего-то сделать.
Оба два других будут использовать только subpath. Из встроенных ф-й.
  • В лоб, одной функцией
Ее определение - в предыдущем посте.
kladr=> select initkh();
initkh
--------
189598
(1 строка)

Время: 44216,704 мс
kladr=> select count(*) from kladr where parent_code is NULL;
count
-------
      0
(1 строка)
 
Время: 56,546 мс

  • Изподвыподверта
kladr=> update kladr set parent_code = get_parent_code(hierarch,level) where level > 1;
UPDATE 189598
Время: 22990,729 мс
kladr=> select count(*) from kladr where parent_code is NULL;
count
-------
      0
(1 строка)

Время: 322,651 мс

вроде бы нормальный результат. Проверочные выборки я приводить не буду, чтобы сократить количество буковок повествования.

В принципе, результаты более-менее ожидаемые.
С уважением,
Шурутов Михаил

shurutov

#2
Все потомки
  • Рекурсивный запрос
explain analyze with recursive kr (parent_code, code) as (
select k.parent_code, k.code from kladr k where k.parent_code='5000000000000'
union
select k.parent_code, k.code from kladr k join kr on k.parent_code = kr.code)
select * from kr;
                                                                          QUERY PLAN                         
--------------------------------------------------------------------------------------------------------------------------------------------------------------
CTE Scan on kr  (cost=44832.11..53929.73 rows=454881 width=112) (actual time=0.038..65.915 rows=7103 loops=1)
   CTE kr
     ->  Recursive Union  (cost=0.42..44832.11 rows=454881 width=28) (actual time=0.037..63.310 rows=7103 loops=1)
           ->  Index Only Scan using kladr_pcode_code_idx on kladr k  (cost=0.42..5.49 rows=61 width=28) (actual time=0.031..0.051 rows=76 loops=1)
                 Index Cond: (parent_code = '5000000000000'::bpchar)
                 Heap Fetches: 0
           ->  Nested Loop  (cost=0.42..3572.90 rows=45482 width=28) (actual time=4.552..19.787 rows=2342 loops=3)
                 ->  WorkTable Scan on kr kr_1  (cost=0.00..12.20 rows=610 width=56) (actual time=0.000..0.325 rows=2368 loops=3)
                 ->  Index Only Scan using kladr_pcode_code_idx on kladr k_1  (cost=0.42..5.09 rows=75 width=28) (actual time=0.008..0.008 rows=1 loops=7103)
                       Index Cond: (parent_code = kr_1.code)
                       Heap Fetches: 0
Total runtime: 67.574 ms
(12 строк)


  • Список из ltree
kladr=> explain analyze select parent_code, code from kladr where hierarch ~ '50.*{1,}';
                                                            QUERY PLAN                                       
----------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on kladr  (cost=29.75..709.90 rows=190 width=28) (actual time=1.359..3.927 rows=7103 loops=1)
   Recheck Cond: (hierarch ~ '50.*{1,}'::lquery)
   ->  Bitmap Index Scan on kladr_hierarch_gist  (cost=0.00..29.71 rows=190 width=0) (actual time=1.328..1.328 rows=7103 loops=1)
         Index Cond: (hierarch ~ '50.*{1,}'::lquery)
Total runtime: 4.364 ms
(5 строк)

Время: 4,704 мс


  • Запрос по предку
kladr=> explain analyze select parent_code, code from kladr where parent_code='5000000000000';
                                                             QUERY PLAN                                     
------------------------------------------------------------------------------------------------------------------------------------
Index Only Scan using kladr_pcode_code_idx on kladr  (cost=0.42..5.49 rows=61 width=28) (actual time=0.028..0.046 rows=76 loops=1)
   Index Cond: (parent_code = '5000000000000'::bpchar)
   Heap Fetches: 0
Total runtime: 0.066 ms
(4 строки)


  • Список из ltree
kladr=> explain analyze select parent_code,code from kladr where hierarch ~ '50.000.000.*{1}' or hierarch ~ '50.000.*{1}' or hierarch ~ '50.*{1}';
                                                              QUERY PLAN                                     
--------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on kladr  (cost=89.55..1922.92 rows=568 width=28) (actual time=1.059..1.135 rows=76 loops=1)
   Recheck Cond: ((hierarch ~ '50.000.000.*{1}'::lquery) OR (hierarch ~ '50.000.*{1}'::lquery) OR (hierarch ~ '50.*{1}'::lquery))
   ->  BitmapOr  (cost=89.55..89.55 rows=569 width=0) (actual time=1.049..1.049 rows=0 loops=1)
         ->  Bitmap Index Scan on kladr_hierarch_gist  (cost=0.00..29.71 rows=190 width=0) (actual time=0.065..0.065 rows=8 loops=1)
               Index Cond: (hierarch ~ '50.000.000.*{1}'::lquery)
         ->  Bitmap Index Scan on kladr_hierarch_gist  (cost=0.00..29.71 rows=190 width=0) (actual time=0.102..0.102 rows=32 loops=1)
               Index Cond: (hierarch ~ '50.000.*{1}'::lquery)
         ->  Bitmap Index Scan on kladr_hierarch_gist  (cost=0.00..29.71 rows=190 width=0) (actual time=0.882..0.882 rows=36 loops=1)
               Index Cond: (hierarch ~ '50.*{1}'::lquery)
Total runtime: 1.174 ms


      Но это конкретная база, по которой напильник толком не проходил. На самом деле запрос д/б таким
kladr=# explain analyze select parent_code,code from kladr where hierarch ~ '50.*{1}';
                                                           QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on kladr  (cost=29.75..709.90 rows=190 width=28) (actual time=1.003..1.055 rows=36 loops=1)
   Recheck Cond: (hierarch ~ '50.*{1}'::lquery)
   ->  Bitmap Index Scan on kladr_hierarch_gist  (cost=0.00..29.71 rows=190 width=0) (actual time=0.993..0.993 rows=36 loops=1)
         Index Cond: (hierarch ~ '50.*{1}'::lquery)
Total runtime: 1.079 ms
(5 строк)

    Ну как бы понятно. Нет в мире совершенства, а в жизни счастья, ну и все такое...
Подытоживая. ltree - вполне себе годная реализация materialized path в PostgreSQL и пользоваться ей можно и даже нужно, когда логика того требует. А вот с функциями и процедурами все не так однозначно. На мой скромный взгляд.
С уважением,
Шурутов Михаил