基於OGG Datahub外掛將Oracle資料同步上雲

NO IMAGE
1 Star2 Stars3 Stars4 Stars5 Stars 給文章打分!
Loading...

一、背景介紹

隨著資料規模的不斷擴大,傳統的RDBMS難以滿足OLAP的需求,

OGG(Oracle GoldenGate)是一個基於日誌的結構化資料備份工具,一般用於Oracle資料庫之間的主從備份以及Oracle資料庫到其他資料庫(DB2, MySQL等)的同步。下面是Oracle官方提供的一個OGG的整體架構圖,從圖中可以看出OGG的部署分為源端和目標端兩部分組成,主要有Manager,Extract,Pump,Collector,Replicat這麼一些元件。

  • Manager:在源端和目標端都會有且只有一個Manager程序存在,負責管理其他程序的啟停和監控等;
  • Extract:負責從源端資料庫表或者事務日誌中捕獲資料,有初始載入和增量同步兩種模式可以配置,初始載入模式是直接將源表資料同步到目標端,而增量同步就是分析源端資料庫的日誌,將變動的記錄傳到目標端,本文介紹的是增量同步的模式;
  • Pump:Extract從源端抽取的資料會先寫到本地磁碟的Trail檔案,Pump程序會負責將Trail檔案的資料投遞到目標端;
  • Collector:目標端負責接收來自源端的資料,生成Trail檔案
  • Replicat:負責讀取目標端的Trail檔案,轉化為相應的DDL和DML語句作用到目標資料庫,實現資料同步。

本文介紹的Oracle資料同步是通過OGG的Datahub外掛實現的,該Datahub外掛在架構圖中處於Replicat的位置,會分析Trail檔案,將資料的變化記錄寫入Datahub中,可以使用流計算對datahub中的資料進行實時分析,也可以將資料歸檔到MaxCompute中進行離線處理。

二、安裝步驟

0. 環境要求

  • 源端已安裝好Oracle
  • 源端已安裝好OGG(建議版本Oracle GoldenGate V12.1.2.1)
  • 目標端已安裝好OGG Adapters(建議版本Oracle GoldenGate Application Adapters
    12.1.2.1)
  • java 7

(下面將介紹Oracle/OGG相關安裝和配置過程,Oracle的安裝將不做介紹,另外需要注意的是:Oracle/OGG相關引數配置以熟悉Oracle/OGG的運維人員配置為準,本示例只是提供一個可執行的樣本,Oracle所使用的版本為ORA11g)

1. 源端OGG安裝
下載OGG安裝包解壓後有如下目錄:

drwxr-xr-x install
drwxrwxr-x response
-rwxr-xr-x runInstaller
drwxr-xr-x stage

目前oracle一般採取response安裝的方式,在response/oggcore.rsp中配置安裝依賴,具體如下:

oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
# 需要目前與oracle版本對應
INSTALL_OPTION=ORA11g
# goldegate主目錄
SOFTWARE_LOCATION=/home/oracle/u01/ggate
# 初始不啟動manager
START_MANAGER=false
# manger埠
MANAGER_PORT=7839
# 對應oracle的主目錄
DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1
# 暫可不配置
INVENTORY_LOCATION=
# 分組(目前暫時將oracle和ogg用同一個賬號ogg_test,實際可以給ogg單獨賬號)
UNIX_GROUP_NAME=oinstall

執行命令:

runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp

本示例中,安裝後OGG的目錄在/home/oracle/u01/ggate,安裝日誌在/home/oracle/u01/ggate/cfgtoollogs/oui目錄下,當silentInstall{時間}.log檔案裡出現如下提示,表明安裝成功:

The installation of Oracle GoldenGate Core was successful.

執行/home/oracle/u01/ggate/ggsci命令,並在提示符下鍵入命令:CREATE SUBDIRS,從而生成ogg需要的各種目錄(dir打頭的那些)。
至此,源端OGG安裝完成。

2. 源端Oracle配置
以dba分身進入sqlplus:sqlplus / as sysdba

