Python自動化拉取Mysql數據並裝載到Hive(V2.0)


上一版本：http://blog.csdn.net/babyfish13/article/details/70792158
相較於之前的版本，主要做了兩方面的優化：一是修復了數據中含有回車、換行等字符導致裝載錯誤的問題；二是優化了跑批調用參數等方面。
此版本主要考慮了串行調度與全量裝載；並行調度與增量裝載將在下一個版本中實現。

1、數據裝載的表級參數配置文件
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-Auto/batchPar.conf
# BatchParConfig file  
----------------------------------
# srcMysql_config=srcMysqlConfig_jellyfish_server, src_tabName='game_zone', tabType='single'
# srcMysql_config=srcMysqlConfig_jellyfish_server, src_tabName='live_history_status', tabType='submeter-256'
srcMysql_config=srcMysqlConfig_jellyfish_user, src_tabName='user_profile', tabType='submeter-256'
# srcMysql_config=srcMysqlConfig_jellyfish_seed, src_tabName='room', tabType='single'
# srcMysql_config=srcMysqlConfig_jellyfish_user, src_tabName='user_id_card', tabType='single'
srcMysql_config=srcMysqlConfig_jellyfish_seed, src_tabName='game', tabType='single'


srcMysql_config=srcMysqlConfig_jellyfish_event, src_tabName='match_apply', tabType='single'
# srcMysql_config=srcMysqlConfig_jellyfish_server, src_tabName='user_daily_sign_record', tabType='single'
# srcMysql_config=srcMysqlConfig_jellyfish_event, src_tabName='event_online_count', tabType='single'
# srcMysql_config=srcMysqlConfig_jellyfish_event, src_tabName='event_award_201611', tabType='single'
srcMysql_config=srcMysqlConfig_jellyfish_hadoop_stat, src_tabName='room_group', tabType='single'

2、數據裝載的系統級參數配置腳本
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-Auto/systemParGet.py
# -*- coding=utf-8 -*-
import warnings
import datetime

warnings.filterwarnings("ignore")


# src Database config
# Every source MySQL instance shares one host and one credential pair; only
# the port and the database name differ. The values are placeholders.
# To connect from outside the internal network, the commented-out original
# used 'MysqlHostOuterIp' as the host instead of 'MysqlHostInnerIp'.
def _srcMysqlConfig(port, db):
    """Return a connection dict (host/user/passwd/port/db) for one database."""
    return {
        'host': 'MysqlHostInnerIp',
        'user': 'MysqlUser',
        'passwd': 'MysqlPass',
        'port': port,
        'db': db,
    }


srcMysqlConfig_Tv_server = _srcMysqlConfig(50506, 'Tv_server')
srcMysqlConfig_Tv_user = _srcMysqlConfig(50514, 'Tv_user')
srcMysqlConfig_Tv_seed = _srcMysqlConfig(50029, 'Tv_seed')
srcMysqlConfig_Tv_event = _srcMysqlConfig(50512, 'Tv_event')
srcMysqlConfig_Tv_hadoop_stat = _srcMysqlConfig(6605, 'Tv_hadoop_stat')

# Local directory where table dumps are staged before being loaded into Hive.
tmp_data_dir = '/home/hadoop/nisj/Mysql2Hive-Auto/tmp_data'

def getNowDay():
    """Return today's local date formatted as 'YYYY-MM-DD'."""
    return datetime.date.today().strftime('%Y-%m-%d')

def dateRange(beginDate, endDate):
    """Return every calendar day from beginDate to endDate, inclusive.

    Both bounds are parsed with the '%Y-%m-%d' format, so they may be
    zero-padded ('2017-05-01') or not ('2017-5-1'). The loop compares parsed
    dates rather than raw strings: a string comparison silently breaks on
    non-padded input.

    :param beginDate: first day, e.g. '2017-05-01'.
    :param endDate: last day (inclusive), same format.
    :returns: list of zero-padded 'YYYY-MM-DD' strings; empty when
        beginDate is after endDate.
    :raises ValueError: if either bound does not match '%Y-%m-%d'.
    """
    fmt = "%Y-%m-%d"
    day = datetime.datetime.strptime(beginDate, fmt).date()
    lastDay = datetime.datetime.strptime(endDate, fmt).date()
    dates = []
    while day <= lastDay:
        dates.append(day.strftime(fmt))
        day += datetime.timedelta(days=1)
    return dates

