Some improvment for reading the code

master
Eloan ANDRÉ 2 years ago
parent 8ff3ee048d
commit 3ec1106010

@ -4,11 +4,11 @@
#https://scapy.readthedocs.io/en/latest/api/scapy.packet.html#scapy.packet.Packet #https://scapy.readthedocs.io/en/latest/api/scapy.packet.html#scapy.packet.Packet
#https://scapy.readthedocs.io/en/latest/api/scapy.contrib.modbus.html #https://scapy.readthedocs.io/en/latest/api/scapy.contrib.modbus.html
import getpass
import scapy.all as scapy import scapy.all as scapy
import scapy.contrib.modbus as mb import scapy.contrib.modbus as mb
from triPacket import triPacket from triPacket import triPacket
import getpass
def decode(pkt): def decode(pkt):
if "ModbusADU" in pkt: if "ModbusADU" in pkt:
@ -81,17 +81,17 @@ def decode(pkt):
#print(bigL) #print(bigL)
bigL = [] bigL = []
print("In order for data sniffed to be stored inside the database, please register the following :") print("In order for data sniffed to be stored inside the database, please register the following :")
db_host = input('host of the database server : ') DB_HOST = input('host of the database server : ')
if not db_host: if not DB_HOST:
db_host = '192.168.128.141' DB_HOST = '192.168.128.141'
db_name = input('name of the database : ') DB_NAME = input('name of the database : ')
if not db_name: if not DB_NAME:
db_name = 'dblodufour1' DB_NAME = 'dblodufour1'
db_user = input('login of the user : ') DB_USER = input('login of the user : ')
if not db_user: if not DB_USER:
db_user = 'lodufour1' DB_USER = 'lodufour1'
db_password = getpass.getpass('user password : ') DB_PASSWORD = getpass.getpass('user password : ')
connec=[db_host,db_name,db_user,db_password] connec=[DB_HOST,DB_NAME,DB_USER,DB_PASSWORD]
miniL = [0,0,0] miniL = [0,0,0]

@ -0,0 +1,98 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#https://scapy.readthedocs.io/en/latest/api/scapy.packet.html#scapy.packet.Packet
#https://scapy.readthedocs.io/en/latest/api/scapy.contrib.modbus.html
import getpass
import scapy.all as scapy
import scapy.contrib.modbus as mb
from triPacket import triPacket
def decode(pkt):
if "ModbusADU" in pkt:
global miniL
modpkt = pkt["ModbusADU"]
typeCall = ""
if "Multiple" in modpkt.payload.name:
typeCall += "m"
else:
typeCall += "s"
if "Coil" in modpkt.payload.name:
typeCall += "C"
else:
typeCall += "R"
miniL[0] = typeCall
if "Read" in modpkt.payload.name:
typeCall += "r"
if "Response" in modpkt.payload.name:
byteCount = modpkt.payload.getfieldval("byteCount")
status = []
if "C" in typeCall:
Value = modpkt.payload.getfieldval("coilStatus")
for j in range(byteCount):
bi=str(bin(Value[j]))
bi=bi[2:]
bi=bi.rjust(8,'0')[::-1]
status.extend(k for k in [*bi])
else:
status = modpkt.payload.getfieldval("registerVal")
for j in range(len(status)):
#bigL.append([miniL[0],status[j],miniL[2]+j])
triPacket([miniL[0][1],miniL[2]+j,status[j]],connec)
miniL = [0,0,0]
else:
startAddr = modpkt.payload.getfieldval("startAddr")
miniL[2] = startAddr
else:
typeCall += "w"
if "Response" in modpkt.payload.name:
pass
else:
if "m" in typeCall:
addr = modpkt.payload.getfieldval("startAddr")
#byteCount = modpkt.payload.getfieldval("quantityOutput")
outputValue = modpkt.payload.getfieldval("outputsValue")
status = []
if "C" in typeCall:
for j in range(len(outputValue)):
val=str(bin(outputValue[j]))[2:].rjust(8,'0')[::-1]
status.extend(k for k in [*val])
for j in range(len(status)):
#bigL.append([miniL[0],status[j],addr+j]) # changer 5 pour mettre l'adresse
triPacket([miniL[0][1],addr+j,status[j]],connec)
#print(bigL)
else:
if "C" in typeCall:
addr = modpkt.payload.getfieldval("outputAddr")
outputValue = modpkt.payload.getfieldval("outputValue")
else:
addr = modpkt.payload.getfieldval("registerAddr")
outputValue = modpkt.payload.getfieldval("registerValue")
triPacket([miniL[0][1],addr,outputValue],connec)
#bigL.append([miniL[0],outputValue,addr])
#print(bigL)
bigL = []
print("In order for data sniffed to be stored inside the database, please register the following :")
DB_HOST = input('host of the database server : ')
if not DB_HOST:
DB_HOST = '192.168.128.141'
DB_NAME = input('name of the database : ')
if not DB_NAME:
DB_NAME = 'dblodufour1'
DB_USER = input('login of the user : ')
if not DB_USER:
DB_USER = 'lodufour1'
DB_PASSWORD = getpass.getpass('user password : ')
connec=[DB_HOST,DB_NAME,DB_USER,DB_PASSWORD]
miniL = [0,0,0]
scapy.sniff(iface="lo", prn=decode)
Loading…
Cancel
Save