# 建立獨立的表空間
create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;

# 建立ogg_test使用者,密碼也為ogg_test
create user ogg_test identified by ogg_test default tablespace ATMV;

# 給ogg_test賦予充分的許可權
grant connect,resource,dba to ogg_test;

# 檢查附加日誌情況
Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;

# 增加資料庫附加日誌及回退
alter database add supplemental log data;
alter database add supplemental log data (primary key, unique,foreign key) columns;
# rollback
alter database drop supplemental log data (primary key, unique,foreign key) columns;
alter database drop supplemental log data;

# 全欄位模式,注意:在該模式下的delete操作也只有主鍵值
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# 開啟資料庫強制日誌模式
alter database force logging;
# 執行marker_setup.sql 指令碼
@marker_setup.sql
# 執行@ddl_setup.sql
@ddl_setup.sql
# 執行role_setup.sql
@role_setup.sql
# 給ogg使用者賦權
grant GGS_GGSUSER_ROLE to ogg_test;
# 執行@ddl_enable.sql,開啟DDL trigger
@ddl_enable.sql
# 執行優化指令碼
@ddl_pin ogg_test
# 安裝sequence support
@sequence.sql
#
alter table sys.seq$ add supplemental log data (primary key) columns;

3. OGG源端mgr配置
以下是通過ggsci對ogg進行配置

配置mgr
edit params mgr

PORT 7839
DYNAMICPORTLIST  7840-7849
USERID ogg_test, PASSWORD ogg_test
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

啟動mgr(執行日誌在ggate/dirrpt中)

start mgr

檢視mgr狀態

info mgr

檢視mgr配置

view params mgr

4. OGG源端extract配置
以下是通過ggsci對ogg進行配置

配置extract(名字可任取,extract是組名)
edit params extract

EXTRACT extract
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS   ALLOWUNUSEDCOLUMN
USERID ogg_test, PASSWORD ogg_test
REPORTCOUNT EVERY 1 MINUTES, RATE
NUMFILES 5000
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 100
DISCARDROLLOVER AT 2:00
WARNLONGTRANS 2h, CHECKINTERVAL 3m
EXTTRAIL ./dirdat/st, MEGABYTES 200
DYNAMICRESOLUTION
TRANLOGOPTIONS CONVERTUCS2CLOBS
TRANLOGOPTIONS RAWDEVICEOFFSET 0
DDL &
INCLUDE MAPPED OBJTYPE 'table' &
INCLUDE MAPPED OBJTYPE 'index' &
INCLUDE MAPPED OBJTYPE 'SEQUENCE' &
EXCLUDE OPTYPE COMMENT
DDLOPTIONS  NOCROSSRENAME  REPORT
TABLE     OGG_TEST.*;
SEQUENCE  OGG_TEST.*;

GETUPDATEBEFORES

增加extract程序(ext後的名字要跟上面extract對應,本例中extract是組名)
add ext extract,tranlog, begin now

刪除某廢棄程序DP_TEST
delete ext DP_TEST

新增抽取程序,每個佇列檔案大小為200m
add exttrail ./dirdat/st,ext extract, megabytes 200

啟動抽取程序(執行日誌在ggate/dirrpt中)
start extract extract
至此,extract配置完成,資料庫的一條變更可以在ggate/dirdat目錄下的檔案中看到

5. 生成def檔案
源端ggsci起來後執行如下命令,生成defgen檔案,並且拷貝到目標端dirdef下
edit params defgen

DEFSFILE ./dirdef/ogg_test.def
USERID ogg_test, PASSWORD ogg_test
table OGG_TEST.*;
在shell中執行如下命令,生成ogg_test.def
./defgen paramfile ./dirprm/defgen.prm

6. 目標端OGG安裝和配置
解壓adapter包
將源端中dirdef/ogg_test.def檔案拷貝到adapter的dirdef下

執行ggsci起來後執行如下命令,建立必須目錄
create subdirs

編輯mgr配置
edit params mgr

