Flume-to-Hive Data Warehouse Ingestion Setup
Streaming ingestion into the data warehouse is a staple at most large companies. Flume has shipped the taildir source since release 1.7, and it is widely used for the following reasons:
- It matches file names in a directory with regular expressions
- As soon as data is written to a monitored file, Flume delivers it to the configured sink
- It is highly reliable: read progress is checkpointed to a position file, so no data is lost (see the sketch after this list)
- It never modifies the tracked files: no renaming, no deleting
- It does not support Windows and cannot read binary files; it reads text files line by line
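For reference, the position file is a JSON array that checkpoints each tracked file's inode, read offset, and path, which is how progress survives an agent restart; the values below are illustrative:

```json
[{"inode": 522870, "pos": 10240, "file": "/opt/hoult/servers/logs/start/app.start.log"}]
```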
 
This article uses the open-source Flume agent as the example and walks through streaming logs into HDFS; an ODS-layer external table is then built on top of that directory (a sketch follows the full agent configuration in 1.3).
1.1 taildir source configuration
```properties
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log
```
 
1.2 hdfs sink configuration
```properties
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File-roll policy: roll on size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 100
# Use the agent's local time for the %Y-%m-%d escape
a1.sinks.k1.hdfs.useLocalTimeStamp = true
```
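With this policy the sink rolls on size only, so each day's directory fills with roughly 32 MB files; the listing below is purely illustrative (Flume appends a counter to the prefix, and the file currently being written carries the default .tmp suffix):

```
/user/data/logs/start/2020-11-20/startlog.1605853424220
/user/data/logs/start/2020-11-20/startlog.1605853424221.tmp
```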
 
1.3 Agent configuration
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/hoult/servers/conf/startlog_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/hoult/servers/logs/start/.*log

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/start/%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = startlog.
# File-roll policy: roll on size only (32 MB)
a1.sinks.k1.hdfs.rollSize = 33554432
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Number of events flushed to HDFS per batch
a1.sinks.k1.hdfs.batchSize = 1000
# Use the agent's local time for the %Y-%m-%d escape
a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
 
Save this configuration as /opt/hoult/servers/conf/flume-log2hdfs.conf.
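As noted at the top, an ODS-layer external table is later built over this HDFS directory. A minimal sketch, assuming a hypothetical database/table name ods.ods_start_log and the %Y-%m-%d directories as date partitions:

```sql
-- hypothetical ODS external table over the HDFS sink output
CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_start_log (
  line string
)
PARTITIONED BY (dt string)
LOCATION '/user/data/logs/start/';

-- register one day's directory as a partition
ALTER TABLE ods.ods_start_log ADD IF NOT EXISTS PARTITION (dt='2020-11-20')
LOCATION '/user/data/logs/start/2020-11-20/';
```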
1.4 Starting the agent
```shell
flume-ng agent --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console

# Add to $FLUME_HOME/conf/flume-env.sh:
export JAVA_OPTS="-Xms4000m -Xmx4000m -Dcom.sun.management.jmxremote"

# For flume-env.sh to be picked up, also pass the conf directory on the command line:
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-log2hdfs.conf --name a1 -Dflume.root.logger=INFO,console
```
 
Note: the JAVA_OPTS line must go into $FLUME_HOME/conf/flume-env.sh; without it the agent runs on the default heap and can fail with an out-of-memory error once traffic picks up.
1.5 Using a custom interceptor to replace the agent's local time with the timestamp inside the log
Test it with a netcat source → logger sink:
```properties
# a1 is the agent name; the source, channel, and sink are r1, c1, and k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = linux121
a1.sources.r1.port = 9999
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hoult.flume.CustomerInterceptor$Builder

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = logger

# Wire source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
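The interceptor class must be on Flume's classpath before the agent starts; a common approach is to build it into a jar and drop that into $FLUME_HOME/lib. The jar name below is hypothetical:

```shell
# hypothetical jar name; any jar on Flume's classpath works
cp flume-interceptor-1.0.jar /opt/hoult/servers/flume-1.9.0/lib/
```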
 
The core of the interceptor:
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CustomerInterceptor implements Interceptor {
    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // Read the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // Read the event headers
        Map<String, String> headerMap = event.getHeaders();
        final String[] bodyArr = eventBody.split("\\s+");
        try {
            // The seventh whitespace-separated field is expected to be the JSON payload
            String jsonStr = bodyArr[6];
            if (Strings.isNullOrEmpty(jsonStr)) {
                return null;
            }
            // Parse the string into a JSON object
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            String timestampStr = jsonObject.getString("time");
            // Convert the epoch-millis timestamp into a yyyyMMdd date string
            long timeStamp = Long.parseLong(timestampStr);
            String date = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeStamp), ZoneId.systemDefault()));
            headerMap.put("logtime", date);
            event.setHeaders(headerMap);
        } catch (Exception e) {
            headerMap.put("logtime", "unknown");
            event.setHeaders(headerMap);
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> out = new ArrayList<>();
        for (Event event : events) {
            Event outEvent = intercept(event);
            if (outEvent != null) {
                out.add(outEvent);
            }
        }
        return out;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomerInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
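Once the interceptor stamps each event with a logtime header, the HDFS sink from section 1.3 can partition by the log's own date instead of the agent clock. A minimal sketch of the changed sink lines, assuming the interceptor is wired into that agent:

```properties
# Read the date from the interceptor's logtime header (yyyyMMdd)
a1.sinks.k1.hdfs.path = /user/data/logs/start/%{logtime}/
# useLocalTimeStamp is no longer required: %{logtime} is a header escape,
# not a time-based escape like %Y-%m-%d
```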
 
Start the agent:
```shell
flume-ng agent --conf /opt/hoult/servers/flume-1.9.0/conf --conf-file /opt/hoult/servers/conf/flume-test.conf --name a1 -Dflume.root.logger=INFO,console

## Test
telnet linux121 9999
```
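A hypothetical line to type into the telnet session; the interceptor takes the seventh whitespace-separated field as JSON and reads its epoch-millis time field, so the logger sink should print the event with a logtime=20201120 header:

```
2020-11-20 11:56:01 INFO start linux121 app {"time":1605844561000}
```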
 
Wu Xie, Xiao San Ye, a rookie knocking around the backend, big data, and AI worlds.