利用sqoop從數據源獲取數據到hive的流程化


  在流程化中,我不太清楚其他朋友是怎么做的,這里參考我司的數據倉庫的調度邏輯將sqoop腳本放到shell里執行,最終結果能基本解決后期日常維護、代碼重用的需求;注意這里只討論從數據源獲取數據到hive而不包括在hive中對數據的處理腳本,但是是值得參考的

目標
 根據需要每天從業務庫導入前一天的數據,在hive中生成一個日表,格式與數據源保持一致即可,比如今天2015年5月11日,那么我們將在凌晨從數據源中獲取的2015年5月10日的數據;這里hive當做一個數據倉庫來使用,先將數據原封不動的抽取過來,相當於三層結構中的ods層。

測試環境

    數據源:MS sqlserver 2012
hadoop-2.5.0 hive-0.13 sqoop-1.4.5
對應的數據庫為fengmingdw

1、數據源的配置文件  

  關於配置文件的存在是為了將所用相關的屬性整合到同一個文件,每一個需要對應屬性的腳本,只要到里面去讀就可以了;在設計過程中這里提供了兩種方法,方法一可以參考一下,建議直接方法二

方法一,自定義配置文件

配置文件格式可以自選,包括諸如xml、ini等等,考慮到shell實現的困難,這里選取最簡單的
形如xxx=aaaa ,這是linux中通用的配置文件方式
#文件命名為datawarehouse
ip=192.168.73.12 #注意等號兩邊不要有空格
username=testUser
passwd=123456
dbname=fdw
connectURL="jdbc:sqlserver://`echo $ip`;DatabaseName=`echo $dbname`"

讀取該文件的方法參考如下

function load_config()
{
filePath="/home/9003547/datasource.d/`echo $1`" #這是我存放的配置文件路徑
echo $filePath
if [ -f "$filePath" ]; then
. $filePath
fi
}
load_config datawarehouse #傳入配置文件名稱
echo $ip #讀取的參數
echo $connectURL

在對應的腳本中加入此代碼就可以獲取到ip、connectURL等配置在datawarehouse中的值

方法二,options-file參數

sqoop中提供了一個選項 --options-file,允許sqoop去讀取一段文件內的腳本
這里提供一個文件,路徑為 /home/9003547/sqooppath/fdw.txt,該文件內容如下
import 
--connect
jdbc:sqlserver://192.168.73.12:1433;DatabaseName=fdw
--username
testUser
--password
123456

在sqoop中可以使用類似語句就可以重復引用文件fdw.txt中的信息

sqoop --options-file /home/9003547/sqooppath/fdw.txt \
--table myTestTable \
--xxxxxx #后面補充相應的參數

  這種情況,對於大量的數據來源庫可能需要自己維護,十幾二十可以用,但是更多的就可能有點乏力了,事實上不管有多少數據源都建議給予一個良好的命名規范,至少能要讓使用者能夠看命名就知道內容

shell中的sqoop與hive腳本

  在這里做簡單的設置,包括傳入一個日期參數作為表名、將hive中存在的表名給刪除、sqoop抽取表數據

日期參數的設置

  因為是每天一個表存放數據,所以采取的方式是表名+日期,這里設置日期參數

#定義時間,允許傳入一個時間,如果未傳遞則默認當前日期的前一天
if [ -z $1 ]
then
d_date=`date +%Y%m%d --date="-1 day"`
else
d_date=$1
fi
tableName=testTable_$d_date

shell中的hive

  在hive中提供了臨時訪問hive終端的方式,即使用 【hive -e “語句塊”】,也可以通過【hive -f 文件名】 去執行hive腳本文件
  在hive中的參數定義,可以直接使用shell中的參數(下面的判斷代碼塊既是如此)

#判斷hive中的表是否存在,存在就刪掉,以支持重復抽取
hive -e "drop table if exists dino.$tableName" #指定了庫名

  另一種參數使用方式是利用hiveconf,但這個不能通過hive -e這種方式去使用,只能在文件中去引用
