0

我尝试使用symmetricds配置mysqlpostgres与转换同步。在 100% CPU 负载下,我在初始负载时插入性能非常低postgres。当我查看postgres日志时,我发现它使用了INSERT. 正常工作没问题,但初始化不行,因为我有数百万条记录。我PostgresBulkDatabaseWriter在源代码中COPY创建了它INSERT,它看起来像是很好的解决方案(COPYsql request 对我来说非常好),但我没有找到如何使用它。

所以我的问题:

symmetricds如何更好地为数百万条记录进行初始加载?

如何启用PostgresBulkDatabaseWriter初始(反向初始)加载?

谢谢

更新

源表mysql

CREATE TABLE `companies` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cid` int(11) NOT NULL,
  `universalName` text NOT NULL,
  `name` text NOT NULL,
  `country` text NOT NULL,
  `city` text NOT NULL,
  `street` text NOT NULL,
  `phone` text NOT NULL,
  `foundedYear` text NOT NULL,
  `employeeCountRange` text NOT NULL,
  `specialties` text NOT NULL,
  `websiteUrl` text NOT NULL,
  `twitterId` text NOT NULL,
  `check` tinyint(4) NOT NULL DEFAULT '0',
  `date` datetime NOT NULL,
  PRIMARY KEY (`id`)
);

CREATE TABLE `search_results` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `cid` int(11) NOT NULL,
  `title` text NOT NULL,
  `description` text NOT NULL,
  `link` text NOT NULL,
  `raw` text NOT NULL,
  `date` datetime NOT NULL,
  PRIMARY KEY (`id`)
);

核心表postgres

CREATE TABLE res_country (
    id integer NOT NULL,
    create_uid integer,
    create_date timestamp without time zone,
    write_date timestamp without time zone,
    write_uid integer,
    address_format text,
    currency_id integer,
    code character varying(2),
    name character varying(64) NOT NULL
);

INSERT INTO res_country VALUES (1, 1, '2013-11-16 06:53:31.030363', '2013-11-16 06:53:31.030363', 1, '%(street)s
%(street2)s
%(city)s %(state_code)s %(zip)s
%(country_name)s', 1, 'AD', 'Andorra, Principality of');

CREATE TABLE res_partner (
    id integer NOT NULL,
    name character varying(128) NOT NULL,
    lang character varying(64),
    company_id integer,
    create_uid integer,
    create_date timestamp without time zone,
    write_date timestamp without time zone,
    write_uid integer,
    comment text,
    ean13 character varying(13),
    color integer,
    image bytea,
    use_parent_address boolean,
    active boolean,
    street character varying(128),
    supplier boolean,
    city character varying(128),
    user_id integer,
    zip character varying(24),
    title integer,
    function character varying(128),
    country_id integer,
    parent_id integer,
    employee boolean,
    type character varying,
    email character varying(240),
    vat character varying(32),
    website character varying(64),
    fax character varying(64),
    street2 character varying(128),
    phone character varying(64),
    credit_limit double precision,
    date date,
    tz character varying(64),
    customer boolean,
    image_medium bytea,
    mobile character varying(64),
    ref character varying(64),
    image_small bytea,
    birthdate character varying(64),
    is_company boolean,
    state_id integer,
    notification_email_send character varying NOT NULL,
    opt_out boolean,
    signup_type character varying,
    signup_expiration timestamp without time zone,
    signup_token character varying,
    last_reconciliation_date timestamp without time zone,
    debit_limit double precision,
    display_name character varying,
    vat_subjected boolean,
    section_id integer
);


CREATE TABLE my_res_partner_companies (
  id INT8 NOT NULL PRIMARY KEY,
  cid INT8 NOT NULL,
  universalName VARCHAR NOT NULL,
  employeeCountRange VARCHAR NOT NULL,
  specialties VARCHAR NOT NULL,
  twitterId VARCHAR NOT NULL,
  "check" INT4 NOT NULL DEFAULT '0',
  date TIMESTAMP NOT NULL
);

CREATE TABLE my_res_partner_search_result (
  id INT8 NOT NULL PRIMARY KEY,
  link VARCHAR NOT NULL,
  raw VARCHAR NOT NULL,
  date TIMESTAMP NOT NULL
);

源属性:

engine.name=source-001

# The class name for the JDBC Driver
db.driver=com.mysql.jdbc.Driver

# The JDBC URL used to connect to the database
db.url=jdbc:mysql://localhost:3307/data?tinyInt1isBit=false

# The user to login as who can create and update tables
db.user=root

# The password for the user to login as
db.password=

# The HTTP URL of the root node to contact for registration
registration.url=http://localhost:8080/sync/core-000
#auto.reload.reverse=true

# Do not change these for running the demo
group.id=source
external.id=001

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

核心属性:

engine.name=core-000

# The class name for the JDBC Driver
db.driver=org.postgresql.Driver

# The JDBC URL used to connect to the database
db.url=jdbc:postgresql://localhost:5432/data2?stringtype=unspecified

# The user to login as who can create and update tables
db.user=admin

# The password for the user to login as
db.password=admin

registration.url=
sync.url=http://localhost:8080/sync/core-000
auto.reload.reverse=true
datareload.batch.insert.transactional=true

# Do not change these for running the demo
group.id=core
external.id=000

# Don't muddy the waters with purge logging
job.purge.period.time.ms=7200000

# This is how often the routing job will be run in milliseconds
job.routing.period.time.ms=5000
# This is how often the push job will be run.
job.push.period.time.ms=10000
# This is how often the pull job will be run.
job.pull.period.time.ms=10000

主要symmetric配置:

-- Nodes
insert into sym_node_group (node_group_id, description)
values ('core', 'Core Storage');
insert into sym_node_group (node_group_id, description)
values ('source', 'Source Storage');

insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('source', 'core', 'P');
insert into sym_node_group_link (source_node_group_id, target_node_group_id, data_event_action)
values ('core', 'source', 'W');

insert into sym_node (node_id, node_group_id, external_id, sync_enabled)
values ('000', 'core', '000', 1);
insert into sym_node_security (node_id,node_password,registration_enabled,registration_time,initial_load_enabled,initial_load_time,initial_load_id,initial_load_create_by,rev_initial_load_enabled,rev_initial_load_time,rev_initial_load_id,rev_initial_load_create_by,created_at_node_id)
values ('000','changeme',1,current_timestamp,1,current_timestamp,null,null,0,null,null,null,'000');
insert into sym_node_identity values ('000');

-- Channels
insert into sym_channel
(channel_id, processing_order, max_batch_size, enabled, description)
values('source__acc', 1, 100000, 1, 'accounting synchronisation');

-- Triggers
insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('source__companies','companies','source__acc',current_timestamp,current_timestamp);

insert into sym_trigger
(trigger_id,source_table_name,channel_id,last_update_time,create_time)
values('source__search_results','search_results','source__acc',current_timestamp,current_timestamp);

-- Routers
insert into sym_router
(router_id,source_node_group_id,target_node_group_id,router_type,create_time,last_update_time)
values('source_2_core', 'source', 'core', 'default',current_timestamp, current_timestamp);

-- Trigger Router Links
insert into sym_trigger_router
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time)
values('source__companies','source_2_core', 100, 0, current_timestamp, current_timestamp);

insert into sym_trigger_router
(trigger_id,router_id,initial_load_order, INITIAL_LOAD_BATCH_COUNT,last_update_time,create_time)
values('source__search_results','source_2_core', 200, 0, current_timestamp, current_timestamp);

主要转变:

-- Transform
insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');
--  ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL),
  ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'),
  ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL),
  ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL),
  ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL),
  ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL),
  ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1');

insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__residue', 'source', 'core', 'LOAD', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED');
  -- ('source__companies__residue', 'source', 'core', 'EXTRACT', 'companies', 'my_res_partner_companies', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__residue', '*', 'cid', 'cid', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'universalName', 'universalName', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'employeeCountRange', 'employeeCountRange', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'specialties', 'specialties', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'twitterId', 'twitterId', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'check', 'check', 0, 'copy', NULL),
  ('source__companies__residue', '*', 'date', 'date', 0, 'copy', NULL);


insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL),
  ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL),
  ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'),
  ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0');

insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__residue', 'source', 'core', 'LOAD', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__residue', 'source', 'core', 'EXTRACT', 'search_results', 'my_res_partner_search_result', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__residue', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__residue', '*', 'link', 'link', 0, 'copy', NULL),
  ('source__search_results__residue', '*', 'raw', 'raw', 0, 'copy', NULL),
  ('source__search_results__residue', '*', 'date', 'date', 0, 'copy', NULL);

简化转换:

-- Transform
insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point, source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__companies__main', 'source', 'core', 'LOAD', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');
--  ('source__companies__main', 'source', 'core', 'EXTRACT', 'companies', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__companies__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__companies__main', '*', 'name', 'name', 0, 'copy', NULL),
  ('source__companies__main', '*', 'country', 'country_id', 0, 'const', '1'),
  ('source__companies__main', '*', 'city', 'city', 0, 'copy', NULL),
  ('source__companies__main', '*', 'street', 'street', 0, 'copy', NULL),
  ('source__companies__main', '*', 'phone', 'phone', 0, 'copy', NULL),
  ('source__companies__main', '*', 'websiteUrl', 'website', 0, 'copy', NULL),
  ('source__companies__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__companies__main', '*', NULL, 'is_company', 0, 'const', '1');


insert into SYM_TRANSFORM_TABLE
  (transform_id, source_node_group_id, target_node_group_id, transform_point,
   source_table_name, target_table_name, delete_action, column_policy)
values
  ('source__search_results__main', 'source', 'core', 'LOAD', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');
  -- ('source__search_results__main', 'source', 'core', 'EXTRACT', 'search_results', 'res_partner', 'DEL_ROW', 'SPECIFIED');

insert into SYM_TRANSFORM_COLUMN
  (transform_id, include_on, source_column_name, target_column_name, pk, transform_type, TRANSFORM_EXPRESSION)
values
  ('source__search_results__main', '*', 'id', 'id', 1, 'bsh', 'return Integer.parseInt(currentValue) + 2000000;'),
  ('source__search_results__main', '*', 'cid', 'parent_id', 0, 'bsh', 'return Integer.parseInt(currentValue) + 1000000;'),
  ('source__search_results__main', '*', 'title', 'name', 0, 'copy', NULL),
  ('source__search_results__main', '*', 'description', 'comment', 0, 'copy', NULL),
  ('source__search_results__main', '*', NULL, 'use_parent_address', 0, 'const', '1'),
  ('source__search_results__main', '*', NULL, 'notification_email_send', 0, 'const', '0'),
  ('source__search_results__main', '*', NULL, 'is_company', 0, 'const', '0');

核心设置:

update sym_channel set DATA_LOADER_TYPE = 'postgres_bulk' where channel_id = 'reload';

看起来像symmetric插入COPY一条一条记录,主要转换(LOAD 和 EXTRACT)和简化转换(LOAD 和 EXTRACT)。

4

2 回答 2

6

PostgresBulkDataLoaderFactory 是您的答案。

如果您只想将批量写入器用于初始加载和重新加载,我建议您将重新加载通道配置为仅使用批量写入器。

在您的频道表(默认为 sym_channel)上,将重新加载频道 data_loader_type 列更新为“postgres_bulk”。

用户指南简要解释了如何实现 DatabaseWriters。

于 2014-01-15T13:43:37.343 回答
0

在 postgresql 中,大型数据集的良好插入性能的关键是在尽可能少的事务中完成所有操作。默认情况下,postgresql 在每次插入时使用自动提交,因此每次插入它本身就是一个事务。只需用 begin 包装所有插入;犯罪; 可以提供更快的吞吐量。

因此,复制速度更快的原因是双重的。1:一个大事务,2:不解析单个插入语句。解析开销很小,但相对而言,事务开销对于单个语句来说是巨大的。

有没有办法让它做到这一点:

begin;
insert ...;
insert ...;
insert ...;
commit;
于 2014-01-14T22:13:52.047 回答