def getSrcMysqlConfig(srcMysql_config):
    """Unpack a MySQL connection dict into a positional 5-tuple.

    :param srcMysql_config: dict with 'host', 'port', 'user', 'passwd' and
        'db' keys.
    :returns: (host, port, user, passwd, db).
    :raises KeyError: for the first of those keys (in that order) that is
        missing.
    """
    fieldOrder = ('host', 'port', 'user', 'passwd', 'db')
    return tuple(srcMysql_config[field] for field in fieldOrder)

# print getSrcMysqlConfig(srcMysql_config=srcMysqlConfig_Tv_server)

3、源表結構及元數據信息的獲取
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-Auto/getSrcMetadata.py
# -*- coding=utf-8 -*-
import os
import re
from systemParGet import *

warnings.filterwarnings("ignore")


def mysqlTabCreateScript(srcMysql_config, src_tabName, tabType):
    """Build a Hive DDL script that mirrors the column layout of a MySQL table.

    Column metadata is read from MySQL's information_schema with the mysql
    command-line client. Columns whose data_type matches LIKE '%int'
    (tinyint ... bigint) become Hive bigint; every other column becomes
    string.

    :param srcMysql_config: connection dict with host/port/user/passwd/db keys.
    :param src_tabName: source table name (the base name for sharded tables).
    :param tabType: 'single' for a plain table, or any string containing
        'submeter' (e.g. 'submeter-256') for a table split into
        <name>_0 .. <name>_N-1; the schema is then read from <name>_0.
    :returns: (TabCreateScript, colList). TabCreateScript drops and recreates
        Hive table xxx_<src_tabName> as a tab-delimited text table.
        colList holds the source column names in ordinal order. If the table
        is not found, colList is empty and the DDL has no columns.
    :raises ValueError: if tabType is neither form.
    """
    host, port, user, passwd, db = getSrcMysqlConfig(srcMysql_config)

    if tabType == 'single':
        srcTabName = src_tabName
    elif 'submeter' in tabType:
        # All shards share one schema, so inspect the first shard only.
        srcTabName = src_tabName + "_0"
    else:
        # Previously this fell through to an UnboundLocalError on srcTabName.
        raise ValueError("unsupported tabType: %r" % (tabType,))

    # NOTE(review): the password is on the mysql command line and the command
    # runs through a shell, so all config values must be trusted.
    metaCmd = """source /etc/profile; \
/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -D{db} \
-N -e"set names utf8; \
select a2.column_name,case when a2.data_type like '%int' then 'bigint' else 'string' end data_type
from information_schema.TABLES a1
left join information_schema.columns a2 on a1.TABLE_SCHEMA=a2.TABLE_SCHEMA and a1.TABLE_NAME=a2.TABLE_NAME
where a1.TABLE_SCHEMA='{db}' and a1.table_name ='{srcTabName}'
order by a2.ORDINAL_POSITION;" \
""".format(host=host, port=port, user=user, passwd=passwd, db=db, srcTabName=srcTabName)
    # Close the pipe deterministically instead of leaking it.
    with os.popen(metaCmd) as pipe:
        srcTabStructure = pipe.readlines()

    colList = []
    colDefs = []
    for row in srcTabStructure:
        # Each mysql -N output row is "<column_name>\t<hive_type>\n".
        fields = row.replace('\n', '').split('\t')
        colList.append(fields[0])
        # "\`" (backslash + backtick) is kept on purpose: when the script is
        # embedded in a double-quoted shell command, the shell turns it into
        # a plain backtick that quotes the column name for Hive.
        colDefs.append('\\`' + fields[0] + '\\` ' + fields[1])

    TabCreateScript = (
        'drop table if exists xxx_{src_tabName};\ncreate table xxx_{src_tabName}(\n'.format(src_tabName=src_tabName)
        + ',\n'.join(colDefs)
        + ")row format delimited fields terminated by '\t' lines terminated by '\n';;"
    )
    return TabCreateScript, colList

# Batch Test