格式如下
hive -hiveconf variable1='aaa' -hiveconf variable2='bbb' -f xxx.hql
xxx.hql 內容如下,這里只為了說明參數的引用
select * from testTable where col1 betweeb '${hiveconf:variable1}' and '${hiveconf:variable2}'

shell中的sqoop

#sqoop  抽取數據,這里是全表抽取,即沒有加where和column參數
sqoop --options-file /home/9003547/sqooppath/fengmingdw.txt \
--table $tableName \
--delete-target-dir --target-dir /user/hive/tmp/$tableName \
--hive-import --hive-table dino.$tableName \
--create-hive-table \
--hive-overwrite \
--null-non-string '\\N' \
--null-string '\\N' \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--m 1

完整的腳本將放在本文的最后面,這是一些簡單的屬性說明

1、target-dir 指的是hdfs上的目錄,需要當前用戶具有讀寫權限,否則會被拒絕
2$tableName 是在shell中預先定義的表名,該表名由兩部分組成 表名+日期,中間使用下划線分開
3、hive中的null值,在rdbms上的null值在hive中會直接轉換成字符串“null”,如果不需要可以使用null-non-stringnull-string兩個參數將其替換掉,這里將其轉換為hive中的空值,即‘\N’,記得轉義
4、如果不是一張表,比如只要某些列並限定條,可以使用--columns "col1,col2" --where "col1>2 and col2<10"類似參數

sqoop結果的記錄

  在shell中,我們可以把運行的日志利用符號【>>】將結果指向到指定的文件中,但是如果腳本太多,那么把每一個腳本的記錄都寫入同一個hive表或者rdbms中的表是一個比較好的選擇
  這里提供一個hive中的建表語句

create table dino.etl_table2 (
etl_name string comment 'sqoop的job名,可以用table_name替代'
,table_name string comment '表名,如果MyTestTabel_20150511則MyTestTable'
,etl_date string comment '取日期,如果MyTestTabel_20150511則20150511'
,start_time string comment 'sqoop腳本的開始時間'
,end_time string comment 'sqoop腳本的結束時間'
,etl_rows int comment '保留字段,抽取的數據行數,hive中不知道怎么獲取'
,etl_hours int comment '保留字段,抽取的具體小時時間點'
)
comment 'table for keeping etl audit record'
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as textfile;

  這里的保留字段是因為在之前的RDBMS中存在的,保留因為后續可能有需要export到RDBMS,這里根據各自需求可以自行建表
  該表的意義是在每次sqoop之后往hive中寫入一個記錄,相關的腳本如下
  首先在路徑/home/9003547/sqooppath下建了一個sh文件,命名為hive_etl_record.sh

