Monday, 6 February 2017

Netflow version 9 collector in python

In my last post, I had provided a code snippet how to write netflow version 5 collector in python.In this post, I will give a code snippet of netflow version 9 collector. Please note that this code is not production ready. I had just written it for proof of concept.

import socket, struct
import threading,logging
from socket import inet_ntoa
from utils.enums import template_field
from utils.parse import parse

LOG_FILENAME = 'log.out'
#logging.basicConfig(filename=LOG_FILENAME,format='%(levelname)s:%(message)s',level=logging.DEBUG,)
logging.basicConfig(format='%(levelname)s:%(message)s',level=logging.DEBUG,)

SIZE_OF_HEADER = 20

#templates = [{"id":265,"data_length":48,"description":[{"field_type": 21, "field_length": 4}, {"field_type": 22, "field_length": 4}, {"field_type": 1, "field_length": 4}, {"field_type": 2, "field_length": 4}, {"field_type": 10, "field_length": 2}, {"field_type": 14, "field_length": 2}, {"field_type": 8, "field_length": 4}, {"field_type": 12, "field_length": 4}, {"field_type": 4, "field_length": 1}, {"field_type": 5, "field_length": 1}, {"field_type": 7, "field_length": 2}, {"field_type": 11, "field_length": 2}, {"field_type": 48, "field_length": 1}, {"field_type": 51, "field_length":1}, {"field_type": 15, "field_length": 4}, {"field_type": 13, "field_length": 1}, {"field_type": 9, "field_length": 1}, {"field_type": 6, "field_length": 1}, {"field_type": 61, "field_length": 1}, {"field_type": 17, "field_length": 2}, {"field_type": 16, "field_length": 2}]}]

templates=[]

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(('0.0.0.0', 9998))

def processPacket(data,addr):
    (version, count) = struct.unpack('!HH',data[0:4])
    logging.debug("Version %s, count %s "%(version,count))
    if version != 9:
       logging.error("Not NetFlow v9!")
       return None
    uptime = socket.ntohl(struct.unpack('I',data[4:8])[0])
    epochseconds = socket.ntohl(struct.unpack('I',data[8:12])[0])
    logging.debug("Uptime %s , epochseconds %s "% (uptime, epochseconds))
    data=data[SIZE_OF_HEADER:]
    while len(data) >0:
        (flow_set_id, flow_set_length) = struct.unpack('!HH',data[0:4])
        logging.debug("flow_set_id %d, flow_set_length %d "%(flow_set_id,flow_set_length))
        my_data = data[4:flow_set_length]
        data = data[flow_set_length:]
        if flow_set_id == 0:
           # data template found.
           template={}
           (template_id, template_field_length) = struct.unpack('!HH',my_data[0:4])
           logging.debug("template_id %d, template_field_length %d "%(template_id,template_field_length))
           my_data=my_data[4:]
           template['id']= template_id
           template['description']=[]
           template['data_length']=0
           template['address']=addr[0]
           for i in xrange(0,template_field_length*4,4):
               template_element={}
               template_element['field_type']=parse(my_data[i:i+2],"INT",2)
               template_field_length = parse(my_data[i+2:i+4],"INT",2)
               template_element['field_length']=template_field_length
               template['data_length'] += template_field_length
               template['description'].append(template_element)
           for temp in templates:
               if temp["id"]== template_id:
                  #update dict
                  templates.remove(temp)
                  break
           templates.append(template)
           logging.debug(templates)


        if flow_set_id == 1:  # options template found.Lets add it to template list
            while len(my_data) >6:
              template={}
              (template_id, option_scope_length) = struct.unpack('!HH',my_data[0:4])
              logging.debug("option template_id %d, option_scope_length %d "%(template_id,option_scope_length))
              option_length = struct.unpack('!H',my_data[4:6])[0]
              my_data=my_data[6:]
              if template_id == 0 or option_scope_length >0:
                 # probably padding or special case. Right now not handling
                 my_data=my_data[option_scope_length:]
                 break
              else:
                 template['id']= template_id
                 template['description']=[]
                 template['data_length']=0
                 template['address']=addr[0]
                 for i in xrange(0,option_length,4):
                     template_element={}
                     template_element['field_type']=parse(my_data[i:i+2],"INT",2)
                     template_field_length = parse(my_data[i+2:i+4],"INT",2)
                     template_element['field_length']=template_field_length
                     template['data_length'] += template_field_length
                     template['description'].append(template_element)
                 for temp in templates:
                     if temp["id"]== template_id:
                        #update dict
                        templates.remove(temp)
                        break
                 templates.append(template)
                 logging.debug(templates)
              my_data=my_data[option_length:]
              #padding = flow_set_length - (10 +option_scope_length + option_length)
              #my_data=my_data[padding:]
          if flow_set_id > 255:
           # let us parse flow data
           # first check if template present
           my_template = None
           for template in templates:
               if flow_set_id == template["id"] and addr[0] == template['address']: #check if template from same ip exist
                  my_template = template
                  break
           if not my_template:
              logging.debug("No suitable template found")
           else:
              nf_data=[]
              template_total_data_length = my_template['data_length']
              while len(my_data) >= template_total_data_length:
                  for field in my_template['description']:
                      field_name = template_field[field['field_type']]['name']
                      field_type = template_field[field['field_type']]['data_type']
                      field_length = field['field_length']
                      if field_length ==0:
                         field_length = template_field[field['field_type']]['default']
                      logging.debug("Data length = %d "%(field_length))
                      ext_data = parse(my_data[:field_length],field_type,field_length)
                      logging.debug ("%s : %s"%(field_name, ext_data))
                      nf_data.append({field_name:ext_data})
                      my_data = my_data[field_length:]
              logging.info(nf_data)


while True:
      buf, addr = sock.recvfrom(1500)
      t = threading.Thread(target=processPacket, args=(buf,addr))
      t.start()



1 comment:

  1. i am getting an Import Error no module named utils.enums

    ReplyDelete