Replies: 1 comment
-
I can do |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
What is a problem?
hi, I'm trying to solve the problem of processing and converting logs from Pfsense firewalls to further transfer them to opensearch and visualization.
If you just accept the syslog and pass it on, then there are no problems
For example, data already sent to OpenSearch
{
"_index": "pfsense.local0.info-26.07.2024",
"_id": "qXCy7pABQZxe3Jn3pbaW",
"_version": 1,
"_score": null,
"_source": {
"host": "KRAKEN-CO-FW2",
"ident": "filterlog",
"pid": "18399",
"msgid": "-",
"extradata": "-",
"message": "8,,,1000000103,lagg1.1236,match,block,in,4,0x0,,248,35521,0,none,6,tcp,44,1.124.49.134,1.104.57.203,59278,16314,0,S,2538179760,,1025,,mss",
"@timestamp": "2024-07-26T13:59:59.946170000+03:00"
},
"fields": {
"@timestamp": [
"2024-07-26T10:59:59.946Z"
],
"event_severity": [
"Informational"
]
},
"sort": [
1721991599946
]
}
But after reading the description of the Message fields, it became clear that each field carries certain values and I wanted to parse and process them and pass them to OpenSearch
After analyzing the messages, it became clear that the number of fields varies depending on the tcp, udp, icmp, igmp protocols:
4,,,1000000003,lagg0.751,match,block,in,6,0x00,0x30900,255,UDP,17,90,fe80::1c07:a544:8db7:165,ff02::fb,5353,5353,90
542,,,1708431610,lagg1.2402,match,pass,in,4,0x0,,63,24607,0,none,17,udp,79,10.35.0.19,10.33.6.51,21869,53,59
8,,,1000000103,lagg1.2414,match,block,in,4,0x0,,63,64162,0,DF,6,tcp,60,10.35.0.106,10.33.7.150,52414,24224,0,S,1313279875,,64240,,mss;sackOK;TS;nop;wscalб
354,,,1708933354,lagg0.601,match,pass,in,4,0x0,,53,61052,0,DF,6,tcp,60,1.17.11.180,10.35.0.6,50022,21,0,S,1868235832,,64240,,mss;sackOK;TS;nop;wscale
8,,,1000000103,lagg0.751,match,block,in,4,0x0,,64,43326,0,DF,6,tcp,83,10.26.10.160,1.233.163.113,40096,443,31,FPA,1773137574:1773137605,4020605095,167,,nop;nop;TS
8,,,1000000103,lagg0.751,match,block,in,4,0x0,,1,6429,0,none,2,igmp,32,10.26.11.95,224.0.0.251,datalength=8
4,,,1000000003,lagg0.751,match,block,in,6,0x00,0x00000,255,ICMPv6,58,32,fe80::1c9a:1cdf:8f8a:e5c7,ff02::1:ff7c:e864
58,,,1000006720,lagg1,match,block,in,4,0x0,,255,0,0,none,1,icmp,110,172.20.19.1,172.20.19.4,request,0,090
I chose a decision on rewrite_tag_filter. Verify the incoming syslog and change the tag depending on the conditions
<match pfsense.**>
@type rewrite_tag_filter
@log_level trace
capitalize_regex_backreference yes
key message
pattern /tcp/
tag tcp.${tag}
key message
pattern /udp/
tag udp.${tag}
key message
pattern /icmp/
tag icmp.${tag}
key message
pattern /igmp/
tag igmp.${tag}
key message
pattern /.+/
tag other.${tag}
Next, using this tag, make a filter parser
<filter tcp.pfsense.**>
@type parser
key_name message
reserve_data true
@type regexp
expression /^(?\d+),(?[^,]),(?[^,]),(?\d+),(?[^,]),(?[^,]),(?[^,]),(?[^,]),(?\d+),(?[^,]),(?[^,]),(?\d*),(?\d*),(?\d*),(?[^,]),(?\d),(?[^,]),(?\d),(?[^,]),(?[^,]),(?\d*),(?\d*),(?\d*),(?[^,]),(?\d),(?[^,]),(?\d),(?[^,]),(?[^,])$/
Then transform the data into the fields we need
<filter tcp.pfsense.**>
@type record_transformer
enable_ruby true
rulenumber ${record["rulenumber"]}
tracker ${record["tracker"]}
realinterface ${record["realinterface"]}
reason ${record["reason"]}
action ${record["action"]}
direction ${record["direction"]}
ipversion ${record["ipversion"]}
tos ${record["tos"]}
ecn ${record["ecn"]}
ttl ${record["ttl"]}
id ${record["id"]}
offset ${record["offset"]}
flags ${record["flags"]}
protocolid ${record["protocolid"]}
protocoltext ${record["protocoltext"]}
length ${record["length"]}
sourceaddress ${record["sourceaddress"]}
destinationaddress ${record["destinationaddress"]}
sourceport ${record["sourceport"]}
destinationport ${record["destinationport"]}
datalength ${record["datalength"]}
tcpflags ${record["tcpflags"]}
sequencenumber ${record["sequencenumber"]}
acknumber ${record["acknumber"]}
tcpwindow ${record["tcpwindow"]}
urg ${record["urg"]}
tcpoptions ${record["tcpoptions"]}
@Label @output
but here we get one problem, that when this data is sent to OpenSearch, it will come with the tag that we rewrote earlier. and now a not very flexible solution on the part of OpenSearch is that we can't aggregate the entire flow of events under one index pattern
tried to use label to properly route events in Fluent and tried adding a tag change so that everyone flies to open with one pfsense tag.**
<match tcp.pfsense.**>
@type rewrite_tag_filter
@log_level trace
remove_tag_prefix tcp
key message
pattern /.+/
tag ${tag}
But so it was not possible to achieve a solution in which it was possible to decompose the syslog by parser/transform and then send it to the OpenSearch in one index pattern
Describe the configuration of Fluentd
<match pfsense.**>
@type rewrite_tag_filter
@log_level trace
capitalize_regex_backreference yes
key message
pattern /tcp/
tag tcp.${tag}
key message
pattern /udp/
tag udp.${tag}
key message
pattern /icmp/
tag icmp.${tag}
key message
pattern /igmp/
tag igmp.${tag}
key message
pattern /.+/
tag other.${tag}
Next, using this tag, make a filter parser
<filter tcp.pfsense.**>
@type parser
key_name message
reserve_data true
@type regexp
expression /^(?\d+),(?[^,]),(?[^,]),(?\d+),(?[^,]),(?[^,]),(?[^,]),(?[^,]),(?\d+),(?[^,]),(?[^,]),(?\d*),(?\d*),(?\d*),(?[^,]),(?\d),(?[^,]),(?\d),(?[^,]),(?[^,]),(?\d*),(?\d*),(?\d*),(?[^,]),(?\d),(?[^,]),(?\d),(?[^,]),(?[^,])$/
<filter tcp.pfsense.**>
@type record_transformer
enable_ruby true
rulenumber ${record["rulenumber"]}
tracker ${record["tracker"]}
realinterface ${record["realinterface"]}
reason ${record["reason"]}
action ${record["action"]}
direction ${record["direction"]}
ipversion ${record["ipversion"]}
tos ${record["tos"]}
ecn ${record["ecn"]}
ttl ${record["ttl"]}
id ${record["id"]}
offset ${record["offset"]}
flags ${record["flags"]}
protocolid ${record["protocolid"]}
protocoltext ${record["protocoltext"]}
length ${record["length"]}
sourceaddress ${record["sourceaddress"]}
destinationaddress ${record["destinationaddress"]}
sourceport ${record["sourceport"]}
destinationport ${record["destinationport"]}
datalength ${record["datalength"]}
tcpflags ${record["tcpflags"]}
sequencenumber ${record["sequencenumber"]}
acknumber ${record["acknumber"]}
tcpwindow ${record["tcpwindow"]}
urg ${record["urg"]}
tcpoptions ${record["tcpoptions"]}
@Label @output
Describe the logs of Fluentd
2024-07-26 16:25:12.618576490 +0300 tcp.pfsense.local0.info: {"host":"KRAKEN-CO-FW2","ident":"filterlog","pid":"38176","msgid":"-","extradata":"-","message":"495,,,1707214966,lagg0.751,match,pass,in,4,0x0,,64,0,0,DF,6,tcp,64,10.26.8.67,54.37.197.149,57236,36347,0,S,4054443658,,65535,,mss;nop;wscale;nop;nop;TS;sackOK;eol","rulenumber":"495","subrulenumber":"","anchor":"","tracker":"1707214966","realinterface":"lagg0.751","reason":"match","action":"pass","direction":"in","ipversion":"4","tos":"0x0","ecn":"","ttl":"64","id":"0","offset":"0","flags":"DF","protocolid":"6","protocoltext":"tcp","length":"64","sourceaddress":"10.26.8.67","destinationaddress":"54.37.197.149","sourceport":"57236","destinationport":"36347","datalength":"0","tcpflags":"S","sequencenumber":"4054443658","acknumber":"","tcpwindow":"65535","urg":"","tcpoptions":"mss;nop;wscale;nop;nop;TS;sackOK;eol"}
Environment
Beta Was this translation helpful? Give feedback.
All reactions