在本節(jié)中,我們將嘗試清理和解析日志數(shù)據(jù)集,以便真正從每個(gè)日志消息中提取包含有意義信息的結(jié)構(gòu)化屬性。
日志數(shù)據(jù)理解
如果你熟悉 Web 服務(wù)器日志,你將認(rèn)識(shí)到上面顯示的數(shù)據(jù)是通用的日志格式。
我們需要使用一些特定的技術(shù)來解析、匹配和提取日志數(shù)據(jù)中的這些屬性。
使用正則表達(dá)式進(jìn)行數(shù)據(jù)解析和提取
接下來,我們必須將半結(jié)構(gòu)化的日志數(shù)據(jù)解析為單獨(dú)的列。我們將使用專門的內(nèi)置函數(shù)regexp_extract()進(jìn)行解析。此函數(shù)將針對(duì)具有一個(gè)或多個(gè)捕獲組的正則表達(dá)式匹配列,并允許提取其中一個(gè)匹配的組。我們將對(duì)希望提取的每個(gè)字段使用一個(gè)正則表達(dá)式。
到目前為止,你一定已經(jīng)聽說或使用了大量正則表達(dá)式。如果你發(fā)現(xiàn)正則表達(dá)式令人困惑,并且希望了解更多關(guān)于正則表達(dá)式的信息,我們建議你訪問RegexOne 網(wǎng)站。你可能還會(huì)發(fā)現(xiàn),Goyvaerts 和 Levithan 編寫的《正則表達(dá)式手冊(cè)》是非常有用的參考資料。
讓我們看下我們使用的數(shù)據(jù)集中的日志總數(shù)。
print((base_df.count(), len(base_df.columns)))
#Output
(3461613, 1)
看起來我們總共有大約 346 萬條日志消息。一個(gè)不小的數(shù)字!讓我們提取并查看一些日志消息。
sample_logs=[item['value'] for item in base_df.take(15)]
sample_logs
提取主機(jī)名
讓我們嘗試編寫一些正則表達(dá)式來從日志中提取主機(jī)名。
host_pattern=r'(^\S+\.[\S+\.]+\S+)\s'
hosts=[re.search(host_pattern, item)(1)
if re.search(host_pattern, item)
else 'no match'
for item in sample_logs]
hosts
['199.72.81.55’,
** 'unicomp6.unicomp.net’,**
** '199.120.110.21’,**
** 'burger.letters’,**
…,
…,
** 'unicomp6.unicomp.net’,**
** 'd104.aa.net’,**
** 'd104.aa.net’]**
提取時(shí)間戳
現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取時(shí)間戳字段。
ts_pattern=r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
timestamps=[re.search(ts_pattern, item)(1) for item in sample_logs]
timestamps
['01/Jul/1995:00:00:01 -0400’,
'01/Jul/1995:00:00:06 -0400’,
'01/Jul/1995:00:00:09 -0400’,
…,
…,
'01/Jul/1995:00:00:14 -0400’,
'01/Jul/1995:00:00:15 -0400’,
'01/Jul/1995:00:00:15 -0400’]
提取 HTTP 請(qǐng)求方法、URI 和協(xié)議
現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取 HTTP 請(qǐng)求方法、URI 和協(xié)議模式字段。
method_uri_protocol_pattern=r'"(\S+)\s(\S+)\s*(\S*)"'
method_uri_protocol=[re.search(method_uri_protocol_pattern, item)s()
if re.search(method_uri_protocol_pattern, item)
else 'no match'
for item in sample_logs]
method_uri_protocol
[('GET’, '/history/apollo/’, 'HTTP/1.0’),
** ('GET’, '/shuttle/countdown/’, 'HTTP/1.0’),**
…,
…,
** ('GET’, '/shuttle/countdown/count.gif’, 'HTTP/1.0’),**
** ('GET’, '/images/NASA-logosmall.gif’, 'HTTP/1.0’)]**
提取 HTTP 狀態(tài)碼
現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取 HTTP 狀態(tài)碼。
content_size_pattern=r'\s(\d+)$'
content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs]
print(content_size)
['200’, '200’, '200’, '304’, …, '200’, '200’]
提取 HTTP 響應(yīng)內(nèi)容大小
現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取 HTTP 響應(yīng)內(nèi)容大小。
content_size_pattern=r'\s(\d+)$'
content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs]
print(content_size)
['6245’, '3985’, '4085’, '0’, …, '1204’, '40310’, '786’]
把它們放在一起
現(xiàn)在,讓我們嘗試?yán)们懊鏄?gòu)建的所有正則表達(dá)式模式,并使用regexp_extract(…)方法構(gòu)建 DataFrame,所有日志屬性都整齊地提取到各自的列中。
from pyspark.sqlctions import regexp_extract
logs_df=base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
regexp_extract('value', ts_pattern, 1).alias('timestamp'),
regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
logs_df(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))
查找缺失值
缺失值和空值是數(shù)據(jù)分析和機(jī)器學(xué)習(xí)的禍根。讓我們看看我們的數(shù)據(jù)解析和提取邏輯是如何工作的。首先,讓我們驗(yàn)證原始數(shù)據(jù)框中有沒有空行。
(base_df
.filter(base_df['value']
.isNull())
.count())
0
沒問題!現(xiàn)在,如果我們的數(shù)據(jù)解析和提取畢業(yè)證書工作正常,我們就不應(yīng)該有任何可能存在空值的行。讓我們來試試吧!
bad_rows_df=logs_df.filter(logs_df['host'].isNull()|
logs_df['timestamp'].isNull() |
logs_df['method'].isNull() |
logs_df['endpoint'].isNull() |
logs_df['status'].isNull() |
logs_df['content_size'].isNull()|
logs_df['protocol'].isNull())
bad_rows_df.count()
33905
哎喲!看起來我們的數(shù)據(jù)中有超過 33K 的缺失值!我們能搞定嗎?
請(qǐng)記住,這不是一個(gè)常規(guī)的 pandas DataFrame,你無法直接查詢并獲得哪些列為空。我們所謂的大數(shù)據(jù)集駐留在磁盤上,它可能存在于 Spark 集群中的多個(gè)節(jié)點(diǎn)上。那么我們?nèi)绾握页瞿男┝杏锌赡転榭漳?
查找 Null 值
我們通??梢允褂靡韵录夹g(shù)找出哪些列具有空值。(注意:這種方法是從 StackOverflow 上的一個(gè)絕妙的回答改造而來的。)
from pyspark.sqlctions import col
from pyspark.sqlctions import sum as spark_sum
def count_null(col_name):
return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)
# Build up a list of column expressions, one per column.
exprs=[count_null(col_name) for col_name in logs_df.columns]
# Run the aggregation. The *exprs converts the list of expressions into
# variable function arguments.
logs_df.agg(*exprs)()
看起來status列中有一個(gè)缺失值而其它的都在content_size列中。讓我們看看能不能找出問題所在!
處理 HTTP 狀態(tài)中的空值
狀態(tài)列解析使用的原始正則表達(dá)式是:
regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer')
.alias( 'status')
是否有更多的數(shù)字使正則表達(dá)式出錯(cuò)?還是數(shù)據(jù)點(diǎn)本身的問題?讓我們?cè)囍页龃鸢浮?/p>
注意:在下面的表達(dá)式中,~表示“非”。
null_status_df=base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
null_status_df.count()
1
讓我們看看這條糟糕的記錄是什么樣子?
null_status_df(truncate=False)
看起來像一條有很多信息丟失的記錄!讓我們通過日志數(shù)據(jù)解析管道來傳遞它。
bad_status_df=null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
regexp_extract('value', ts_pattern, 1).alias('timestamp'),
regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
bad_status_df(truncate=False)
?。▓D片)
看起來這條記錄本身是一個(gè)不完整的記錄,沒有有用的信息,最好的選擇是刪除這條記錄,如下所示!
logs_df=logs_df[logs_df['status'].isNotNull()]
exprs=[count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs)()
處理 HTTP content size 列中的空值
根據(jù)之前的正則表達(dá)式,content_size列的原始解析正則表達(dá)式為:
regexp_extract('value', r'\s(\d+)$', 1).cast('integer')
.alias('content_size')
原始數(shù)據(jù)集中是否有數(shù)據(jù)丟失?讓我們?cè)囍页龃鸢赴?我們首先嘗試找出基本 DataFrame 中可能缺少內(nèi)容大小的記錄。
null_content_size_df=base_df.filter(~base_df['value'].rlike(r'\s\d+$'))
null_content_size_df.count()
33905
這個(gè)數(shù)值似乎與處理后的 DataFrame 中缺失的內(nèi)容大小的數(shù)量相匹配。讓我們來看看我們的數(shù)據(jù)框中缺少內(nèi)容大小的前十條記錄。
null_content_size_df.take(10)
很明顯,糟糕的原始數(shù)據(jù)記錄對(duì)應(yīng)錯(cuò)誤響應(yīng),其中沒有發(fā)回任何內(nèi)容,服務(wù)器為content_size字段發(fā)出了一個(gè)“-”。
因?yàn)槲覀儾幌霃奈覀兊姆治鲋衼G棄這些行,所以我們把它們代入或填充為 0。
修復(fù) content_size 為 null 的行
最簡(jiǎn)單的解決方案是像前面討論的那樣,用 0 替換logs_df中的 null 值。Spark DataFrame API 提供了一組專門為處理 null 值而設(shè)計(jì)的函數(shù)和字段,其中包括:
fillna():用指定的非空值填充空值。
na:它返回一個(gè)DataFrameNaFunctions對(duì)象,其中包含許多用于在空列上進(jìn)行操作的函數(shù)。
有幾種方法可以調(diào)用這個(gè)函數(shù)。最簡(jiǎn)單的方法就是用已知值替換所有空列。但是,為了安全起見,最好傳遞一個(gè)包含(column_name, value)映射的 Python 字典。這就是我們要做的。下面是文檔中的一個(gè)示例:
>>> df4.na.fill({'age': 50, 'name': 'unknown'})()
+---+------+-------+
|age|height| name|
+---+------+-------+
| 10| 80| Alice|
| 5| null| Bob|
| 50| null| Tom|
| 50| null|unknown|
+---+------+-------+
現(xiàn)在我們使用這個(gè)函數(shù),用 0 填充content_size字段中所有缺失的值!
logs_df=logs_df.na.fill({'content_size': 0})
exprs=[count_null(col_name) for col_name in logs_df.columns]
logs_df.agg(*exprs)()
看,沒有缺失值了!
處理時(shí)間字段(時(shí)間戳)
現(xiàn)在我們有了一個(gè)干凈的、已解析的 DataFrame,我們必須將 timestamp 字段解析為一個(gè)實(shí)際的時(shí)間戳。通用的日志格式時(shí)間有點(diǎn)不標(biāo)準(zhǔn)。用戶定義函數(shù)(UDF)是解析它最直接的方法。
from pyspark.sqlctions import udf
month_map={
'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}
def parse_clf_time(text):
""" Convert Common Log time format into a Python datetime object
Args:
text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
Returns:
a string suitable for passing to CAST('timestamp')
"""
# NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving
return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(
int(text[7:11]),
month_map[text[3:6]],
int(text[0:2]),
int(text[12:14]),
int(text[15:17]),
int(text[18:20])
)
現(xiàn)在,讓我們使用這個(gè)函數(shù)來解析 DataFrame 中的time列。
udf_parse_time=udf(parse_clf_time)
logs_df=(logs_df.select('*', udf_parse_time(logs_df['timestamp'])
.cast('timestamp')
.alias('time'))
.drop('timestamp')
logs_df(10, truncate=True)
一切看起來都很好!讓我們通過檢查 DataFrame 的模式來驗(yàn)證這一點(diǎn)。
logs_df.printSchema()
root
|-- host: string (nullable=true)
|-- method: string (nullable=true)
|-- endpoint: string (nullable=true)
|-- protocol: string (nullable=true)
|-- status: integer (nullable=true)
|-- content_size: integer (nullable=false)
|-- time: timestamp (nullable=true)
現(xiàn)在,讓我們緩存logs_df,因?yàn)槲覀儗⒃谙乱徊糠值臄?shù)據(jù)分析部分中大量地使用它!
logs_df.cache()
聯(lián)系客服