PORT 7839
DYNAMICPORTLIST  7840-7849
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

啟動mgr
start mgr

7. 源端ogg pump配置
啟動ggsci後執行如下操作:

編輯pump配置
edit params pump

EXTRACT pump
RMTHOST xx.xx.xx.xx, MGRPORT 7839, COMPRESS
PASSTHRU
NUMFILES 5000
RMTTRAIL ./dirdat/st
DYNAMICRESOLUTION
TABLE      OGG_TEST.*;
SEQUENCE   OGG_TEST.*;

新增投遞程序,從某一個佇列開始投
add ext pump,exttrailsource ./dirdat/st

備註:投遞程序,每個隊檔案大小為200m
add rmttrail ./dirdat/st,ext pump,megabytes 200

啟動pump
start pump
啟動後,結合上面adapter的配置,可以在目標端的dirdat目錄下看到過來的trailfile

8. Datahub外掛安裝和配置
依賴環境:jdk1.7。
配置好JAVA_HOME, LD_LIBRARY_PATH,可以將環境變數配置到~/.bash_profile中,例如

export JAVA_HOME=/xxx/xxx/jrexx
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

修改環境變數後,請重啟adapter的mgr程序
下載datahub-ogg-plugin.tar.gz並解壓:

修改conf路徑下的javaue.properties檔案,將{YOUR_HOME}替換為解壓後的路徑

gg.handlerlist=ggdatahub

gg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler
gg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xml

goldengate.userexit.nochkpt=false
goldengate.userexit.timestamp=utc

