Splunk Search

How to use field values with greater than and less than to do KV lookups?

md
Explorer

I have a KV store based lookup for Port Address Translation. 

Given the first 3 octets of a public facing IP and a port, I need to lookup the first 3 octets of the private address from this lookup.

The lookup contains the first 3 octets of the public IP, the first 3 octets of the private IP, the maximum port for that private IP and the minimum port for that private subnet range. 

Starting with a public_address of 123.45.67.8, port 1042 something like this works:

| inputlookup PAT_translation_table where public_address="123.45.67" lower_port<="1042" upper_port>="1042"

It returns the field private_address with a value like 10.1.2 and then I append on the .8 to get the internal IP.

I need to be able to do this with multiple results from other searches, however. Something like this:

<initial search results that include src_ip and src_port>
| rex field=src_ip "(?<first3octets>\d{1,3}\.\d{1,3}\.\d{1,3})(?<lastoctet>\.\d{1,3})
| inputlookup PAT_translation_table append=true where 'public_address'=first3octets  'lower_port'<=src_port  'upper_port'>=src_port

In this example, inputlookup returns nothing. If I just use the lookup command, I can't use greater than or less than so it returns all the values as an mvfield for private_address, an mvfield for upper_port, and a separate mvfield for lower_port. How would I query that?!

Do any of you have any suggestions how I can do this?


 

Labels (1)
Tags (1)
0 Karma
1 Solution

md
Explorer

We ended up using having to use lookup instead of inputlookup.

This is what we came up with:

base search
| call_to_macro(ip_field, port_field)

The macro adds a field named translated_IP to the event with either the PAT translated IP or the original IP if it's not a PAT IP. For the macro:

eval translated_IP=$ip_field$, port=$port_field$
| rex field=translated_IP "(?<first3octets>\d{1,3}\.\d{1,3}\.\d{1,3})(?<lastoctet>\.\d{1,3})"
```This lookup returns all port ranges for that group of the first 3 octets of the address as 3 separate multivalue fields.```
| lookup PAT_Address_Translation Public_Address AS first3octets OUTPUT Private_Address, Lower_Port, Upper_Port
```mvzip ties together the values of 2 multivalue fields in the order they appear. We have to nest them since we have 3 multivalue fields.```
| eval port_range=mvzip(mvzip(Private_Address,Lower_Port),Upper_Port)
```In cases where the input IP is not in the PAT IP range, we need to make sure port_range is not null or mvexpand errors out.```
| eval port_range=coalesce(port_range, "")

```mvexpand turns each possible value into it's own row.```

| mvexpand port_range

```we then separate the fields and use a where command to eliminate the values that the port_field isn't within the range of.```
| rex field=port_range "(?<Private_Address>\d{1,3}\.\d{1,3}\.\d{1,3}),(?<Lower_Port>\d+),(?<Upper_Port>\d+)"
| where (Lower_Port<=port AND Upper_Port>=port) OR port_range=""
| eval internal_IP=Private_Address.lastoctet
```Finally, we make sure translated_IP is either the translated result or the original IP if it's not in the PAT table```
| eval translated_IP=coalesce(internal_IP,translated_IP)
```Clean up of added fields```
| fields - Lower_Port,Upper_Port,Private_Address,first3octets,lastoctet,internal_IP,port_range

View solution in original post

0 Karma

md
Explorer

We ended up using having to use lookup instead of inputlookup.

This is what we came up with:

base search
| call_to_macro(ip_field, port_field)

The macro adds a field named translated_IP to the event with either the PAT translated IP or the original IP if it's not a PAT IP. For the macro:

eval translated_IP=$ip_field$, port=$port_field$
| rex field=translated_IP "(?<first3octets>\d{1,3}\.\d{1,3}\.\d{1,3})(?<lastoctet>\.\d{1,3})"
```This lookup returns all port ranges for that group of the first 3 octets of the address as 3 separate multivalue fields.```
| lookup PAT_Address_Translation Public_Address AS first3octets OUTPUT Private_Address, Lower_Port, Upper_Port
```mvzip ties together the values of 2 multivalue fields in the order they appear. We have to nest them since we have 3 multivalue fields.```
| eval port_range=mvzip(mvzip(Private_Address,Lower_Port),Upper_Port)
```In cases where the input IP is not in the PAT IP range, we need to make sure port_range is not null or mvexpand errors out.```
| eval port_range=coalesce(port_range, "")

```mvexpand turns each possible value into it's own row.```

| mvexpand port_range

```we then separate the fields and use a where command to eliminate the values that the port_field isn't within the range of.```
| rex field=port_range "(?<Private_Address>\d{1,3}\.\d{1,3}\.\d{1,3}),(?<Lower_Port>\d+),(?<Upper_Port>\d+)"
| where (Lower_Port<=port AND Upper_Port>=port) OR port_range=""
| eval internal_IP=Private_Address.lastoctet
```Finally, we make sure translated_IP is either the translated result or the original IP if it's not in the PAT table```
| eval translated_IP=coalesce(internal_IP,translated_IP)
```Clean up of added fields```
| fields - Lower_Port,Upper_Port,Private_Address,first3octets,lastoctet,internal_IP,port_range

0 Karma

bowesmana
SplunkTrust
SplunkTrust

Here's a somewhat kludgy example of how you can do the comparison LT and GT for port range on the MV fields you get from the lookup - most of this is just setup for an example to show MV fields for IPS/ranges.

You need the last 4 lines - note that this will allow for 10 port ranges per local IP in the foreach command

| makeresults
| eval last_octet=(random() % 253 + 1)
| eval src_ip=(random() % 253 + 1).".".(random() % 253 + 1).".".(random() % 253 + 1).".".last_octet
| eval src_port=random() % 32768 + 512
| eval n=mvrange(1,11,1) 
| mvexpand n
| eval local_ip="10.1.".n.".".last_octet
| eval low_port=random() % (2048 * n) + 512
| eval high_port=random() % (4096 * n) + low_port
| stats list(local_ip) as local_ip list(low_port) as low_port list(high_port) as high_port by src_ip src_port
| eval ix=-1
| foreach 0 1 2 3 4 5 6 7 8 9 10 [ eval l=tonumber(mvindex(low_port, <<FIELD>>)), h=tonumber(mvindex(high_port, <<FIELD>>)), ix=if(ix=-1 AND src_port >= l AND src_port <= h, <<FIELD>>, ix) ]
| eval local_addr=mvindex(local_ip, ix)
| fields - l h ix

 

0 Karma
Get Updates on the Splunk Community!

Built-in Service Level Objectives Management to Bridge the Gap Between Service & ...

Wednesday, May 29, 2024  |  11AM PST / 2PM ESTRegister now and join us to learn more about how you can ...

Get Your Exclusive Splunk Certified Cybersecurity Defense Engineer at Splunk .conf24 ...

We’re excited to announce a new Splunk certification exam being released at .conf24! If you’re headed to Vegas ...

Share Your Ideas & Meet the Lantern team at .Conf! Plus All of This Month’s New ...

Splunk Lantern is Splunk’s customer success center that provides advice from Splunk experts on valuable data ...