4、hive庫表結構的創建
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-Auto/hiveTabCreate.py
# -*- coding=utf-8 -*-
from getSrcMetadata import *

warnings.filterwarnings("ignore")

def HiveCreateTab(srcMysql_config, src_tabName, tabType):
    """Drop and recreate Hive table xxx_<src_tabName> from the MySQL schema.

    The DDL comes from mysqlTabCreateScript and is executed with `hive -e`.
    The hive exit status is not checked and nothing is returned.
    """
    ddlScript = mysqlTabCreateScript(srcMysql_config, src_tabName, tabType)[0]
    # The DDL sits inside double quotes, so the shell unescapes its "\`".
    hiveCmd = '/usr/lib/hive-current/bin/hive -e "%s" ' % ddlScript
    os.system(hiveCmd)

# Batch Test

5、Mysql數據向Hive裝載
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-Auto/mysqlData2Hive.py
# -*- coding=utf-8 -*-
from getSrcMetadata import *

warnings.filterwarnings("ignore")


def mysqlDataDownload(srcMysql_config, src_tabName, tabType):
    """Dump every row of a MySQL table (all shards if sharded) to a local file.

    Output goes to <tmp_data_dir>/xxx_<src_tabName>.txt. Any previous copy is
    deleted first, then each shard's rows are appended in mysql -N batch
    format (tab-separated, no header line). Every column value is wrapped in
    nested replace() calls that turn commas, newlines, carriage returns and
    tabs into the placeholders [comma], [newline-n], [newline-r] and [tab].
    This keeps embedded delimiters from breaking Hive's tab/newline parsing.

    :param srcMysql_config: connection dict with host/port/user/passwd/db keys.
    :param src_tabName: source table name (the base name for sharded tables).
    :param tabType: 'single', or 'submeter-<N>' for a table split into
        <name>_0 .. <name>_<N-1>.
    :raises ValueError: if tabType is neither form.
    """
    host, port, user, passwd, db = getSrcMysqlConfig(srcMysql_config)

    # Pick the physical tables to read. The "_<i>" suffix depends on the
    # table *type*, not on the shard count. Previously 'submeter-1' read the
    # un-suffixed table, while mysqlTabCreateScript takes the schema from
    # <name>_0.
    if tabType == 'single':
        suffixes = ['']
    elif 'submeter' in tabType:
        submeter_cnt = int(str(tabType).replace('submeter-', ''))
        suffixes = ['_' + str(i) for i in range(submeter_cnt)]
    else:
        raise ValueError("unsupported tabType: %r" % (tabType,))

    if not os.path.exists(tmp_data_dir):
        os.makedirs(tmp_data_dir)

    colList = mysqlTabCreateScript(srcMysql_config, src_tabName, tabType)[1]
    # "\`" becomes a plain backtick inside the shell's double quotes, and
    # "\n", "\r", "\t" reach MySQL as escape sequences inside its literals.
    colTemplate = ("replace(replace(replace(replace(\\`{0}\\`,',','[comma]'),"
                   "'\\n','[newline-n]'),'\\r','[newline-r]'),'\\t','[tab]')")
    allColChars = ','.join(colTemplate.format(colName) for colName in colList)

    dataFile = '{0}/xxx_{1}.txt'.format(tmp_data_dir, src_tabName)
    if os.path.isfile(dataFile):
        os.remove(dataFile)

    # NOTE(review): mysql's exit status is not checked, so a failing shard
    # silently leaves a partial dump behind.
    for submeterPlus in suffixes:
        os.system("""source /etc/profile; \
/usr/bin/mysql -h{host} -P{port} -u{user} -p{passwd} -D{db} \
-N -e"set names utf8; \
select {allColChars} from {db}.{src_tabName}{submeterPlus} ;" \
>>{tmp_data_dir}/xxx_{src_tabName}.txt \
""".format(host=host, port=port, user=user, passwd=passwd, db=db, src_tabName=src_tabName, tmp_data_dir=tmp_data_dir, allColChars=allColChars, submeterPlus=submeterPlus))