gg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/*
gg.log.level=debug

jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar

修改conf路徑下的log4j.properties檔案,將{YOUR_HOME}替換為解壓後的路徑

修改conf路徑下的configure.xml檔案,修改方式見檔案中的註釋

<?xml version="1.0" encoding="UTF-8"?>
<configue>

    <defaultOracleConfigure>
        <!-- oracle sid, 必選-->
        <sid>100</sid>
        <!-- oracle schema, 可以被mapping中的oracleSchema覆蓋, 兩者必須有一個非空-->
        <schema>ogg_test</schema>
    </defaultOracleConfigure>

    <defalutDatahubConfigure>
        <!-- datahub endpoint, 必填-->
        <endPoint>YOUR_DATAHUB_ENDPOINT</endPoint>
        <!-- datahub project, 可以被mapping中的datahubProject, 兩者必須有一個非空-->
        <project>YOUR_DATAHUB_PROJECT</project>
        <!-- datahub accessId, 可以被mapping中的datahubAccessId覆蓋, 兩者必須有一個非空-->
        <accessId>YOUR_DATAHUB_ACCESS_ID</accessId>
        <!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆蓋, 兩者必須有一個非空-->
        <accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey>
        <!-- 資料變更型別同步到datahub對應的欄位,可以被columnMapping中的ctypeColumn覆蓋 -->
        <ctypeColumn>optype</ctypeColumn>
        <!-- 資料變更時間同步到datahub對應的欄位,可以被columnMapping中的ctimeColumn覆蓋 -->
        <ctimeColumn>readtime</ctimeColumn>
        <!-- 資料變更序號同步到datahub對應的欄位, 按資料變更先後遞增, 不保證連續, 可以被columnMapping中的cidColumn覆蓋 -->
        <cidColumn>record_id</cidColumn>
<!-- 額外增加的常量列,每條record該列值為指定值,格式為c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆蓋-->
         <constColumnMap></constColumnMap>
    </defalutDatahubConfigure>

    <!-- 預設最嚴格,不落檔案 直接退出 無限重試-->

    <!-- 執行每批上次的最多紀錄數, 可選, 預設1000-->
    <batchSize>1000</batchSize>

    <!-- 預設時間欄位轉換格式, 可選, 預設yyyy-MM-dd HH:mm:ss-->
    <defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat>

    <!-- 髒資料是否繼續, 可選, 預設false-->
    <dirtyDataContinue>true</dirtyDataContinue>

    <!-- 髒資料檔案, 可選, 預設datahub_ogg_plugin.dirty-->
    <dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile>

    <!-- 髒資料檔案最大size, 單位M, 可選, 預設500-->
    <dirtyDataFileMaxSize>200</dirtyDataFileMaxSize>

    <!-- 重試次數, -1:無限重試 0:不重試 n:重試次數, 可選, 預設-1-->
    <retryTimes>0</retryTimes>

    <!-- 重試間隔, 單位毫秒, 可選, 預設3000-->
    <retryInterval>4000</retryInterval>

    <!-- 點位檔案, 可選, 預設datahub_ogg_plugin.chk-->
    <checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName>

    <mappings>
        <mapping>
            <!-- oracle schema, 見上描述-->
            <oracleSchema></oracleSchema>
            <!-- oracle table, 必選-->
            <oracleTable>t_person</oracleTable>
            <!-- datahub project, 見上描述-->
            <datahubProject></datahubProject>
            <!-- datahub AccessId, 見上描述-->
            <datahubAccessId></datahubAccessId>
            <!-- datahub AccessKey, 見上描述-->
            <datahubAccessKey></datahubAccessKey>
            <!-- datahub topic, 必選-->
            <datahubTopic>t_person</datahubTopic>
            <ctypeColumn></ctypeColumn>
            <ctimeColumn></ctimeColumn>
            <cidColumn></cidColumn>
            <constColumnMap></constColumnMap>
            <columnMapping>
                <!--
                src:oracle欄位名稱, 必須;
                dest:datahub field, 必須;
                destOld:變更前資料落到datahub的field, 可選;
                isShardColumn: 是否作為shard的hashkey, 可選, 預設為false, 可以被shardId覆蓋
                isDateFormat: timestamp欄位是否採用DateFormat格式轉換, 預設true. 如果是false, 源端資料必須是long
                dateFormat: timestamp欄位的轉換格式, 不填就用預設值
                -->
                <column src="id" dest="id" isShardColumn="true"  isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/>
                <column src="name" dest="name" isShardColumn="true"/>
                <column src="age" dest="age"/>
                <column src="address" dest="address"/>
                <column src="comments" dest="comments"/>
                <column src="sex" dest="sex"/>
                <column src="temp" dest="temp" destOld="temp1"/>
            </columnMapping>

            <!--指定shard id, 優先生效, 可選-->
            <shardId>1</shardId>
        </mapping>
    </mappings>
</configue>

在ggsci下啟動datahub writer

edit params dhwriter

extract dhwriter
getEnv (JAVA_HOME)
getEnv (LD_LIBRARY_PATH)
getEnv (PATH)
CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"
sourcedefs ./dirdef/ogg_test.def
table OGG_TEST.*;

新增dhwriter
add extract dhwriter, exttrailsource ./dirdat/st

啟動dhwriter
start dhwriter

三、使用場景

這裡會用一個簡單的示例來說明資料的使用方法,例如我們在Oracle資料庫有一張商品訂單表orders(oid int, pid int, num int),該表有三列,分別為訂單ID, 商品ID和商品數量。
將這個表通過OGG Datahub進行增量資料同步之前,我們需要先將源表已有的資料通過DataX同步到MaxCompute中。增量同步的關鍵步驟如下:
(1)在Datahub上建立相應的Topic,Topic的schema為(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub的外掛按照上述的安裝流程部署配置,其中列的Mapping配置如下:

    <ctypeColumn>optype</ctypeColumn>
    <ctimeColumn>readtime</ctimeColumn>
    <columnMapping>
        <column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"/>
        <column src="pid" dest="pid_after" destOld="pid_before"/>
        <column src="num" dest="num_after" destOld="num_before"/>
    </columnMapping>

其中optype和readtime欄位是記錄資料庫的資料變更型別和時間,optype有”I”, “D”, “U”三種取值,分別對應為“增”,“刪”,“改”三種資料變更操作。
(3)OGG Datahub外掛部署好成功執行後,外掛會源源不斷的將源表的資料變更記錄輸送至datahub中,例如我們在源訂單表中新增一條記錄(1,2,1),datahub裡收到的記錄如下:

 -------- ------------ ------------ ------------ ------------ ------------ ------------ ------------ ------------ 
| record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |
 ------- ------------ ------------ ------------ ------------ ------------ ------------ ------------ ------------ 
| 14810373343020000 |     I          | 2016-12-06 15:15:28.000141 | NULL       | 1          | NULL       | 2          | NULL       | 1   |  

 

修改這條資料,比如把num改為20,datahub則會收到的一條變更資料記錄,如下:

 ------- ------------ ------------ ------------ ------------ ------------ ------------ ------------ ------------ 
| record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |
 -------- ------------ ------------ ------------ ------------ ------------ ------------ ------------ ------------ 
| 14810373343080000 |     U          | 2016-12-06 15:15:58.000253 | 1          | 1          | 2          | 2          | 1          | 20         |

實時計算
在前一天的離線計算的基礎資料上,我們可以寫一個StreamCompute流計算的分析程式,很容易地對資料進行實時彙總,例如實時統計當前總的訂單數,每種商品的銷售量等。處理思路就是對於每一條到來的變更資料,可以拿到變化的數值,實時更新統計變數即可。

離線處理
為了便於後續的離線分析,我們也可以將Datahub裡的資料歸檔到MaxCompute中,在MaxCompute中建立相應Schema的表:

create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);
在Datahub上建立MaxCompute的資料歸檔,上述流入Datahub裡的資料將自動同步到MaxCompute當中。建議將同步到MaxCompute中的資料按照時間段進行劃分,比如每一天的增量資料都對應一個獨立分割槽。這樣當天的資料同步完成後,我們可以處理對應的分割槽,拿到當天所有的資料變更,而與和前一天的全量資料進行合併後,即可得到當天的全量資料。為了簡單起見,先不考慮分割槽表的情況,以2016-12-06這天的增量資料為例,假設前一天的全量資料在表orders_base裡面,datahub同步過來的增量資料在orders_log表中,將orders_base與orders_log做合併操作,可以得到2016-12-06這天的最終全量資料寫入表orders_result中。這個過程可以在MaxCompute上用如下這樣一條SQL完成。

INSERT OVERWRITE TABLE orders_result
SELECT t.oid, t.pid, t.num
FROM
(
     SELECT oid, pid, num, '0' x_record_id, 1 AS x_optype
     FROM
     orders_base 
     UNION ALL
     SELECT decode(optype,'D',oid_before,oid_after) AS oid
              , decode(optype,'D', pid_before,pid_after) AS pid
              , num_after AS num
              , record_id x_record_id
              , decode(optype, 'D', 0, 1) AS x_optype
     FROM
     orders_log
 ) t
JOIN
 (
     SELECT
     oid
     , pid
     , max(record_id) x_max_modified
     FROM
     (
     SELECT
     oid
     , pid
     , '0' record_id
     FROM
     orders_base UNION ALL SELECT
                      decode(optype,'D',oid_before,oid_after) AS oid
                      , decode(optype,'D', pid_before,pid_after) AS pid
                      , record_id
                      FROM
                      orders_log ) g
     GROUP BY oid , pid
 ) s
ON
t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;

四、常見問題

Q:目標端報錯 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
A:目標端機器hostname在/etc/hosts裡面重新設定localhost對應的ip

Q:找不到jvm相關的so包
A:將jvm的so路徑新增到LD_LIBRARY_PATH後,重啟mgr

例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server
Q:有了DDL語句,比如增加一列,源端ogg沒有問題,但是adapter端的ffwriter和jmswriter程序退出,且報錯: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A:由於表結構改變,需要重做def檔案,將重做的def檔案放入dirdef後重啟即可

本文作者:冶善

原文連結

本文為雲棲社群原創內容,未經允許不得轉載。

相關文章

資料庫 最新文章