九色国产,午夜在线视频,新黄色网址,九九色综合,天天做夜夜做久久做狠狠,天天躁夜夜躁狠狠躁2021a,久久不卡一区二区三区

打開APP
userphoto
未登錄

開通VIP,暢享免費(fèi)電子書等14項(xiàng)超值服

開通VIP
Apache Spark 實(shí)現(xiàn)可擴(kuò)展日志分析,挖掘系統(tǒng)最大潛力(2)

  在本節(jié)中,我們將嘗試清理和解析日志數(shù)據(jù)集,以便真正從每個(gè)日志消息中提取包含有意義信息的結(jié)構(gòu)化屬性。

  日志數(shù)據(jù)理解

  如果你熟悉 Web 服務(wù)器日志,你將認(rèn)識(shí)到上面顯示的數(shù)據(jù)是通用的日志格式。

  

  我們需要使用一些特定的技術(shù)來解析、匹配和提取日志數(shù)據(jù)中的這些屬性。

  使用正則表達(dá)式進(jìn)行數(shù)據(jù)解析和提取

  接下來,我們必須將半結(jié)構(gòu)化的日志數(shù)據(jù)解析為單獨(dú)的列。我們將使用專門的內(nèi)置函數(shù)regexp_extract()進(jìn)行解析。此函數(shù)將針對(duì)具有一個(gè)或多個(gè)捕獲組的正則表達(dá)式匹配列,并允許提取其中一個(gè)匹配的組。我們將對(duì)希望提取的每個(gè)字段使用一個(gè)正則表達(dá)式。

  到目前為止,你一定已經(jīng)聽說或使用了大量正則表達(dá)式。如果你發(fā)現(xiàn)正則表達(dá)式令人困惑,并且希望了解更多關(guān)于正則表達(dá)式的信息,我們建議你訪問RegexOne 網(wǎng)站。你可能還會(huì)發(fā)現(xiàn),Goyvaerts 和 Levithan 編寫的《正則表達(dá)式手冊(cè)》是非常有用的參考資料。

  讓我們看下我們使用的數(shù)據(jù)集中的日志總數(shù)。

  print((base_df.count(), len(base_df.columns)))

  #Output

  (3461613, 1)

  看起來我們總共有大約 346 萬條日志消息。一個(gè)不小的數(shù)字!讓我們提取并查看一些日志消息。

  sample_logs=[item['value'] for item in base_df.take(15)]

  sample_logs

  

  提取主機(jī)名

  讓我們嘗試編寫一些正則表達(dá)式來從日志中提取主機(jī)名。

  host_pattern=r'(^\S+\.[\S+\.]+\S+)\s'

  hosts=[re.search(host_pattern, item)(1)

  if re.search(host_pattern, item)

  else 'no match'

  for item in sample_logs]

  hosts

  ['199.72.81.55’,

  ** 'unicomp6.unicomp.net’,**

  ** '199.120.110.21’,**

  ** 'burger.letters’,**

  …,

  …,

  ** 'unicomp6.unicomp.net’,**

  ** 'd104.aa.net’,**

  ** 'd104.aa.net’]**

  提取時(shí)間戳

  現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取時(shí)間戳字段。

  ts_pattern=r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'

  timestamps=[re.search(ts_pattern, item)(1) for item in sample_logs]

  timestamps

  ['01/Jul/1995:00:00:01 -0400’,

  '01/Jul/1995:00:00:06 -0400’,

  '01/Jul/1995:00:00:09 -0400’,

  …,

  …,

  '01/Jul/1995:00:00:14 -0400’,

  '01/Jul/1995:00:00:15 -0400’,

  '01/Jul/1995:00:00:15 -0400’]

  提取 HTTP 請(qǐng)求方法、URI 和協(xié)議

  現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取 HTTP 請(qǐng)求方法、URI 和協(xié)議模式字段。

  method_uri_protocol_pattern=r'"(\S+)\s(\S+)\s*(\S*)"'

  method_uri_protocol=[re.search(method_uri_protocol_pattern, item)s()

  if re.search(method_uri_protocol_pattern, item)

  else 'no match'

  for item in sample_logs]

  method_uri_protocol

  [('GET’, '/history/apollo/’, 'HTTP/1.0’),

  ** ('GET’, '/shuttle/countdown/’, 'HTTP/1.0’),**

  …,

  …,

  ** ('GET’, '/shuttle/countdown/count.gif’, 'HTTP/1.0’),**

  ** ('GET’, '/images/NASA-logosmall.gif’, 'HTTP/1.0’)]**

  提取 HTTP 狀態(tài)碼

  現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取 HTTP 狀態(tài)碼。

  content_size_pattern=r'\s(\d+)$'

  content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs]

  print(content_size)

  ['200’, '200’, '200’, '304’, …, '200’, '200’]

  提取 HTTP 響應(yīng)內(nèi)容大小

  現(xiàn)在讓我們嘗試使用正則表達(dá)式從日志中提取 HTTP 響應(yīng)內(nèi)容大小。

  content_size_pattern=r'\s(\d+)$'

  content_size=[re.search(content_size_pattern, item)(1) for item in sample_logs]

  print(content_size)

  ['6245’, '3985’, '4085’, '0’, …, '1204’, '40310’, '786’]

  把它們放在一起

  現(xiàn)在,讓我們嘗試?yán)们懊鏄?gòu)建的所有正則表達(dá)式模式,并使用regexp_extract(…)方法構(gòu)建 DataFrame,所有日志屬性都整齊地提取到各自的列中。

  from pyspark.sqlctions import regexp_extract

  logs_df=base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

  regexp_extract('value', ts_pattern, 1).alias('timestamp'),

  regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

  regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

  regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

  regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

  regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

  logs_df(10, truncate=True)

  print((logs_df.count(), len(logs_df.columns)))

  

  查找缺失值

  缺失值和空值是數(shù)據(jù)分析和機(jī)器學(xué)習(xí)的禍根。讓我們看看我們的數(shù)據(jù)解析和提取邏輯是如何工作的。首先,讓我們驗(yàn)證原始數(shù)據(jù)框中有沒有空行。

  (base_df

  .filter(base_df['value']

  .isNull())

  .count())

  0

  沒問題!現(xiàn)在,如果我們的數(shù)據(jù)解析和提取畢業(yè)證書工作正常,我們就不應(yīng)該有任何可能存在空值的行。讓我們來試試吧!

  bad_rows_df=logs_df.filter(logs_df['host'].isNull()|

  logs_df['timestamp'].isNull() |

  logs_df['method'].isNull() |

  logs_df['endpoint'].isNull() |

  logs_df['status'].isNull() |

  logs_df['content_size'].isNull()|

  logs_df['protocol'].isNull())

  bad_rows_df.count()

  33905

  哎喲!看起來我們的數(shù)據(jù)中有超過 33K 的缺失值!我們能搞定嗎?

  請(qǐng)記住,這不是一個(gè)常規(guī)的 pandas DataFrame,你無法直接查詢并獲得哪些列為空。我們所謂的大數(shù)據(jù)集駐留在磁盤上,它可能存在于 Spark 集群中的多個(gè)節(jié)點(diǎn)上。那么我們?nèi)绾握页瞿男┝杏锌赡転榭漳?

  查找 Null 值

  我們通??梢允褂靡韵录夹g(shù)找出哪些列具有空值。(注意:這種方法是從 StackOverflow 上的一個(gè)絕妙的回答改造而來的。)

  from pyspark.sqlctions import col

  from pyspark.sqlctions import sum as spark_sum

  def count_null(col_name):

  return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

  # Build up a list of column expressions, one per column.

  exprs=[count_null(col_name) for col_name in logs_df.columns]

  # Run the aggregation. The *exprs converts the list of expressions into

  # variable function arguments.

  logs_df.agg(*exprs)()

  

  看起來status列中有一個(gè)缺失值而其它的都在content_size列中。讓我們看看能不能找出問題所在!

  處理 HTTP 狀態(tài)中的空值

  狀態(tài)列解析使用的原始正則表達(dá)式是:

  regexp_extract('value', r'\s(\d{3})\s', 1).cast('integer')

  .alias( 'status')

  是否有更多的數(shù)字使正則表達(dá)式出錯(cuò)?還是數(shù)據(jù)點(diǎn)本身的問題?讓我們?cè)囍页龃鸢浮?/p>

  注意:在下面的表達(dá)式中,~表示“非”。

  null_status_df=base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))

  null_status_df.count()

  1

  讓我們看看這條糟糕的記錄是什么樣子?

  null_status_df(truncate=False)

  

  看起來像一條有很多信息丟失的記錄!讓我們通過日志數(shù)據(jù)解析管道來傳遞它。

  bad_status_df=null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

  regexp_extract('value', ts_pattern, 1).alias('timestamp'),

  regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

  regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

  regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

  regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

  regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

  bad_status_df(truncate=False)

 ?。▓D片)

  看起來這條記錄本身是一個(gè)不完整的記錄,沒有有用的信息,最好的選擇是刪除這條記錄,如下所示!

  logs_df=logs_df[logs_df['status'].isNotNull()]

  exprs=[count_null(col_name) for col_name in logs_df.columns]

  logs_df.agg(*exprs)()

  

  處理 HTTP content size 列中的空值

  根據(jù)之前的正則表達(dá)式,content_size列的原始解析正則表達(dá)式為:

  regexp_extract('value', r'\s(\d+)$', 1).cast('integer')

  .alias('content_size')

  原始數(shù)據(jù)集中是否有數(shù)據(jù)丟失?讓我們?cè)囍页龃鸢赴?我們首先嘗試找出基本 DataFrame 中可能缺少內(nèi)容大小的記錄。

  null_content_size_df=base_df.filter(~base_df['value'].rlike(r'\s\d+$'))

  null_content_size_df.count()

  33905

  這個(gè)數(shù)值似乎與處理后的 DataFrame 中缺失的內(nèi)容大小的數(shù)量相匹配。讓我們來看看我們的數(shù)據(jù)框中缺少內(nèi)容大小的前十條記錄。

  null_content_size_df.take(10)

  很明顯,糟糕的原始數(shù)據(jù)記錄對(duì)應(yīng)錯(cuò)誤響應(yīng),其中沒有發(fā)回任何內(nèi)容,服務(wù)器為content_size字段發(fā)出了一個(gè)“-”。

  因?yàn)槲覀儾幌霃奈覀兊姆治鲋衼G棄這些行,所以我們把它們代入或填充為 0。

  修復(fù) content_size 為 null 的行

  最簡(jiǎn)單的解決方案是像前面討論的那樣,用 0 替換logs_df中的 null 值。Spark DataFrame API 提供了一組專門為處理 null 值而設(shè)計(jì)的函數(shù)和字段,其中包括:

  fillna():用指定的非空值填充空值。

  na:它返回一個(gè)DataFrameNaFunctions對(duì)象,其中包含許多用于在空列上進(jìn)行操作的函數(shù)。

  有幾種方法可以調(diào)用這個(gè)函數(shù)。最簡(jiǎn)單的方法就是用已知值替換所有空列。但是,為了安全起見,最好傳遞一個(gè)包含(column_name, value)映射的 Python 字典。這就是我們要做的。下面是文檔中的一個(gè)示例:

  >>> df4.na.fill({'age': 50, 'name': 'unknown'})()

  +---+------+-------+

  |age|height| name|

  +---+------+-------+

  | 10| 80| Alice|

  | 5| null| Bob|

  | 50| null| Tom|

  | 50| null|unknown|

  +---+------+-------+

  現(xiàn)在我們使用這個(gè)函數(shù),用 0 填充content_size字段中所有缺失的值!

  logs_df=logs_df.na.fill({'content_size': 0})

  exprs=[count_null(col_name) for col_name in logs_df.columns]

  logs_df.agg(*exprs)()

  

  看,沒有缺失值了!

  處理時(shí)間字段(時(shí)間戳)

  現(xiàn)在我們有了一個(gè)干凈的、已解析的 DataFrame,我們必須將 timestamp 字段解析為一個(gè)實(shí)際的時(shí)間戳。通用的日志格式時(shí)間有點(diǎn)不標(biāo)準(zhǔn)。用戶定義函數(shù)(UDF)是解析它最直接的方法。

  from pyspark.sqlctions import udf

  month_map={

  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,

  'Aug':8, 'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12

  }

  def parse_clf_time(text):

  """ Convert Common Log time format into a Python datetime object

  Args:

  text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]

  Returns:

  a string suitable for passing to CAST('timestamp')

  """

  # NOTE: We're ignoring the time zones here, might need to be handled depending on the problem you are solving

  return "{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}".format(

  int(text[7:11]),

  month_map[text[3:6]],

  int(text[0:2]),

  int(text[12:14]),

  int(text[15:17]),

  int(text[18:20])

  )

  現(xiàn)在,讓我們使用這個(gè)函數(shù)來解析 DataFrame 中的time列。

  udf_parse_time=udf(parse_clf_time)

  logs_df=(logs_df.select('*', udf_parse_time(logs_df['timestamp'])

  .cast('timestamp')

  .alias('time'))

  .drop('timestamp')

  logs_df(10, truncate=True)

  

  一切看起來都很好!讓我們通過檢查 DataFrame 的模式來驗(yàn)證這一點(diǎn)。

  logs_df.printSchema()

  root

  |-- host: string (nullable=true)

  |-- method: string (nullable=true)

  |-- endpoint: string (nullable=true)

  |-- protocol: string (nullable=true)

  |-- status: integer (nullable=true)

  |-- content_size: integer (nullable=false)

  |-- time: timestamp (nullable=true)

  現(xiàn)在,讓我們緩存logs_df,因?yàn)槲覀儗⒃谙乱徊糠值臄?shù)據(jù)分析部分中大量地使用它!

  logs_df.cache()

本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開APP,閱讀全文并永久保存 查看更多類似文章
猜你喜歡
類似文章
Apache Spark實(shí)現(xiàn)可擴(kuò)展日志分析,挖掘系統(tǒng)最大潛力
Spark應(yīng)用——網(wǎng)絡(luò)服務(wù)器日志解析
OGG
用JavaScript(js)對(duì)時(shí)間格式化
【R語言 字符串處理】stringr 包的強(qiáng)大之處
CH3.3創(chuàng)建和使用數(shù)據(jù)庫(續(xù))
更多類似文章 >>
生活服務(wù)
熱點(diǎn)新聞
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服