if [ $# != 5 ];then
echo 'must be provide 5 paras'
echo $#
exit 2
fi
echo "傳入的參數值為以下"
for arg in "$@"
do
echo $arg
done
#參數賦值
table_name=$1
etl_date=$2
etl_status=$3
start_time=$4
etl_errors=$5

hive -e "
insert into table dino.view_etl_audit
select '$table_name'as etl_name
,'$table_name'as etl_table
,'$etl_date' as etl_date
,$etl_status as etl_status
,'$start_time' as start_time
,from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')as end_time
,'$etl_errors'as etl_errors
,'\\N' as etl_rows
,'\\N'as etl_hour
from dino.dual;
"

注:dual表是一個自建的表,相當於oracle中的dual,只有一列屬性為string,並且值只有一個為”x”
這里寫圖片描述
這個表的存在只是為了在select一些自給的值的時候使用,只要注意里面只有一行數據就可以了
這個hive_etl_record.sh的存在是為了實現代碼重用,在實際的sqoop中可以方便的直接傳遞參數。下面是使用這個文件的實際方法

#判斷sqoop執行結果,緊跟在上面的sqoop語句之后
if [$? -ne 0];then
etl_status=0
etl_error="errors"
else
etl_status=1
etl_error="\\N"
fi

#日志寫入到hive中,參數順序對應說明
#table_name=$1
#etl_date=$2
#etl_status=$3
#start_time=$4
#etl_errors=$5

./home/9003547/sqooppath/hive_etl_record.sh "$table" "$d_date" "$etl_status" start_date" "$start_date" "$etl_errors"

完整的代碼

在上述中,更多的是對零散代碼的說明,已經貼出了

option-file中的代碼  fdw.txt
hive_etl_record.sh的配置

現在貼出一個完整的使用腳本

#!/bin/bash
. ~/.bashrc
#這里要用戶自己填寫的,有兩個地方,第一個是設置tableName的設置,第二個是sqoop語句要自己看表看需求來寫
#使用說明
# 抽取指定的表名的數據
# 如果用戶在執行該腳本的時候不傳入日期參數則默認為當前日期的前一天進行
# 否則為用戶指定的日期,格式為yyyyMMdd,如要抽取2015-05-10的數據,那么執行的方式應該是如下
# sh 腳本名 20150510

#聲明獲取的數據表名
table=ods_crm_rolegroup

logfile=/home/9003547/sqooplog/$table.txt
#日期
#得到業務日期,如果沒有傳入日期參數,則由系統直接賦值
if [ -z $1 ]
then
d_date=`date +%Y%m%d --date="-1 day"`
else
d_date=$1
fi
echo $d_date >> $logfile

d_date_format=`echo ${d_date} | awk '{print substr($d_date,1,4)"-"substr($0,5,2)"-"substr($0,7,2)}'`
echo ${d_date_format} >> $logfile

start_date=`date '+%Y-%m-%d %H:%M:%S'`
echo ${start_date} >> $logfile
tableName=$table"_"$d_date
echo $tableName
#判斷表在hive中是否存在,存在就刪除,這里hive中的庫名為dino
hive -e "drop table if exists dino.$tableName"

#sqoop 抽取數據,這里全表抽取
sqoop --options-file /home/9003547/sqooppath/fdw.txt \
--table $tableName \
--delete-target-dir --target-dir /user/hive/tmp/$tableName \
--hive-import --hive-table dino.$tableName \
--hive-drop-import-delims --create-hive-table \
--hive-overwrite \
--null-non-string '\\N' \
--null-string '\\N' \
--fields-terminated-by '\t' \
--lines-terminated-by '\n' \
--m 1

#判斷sqoop執行結果,etl_status為0表示失敗,為1表示成功,由於這里無法獲取到錯誤信息,只能以"errors"代替
if [ $? -ne 0 ];then
etl_status=0
etl_error="errors"
echo "執行失敗" >> $logfile
else
etl_status=1
#etl_error="\\N"
echo "執行成功" >> $logfile
fi
#日志寫入到hive中,參數順序對應說明

#table_name=$1
#etl_date=$2
#etl_status=$3
#start_time=$4
#etl_errors=$5
#write audit of etl's recored into hive's table

sh /home/9003547/sqooppath/hive_etl_record.sh "$table" "$d_date" "$etl_status" "$start_date" "$etl_errors"

以上就是完整的腳本說明
  這個東西的調度方式可以采用linux自帶的crontab,但是如果有條件最好還是弄一個可視化圖形進行操作,包括作業的順序、重跑等,在系統后台維護各個作業的依賴關系,這樣對所有的作業維護才是最輕松的,具體的話這個設計到web或者客戶端開發,需要更專業的同事實現

可能的優化

  這個腳本只是按照最簡單的方式去處理,如果而已嘗試着用python直接將sqoop語句寫進去會更好,這樣對日志輸出又或者配置文件維護都會更方便一點,完善的地方包括但不限於以下內容
1、明細日志輸出到文件
2、sqoop腳本錯誤結果獲取存入庫中,這樣每天不但不用翻明細的日志文件,也可以直接在hive中select出來
3、如果可以將現在寫在hive中的日志記錄寫入RDBMS,速度快一點也不需要在export回去,最直接的就是寫入mysql,因為hive的元數據也存儲在這里呀,現成的數據庫可以使用。

內容很簡單,分享出來一起學習,如果有做改進,我會第一時間修改本文以提供最新的進展,如果看到的朋友有建議也請留言交流
关注微信公众号

注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com