-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathTrace.py
87 lines (71 loc) · 2.88 KB
/
Trace.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os, sys, re
import fcntl
import argparse
import logging
import serial
import datetime
import TraceDecoder
# Header string handed to the CSV exporter when the output file is created.
_CSV_HEADER = 'timestamp, counter, epoch [ms], message, raw hex'

# Command-line interface.
parser = argparse.ArgumentParser(
    description='trace - trace decoder application',
)
# Kind of trace source: a serial port (default) or a file.
parser.add_argument(
    '-i', '--input',
    default='serial',
    choices=['file', 'serial'],
)
# Path of the map file that is scanned for ESM_SIGNAL(...) / ESM_ID(...) names.
parser.add_argument(
    '-m', '--map',
    default='',
)
# Optional positional argument; when omitted, the serial port specification
# '/dev/ttyUSB0:576000:8N1' is used.
parser.add_argument(
    'path',
    nargs='?',
    default='/dev/ttyUSB0:576000:8N1',
    help='path to file or serial port identifier',
)
# Switch that selects DEBUG instead of INFO log level.
parser.add_argument(
    '-d', '--debug',
    action='store_true',
    default=False,
)
args = parser.parse_args()
# Log level follows the --debug switch: DEBUG when set, INFO otherwise.
lv = logging.DEBUG if args.debug else logging.INFO

# Configure the root logger; records carry a millisecond-resolution timestamp.
# NOTE(review): no filename is passed here, so filemode presumably has no
# effect - confirm against the logging.basicConfig documentation.
logging.basicConfig(
    level=lv,
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
    datefmt='%m-%d %H:%M:%S',
    filemode='w',
)
# Main script body:
#   1. collect signal and index names from the map file,
#   2. open the trace source (serial port or file),
#   3. decode it into '<basename of path>.csv',
#   4. rename that file with a timestamp prefix once the writer is closed.
# Ctrl-C ends the run; ValueError, IOError and serial errors are logged
# instead of propagating.
try:
    # Raw strings so that '\(' and '\w' reach the regex engine unchanged
    # instead of being treated as (invalid) string escape sequences.
    sigre = re.compile(r'ESM_SIGNAL\((\w+)\)')
    idre = re.compile(r'ESM_ID\((\w+)\)')

    # These names are always present, ahead of anything found in the map file.
    sigs = ['alarm']
    idxs = ['tick', 'trace']

    # NOTE(review): --map defaults to '', so running without -m fails on this
    # open() and the error is logged by the handler at the bottom.
    with open(args.map) as f:
        for line in f:
            sig = sigre.search(line)
            idx = idre.search(line)
            if sig:
                sigs.append(sig.group(1))
            if idx:
                idxs.append(idx.group(1))

    # The reader is opened inside the try/finally so that it is closed even
    # when locking the port or constructing the trace reader fails.
    breader = None
    try:
        if args.input == 'serial':
            logging.info('Trying to open serial port: ' + args.path)
            # Expected form: <device>:<baudrate>:<bytesize><parity><stopbits>,
            # e.g. /dev/ttyUSB0:576000:8N1. Anchored at the end so trailing
            # characters are rejected rather than silently ignored.
            rs = re.match(r'([^:]+):([0-9]+):([5678])([NOE])([12])$', args.path)
            if not rs:
                raise ValueError('Wrong serial port format: ' + args.path)
            breader = serial.Serial(rs.group(1), int(rs.group(2)), timeout=None,
                                    bytesize=int(rs.group(3)),
                                    parity=rs.group(4),
                                    stopbits=int(rs.group(5)))
            # Non-blocking exclusive lock: raises immediately if the lock on
            # the port is already held elsewhere.
            fcntl.flock(breader.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            reader = TraceDecoder.SerialTraceReader(breader, 1)
        else:
            logging.info('Trying to open file: ' + args.path)
            breader = open(args.path, 'rb')
            reader = TraceDecoder.FileTraceReader(breader, 1000000)

        # Timestamp is taken before decoding starts; it becomes the prefix of
        # the final output file name.
        ts_str = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
        wname = os.path.basename(args.path) + '.csv'
        logging.info('Opening output file: ' + wname)
        bwriter = open(wname, 'wb')
        try:
            # Exporter construction is inside the try so the writer is closed
            # even if it raises.
            exporter = TraceDecoder.CSVTraceExporter(bwriter, _CSV_HEADER)
            decoder = TraceDecoder.TraceDecoder(reader, exporter, sigs, idxs)
            decoder.decode()
        finally:
            logging.debug('Closing the writer')
            bwriter.close()
            # Rename only after closing: '<name>.csv' -> '<timestamp>-<name>.csv'.
            os.rename(bwriter.name, ts_str + '-' + os.path.basename(bwriter.name))
    finally:
        if breader is not None:
            logging.debug('Closing the reader')
            breader.close()
except KeyboardInterrupt:
    logging.info('Exiting due to user keyboard press')
except (ValueError, IOError, serial.SerialException) as exc:
    logging.error(exc)