def DataUploadHive(src_tabName):
    """Load the dumped file into Hive table xxx_<src_tabName>, then delete it.

    Uses `load data local inpath ... overwrite`, so the table's previous
    contents are replaced. The local file is removed afterwards whatever the
    hive exit status was.
    """
    dataFile = '{0}/xxx_{1}.txt'.format(tmp_data_dir, src_tabName)
    loadCmd = '/usr/lib/hive-current/bin/hive -e "load data local inpath \'{0}\' overwrite into table xxx_{1};" '.format(dataFile, src_tabName)
    os.system(loadCmd)
    os.system('rm -rf {0} '.format(dataFile))

def MysqlData2hive(srcMysql_config, src_tabName, tabType):
    """Copy a MySQL table's rows into Hive table xxx_<src_tabName>.

    First dumps the rows to a local file, then loads that file into Hive.
    """
    mysqlDataDownload(srcMysql_config=srcMysql_config,
                      src_tabName=src_tabName,
                      tabType=tabType)
    DataUploadHive(src_tabName=src_tabName)

# Batch Test

6、串行總控調度
/Users/nisj/PycharmProjects/BiDataProc/Mysql2Hive-Auto/mysql2HiveSerialCtl.py
# -*- coding=utf-8 -*-
from hiveTabCreate import *
from mysqlData2Hive import *

warnings.filterwarnings("ignore")


def Mysql2Hive(srcMysql_config, src_tabName, tabType):
    """Mirror one MySQL table into Hive: recreate the table, then load data."""
    # Order matters: the Hive table must exist before data is loaded into it.
    for step in (HiveCreateTab, MysqlData2hive):
        step(srcMysql_config, src_tabName, tabType)

def Mysql2HiveCtl(confFile="batchPar.conf"):
    """Run Mysql2Hive serially for every table listed in a batch config file.

    Each active line of confFile has the form::

        srcMysql_config=<config dict name>, src_tabName='<table>', tabType='<type>'

    The following lines are skipped:
      * blank lines;
      * lines containing '#' (comments);
      * separator lines made only of '-' characters.

    srcMysql_config must name a connection dict visible at module level
    (e.g. srcMysqlConfig_Tv_server). The other values are Python string
    literals. Values are resolved without exec, so the config file cannot
    run arbitrary code.

    The whole file is parsed and validated before any table is processed, so
    a malformed line fails fast instead of midway through the batch.

    :param confFile: path of the batch config file. By default it is
        batchPar.conf, relative to the current working directory.
    :raises ValueError: on an unknown/malformed parameter, an unresolvable
        value, or a line missing one of the three keys.
    """
    import ast  # local import: only needed to parse quoted config values

    confKeys = ('srcMysql_config', 'src_tabName', 'tabType')
    namespace = globals()
    jobs = []
    with open(confFile) as ConfigFile:
        for lineNo, ConfigLine in enumerate(ConfigFile, 1):
            line = ConfigLine.strip()
            # Skip blanks, comments and '-----' separator lines. The old code
            # instead dropped the first kept line, losing a real entry
            # whenever the separator line was absent.
            if not line or '#' in line or not line.strip('-'):
                continue
            params = {}
            for par in line.split(','):
                key, sep, expr = par.partition('=')
                key, expr = key.strip(), expr.strip()
                if not sep or key not in confKeys:
                    raise ValueError('%s:%d: unexpected parameter %r'
                                     % (confFile, lineNo, par.strip()))
                if expr in namespace:
                    # Bare name, e.g. a connection dict from systemParGet.
                    params[key] = namespace[expr]
                else:
                    try:
                        params[key] = ast.literal_eval(expr)
                    except (ValueError, SyntaxError):
                        raise ValueError('%s:%d: cannot resolve value %r'
                                         % (confFile, lineNo, expr))
            missing = [k for k in confKeys if k not in params]
            if missing:
                raise ValueError('%s:%d: missing %s'
                                 % (confFile, lineNo, ', '.join(missing)))
            jobs.append(params)

    for params in jobs:
        Mysql2Hive(srcMysql_config=params['srcMysql_config'],
                   src_tabName=params['src_tabName'],
                   tabType=params['tabType'])


# batch_test
# NOTE(review): this runs the whole batch whenever the module is executed or
# imported; consider an `if __name__ == '__main__':` guard.
Mysql2HiveCtl()


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com