Introduction
For well-known reasons, access to GitHub is unreliable. The official documentation describes these examples, but its links to them are broken, so they are preserved here for easy reference and study.
Configuration
Official sample (GitHub)
drain3.ini
[SNAPSHOT]
snapshot_interval_minutes = 10
compress_state = True
[MASKING]
masking = [
{"regex_pattern":"((?<=[^A-Za-z0-9])|^)(([0-9a-f]{2,}:){3,}([0-9a-f]{2,}))((?=[^A-Za-z0-9])|$)", "mask_with": "ID"},
{"regex_pattern":"((?<=[^A-Za-z0-9])|^)(\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3})((?=[^A-Za-z0-9])|$)", "mask_with": "IP"},
{"regex_pattern":"((?<=[^A-Za-z0-9])|^)([0-9a-f]{6,} ?){3,}((?=[^A-Za-z0-9])|$)", "mask_with": "SEQ"},
{"regex_pattern":"((?<=[^A-Za-z0-9])|^)([0-9A-F]{4} ?){4,}((?=[^A-Za-z0-9])|$)", "mask_with": "SEQ"},
{"regex_pattern":"((?<=[^A-Za-z0-9])|^)(0x[a-f0-9A-F]+)((?=[^A-Za-z0-9])|$)", "mask_with": "HEX"},
{"regex_pattern":"((?<=[^A-Za-z0-9])|^)([\\-\\+]?\\d+)((?=[^A-Za-z0-9])|$)", "mask_with": "NUM"},
{"regex_pattern":"(?<=executed cmd )(\".+?\")", "mask_with": "CMD"}
]
mask_prefix = <:
mask_suffix = :>
[DRAIN]
# engine is Optional parameter. Engine will be "Drain" if the engine argument is not specified.
# engine has two options: 'Drain' and 'JaccardDrain'.
# engine = Drain
sim_th = 0.4
depth = 4
max_children = 100
max_clusters = 1024
extra_delimiters = ["_"]
[PROFILING]
enabled = True
report_sec = 30
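Masking runs before clustering: every regex match is replaced by mask_prefix + mask_with + mask_suffix, so mined templates contain tokens such as <:IP:> and <:NUM:> instead of raw values. A minimal sketch of loading this file and observing the effect (assuming the ini above is saved as drain3.ini in the working directory):

from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig

config = TemplateMinerConfig()
config.load("drain3.ini")
template_miner = TemplateMiner(config=config)

# The IP and port number are masked before the line reaches the Drain tree.
result = template_miner.add_log_message("connected to 10.0.0.1 port 8080")
print(result["template_mined"])  # expected: connected to <:IP:> port <:NUM:>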
Examples (GitHub)
Simple example
from drain3.drain import Drain

model = Drain()
entries = """
Dec 10 07:07:38 LabSZ sshd[24206]: input_userauth_request: invalid user test9 [preauth]
Dec 10 07:08:28 LabSZ sshd[24208]: input_userauth_request: invalid user webmaster [preauth]
Dec 10 09:12:32 LabSZ sshd[24490]: Failed password for invalid user ftpuser from 0.0.0.0 port 62891 ssh2
Dec 10 09:12:35 LabSZ sshd[24492]: Failed password for invalid user pi from 0.0.0.0 port 49289 ssh2
Dec 10 09:12:44 LabSZ sshd[24501]: Failed password for invalid user ftpuser from 0.0.0.0 port 60836 ssh2
Dec 10 07:28:03 LabSZ sshd[24245]: input_userauth_request: invalid user pgadmin [preauth]
""".strip().splitlines()

for entry in entries:
    # add_log_message returns the matched (or newly created) cluster and the change type
    cluster, change_type = model.add_log_message(entry)
    print(cluster.get_template())
Output
Dec 10 07:07:38 LabSZ sshd[24206]: input_userauth_request: invalid user test9 [preauth]
Dec 10 <*> LabSZ <*> input_userauth_request: invalid user <*> [preauth]
Dec 10 09:12:32 LabSZ sshd[24490]: Failed password for invalid user ftpuser from 0.0.0.0 port 62891 ssh2
Dec 10 <*> LabSZ <*> Failed password for invalid user <*> from 0.0.0.0 port <*> ssh2
Dec 10 <*> LabSZ <*> Failed password for invalid user <*> from 0.0.0.0 port <*> ssh2
Dec 10 <*> LabSZ <*> input_userauth_request: invalid user <*> [preauth]
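The first occurrence of each message type is printed verbatim; once a second similar line arrives, the varying tokens (timestamp, pid, user name, port) are replaced with the <*> wildcard. The mined clusters can then be listed directly, e.g.:

# Each cluster prints its id, size and current template.
for cluster in model.clusters:
    print(cluster)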
Official example 1
drain_stdin_demo.py
import json
import logging
import sys
from os.path import dirname
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
# persistence_type = "NONE"
# persistence_type = "REDIS"
# persistence_type = "KAFKA"
persistence_type = "FILE"
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')
if persistence_type == "KAFKA":
    from drain3.kafka_persistence import KafkaPersistence

    persistence = KafkaPersistence("drain3_state", bootstrap_servers="localhost:9092")
elif persistence_type == "FILE":
    from drain3.file_persistence import FilePersistence

    persistence = FilePersistence("drain3_state.bin")
elif persistence_type == "REDIS":
    from drain3.redis_persistence import RedisPersistence

    persistence = RedisPersistence(redis_host='',
                                   redis_port=25061,
                                   redis_db=0,
                                   redis_pass='',
                                   is_ssl=True,
                                   redis_key="drain3_state_key")
else:
    persistence = None
config = TemplateMinerConfig()
config.load(f"{dirname(__file__)}/drain3.ini")
config.profiling_enabled = False
template_miner = TemplateMiner(persistence, config)
print(f"Drain3 started with '{persistence_type}' persistence")
print(f"{len(config.masking_instructions)} masking instructions are in use")
print(f"Starting training mode. Reading from std-in ('q' to finish)")
while True:
    log_line = input("> ")
    if log_line == 'q':
        break
    result = template_miner.add_log_message(log_line)
    result_json = json.dumps(result)
    print(result_json)
    template = result["template_mined"]
    params = template_miner.extract_parameters(template, log_line)
    print(f"Parameters: {str(params)}")
print("Training done. Mined clusters:")
for cluster in template_miner.drain.clusters:
    print(cluster)
print(f"Starting inference mode, matching to pre-trained clusters. Input log lines or 'q' to finish")
while True:
    log_line = input("> ")
    if log_line == 'q':
        break
    cluster = template_miner.match(log_line)
    if cluster is None:
        print(f"No match found")
    else:
        template = cluster.get_template()
        print(f"Matched template #{cluster.cluster_id}: {template}")
        print(f"Parameters: {template_miner.get_parameter_list(template, log_line)}")
Official example 2
drain_bigfile_demo.py
import json
import logging
import os
import subprocess
import sys
import time
from os.path import dirname
from drain3 import TemplateMiner
from drain3.template_miner_config import TemplateMinerConfig
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format='%(message)s')
in_gz_file = "SSH.tar.gz"
in_log_file = "SSH.log"
if not os.path.isfile(in_log_file):
logger.info(f"Downloading file {in_gz_file}")
p = subprocess.Popen(f"curl https://zenodo.org/record/3227177/files/{in_gz_file} --output {in_gz_file}", shell=True)
p.wait()
logger.info(f"Extracting file {in_gz_file}")
p = subprocess.Popen(f"tar -xvzf {in_gz_file}", shell=True)
p.wait()
config = TemplateMinerConfig()
config.load(f"{dirname(__file__)}/drain3.ini")
config.profiling_enabled = True
template_miner = TemplateMiner(config=config)
line_count = 0
with open(in_log_file) as f:
    lines = f.readlines()
start_time = time.time()
batch_start_time = start_time
batch_size = 10000
for line in lines:
    line = line.rstrip()
    line = line.partition(": ")[2]
    result = template_miner.add_log_message(line)
    line_count += 1
    if line_count % batch_size == 0:
        time_took = time.time() - batch_start_time
        rate = batch_size / time_took
        logger.info(f"Processing line: {line_count}, rate {rate:.1f} lines/sec, "
                    f"{len(template_miner.drain.clusters)} clusters so far.")
        batch_start_time = time.time()
    if result["change_type"] != "none":
        result_json = json.dumps(result)
        logger.info(f"Input ({line_count}): {line}")
        logger.info(f"Result: {result_json}")
time_took = time.time() - start_time
rate = line_count / time_took
logger.info(f"--- Done processing file in {time_took:.2f} sec. Total of {line_count} lines, rate {rate:.1f} lines/sec, "
f"{len(template_miner.drain.clusters)} clusters")
sorted_clusters = sorted(template_miner.drain.clusters, key=lambda it: it.size, reverse=True)
for cluster in sorted_clusters:
logger.info(cluster)
print("Prefix Tree:")
template_miner.drain.print_tree()
template_miner.profiler.report(0)
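Once training on the full file is done, match() can map new lines to the mined templates without modifying the model; it returns None for lines that fit no known cluster. A minimal sketch (continuing from the script above; the sample line is illustrative and mirrors the stripped format fed to the miner):

# Read-only matching against the pre-trained clusters.
sample = "Failed password for invalid user admin from 10.0.0.5 port 2201 ssh2"
cluster = template_miner.match(sample)
if cluster is not None:
    print(f"cluster #{cluster.cluster_id}: {cluster.get_template()}")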