0
mirror of https://github.com/gusmanb/logicanalyzer.git synced 2025-02-05 18:17:44 +00:00
gusmanb-logicanalyzer/Software/decoders/smartcard/pd.py
2024-10-12 12:08:11 +02:00

615 lines
24 KiB
Python

##
## Copyright (C) 2020 Sven Soltermann <sven@handyman.ch>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
import sigrokdecode as srd
import struct
import sys
import codecs
import ctypes
class pcap_udp_pkt():
# GSM TAP
h = b''
# layer_2
h += b'\x00\x00\x00\x00\x00\x00' # destination_mac
h += b'\x00\x00\x00\x00\x00\x00' # source_mac
h += b'\x08\x00' # layer_3_protocol
# layer_3
h += b'\x45' # version
h += b'\x00' # DiffServField
h += b'\xFF\xFF' # total_length
h += b'\x2B\x0D' # Identification
h += b'\x40\x00' # Flags
h += b'\x40' # TTL
h += b'\x11' # layer_4_protocol
h += b'\x00\x00' # header_checksum
h += b'\x7F\x00\x00\x01' # source_ip
h += b'\x7F\x00\x00\x01' # dest_ip
# layer_4
h += b'\xcc\x46' # source_port
h += b'\x12\x79' # dest_port
h += b'\x00\x00' # datagram_length
h += b'\x00\x00' # checksum
def __init__(self, ts, data):
self.header = bytearray(pcap_udp_pkt.h)
self.data = b''
self.set_timestamp(ts)
self.set_data(data)
def set_timestamp(self, ts):
self.timestamp = ts
def set_data(self, data):
self.data = list(b'\x02\x04\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + bytes(data));
self.header[16:18] = struct.pack('>H', len(self.data) + 28)
self.header[38:40] = struct.pack('>H', len(self.data) + 8)
def packet(self):
return bytes(self.header) + bytes(self.data)
def record_header(self):
# See https://wiki.wireshark.org/Development/LibpcapFileFormat.
(secs, usecs) = self.timestamp
h = struct.pack('<I', secs) # TS seconds
h += struct.pack('<I', usecs) # TS microseconds
# No truncation, so both lengths are the same.
h += struct.pack('<I', len(self)) # Captured len
h += struct.pack('<I', len(self)) # Original len
return h
def __len__(self):
return len(self.h) + len(self.data)
class Decoder(srd.Decoder):
api_version = 3
id = 'iso7816'
name = 'ISO 7816'
longname = 'Smartcard'
desc = 'ISO 7816 decoder (smartcard)'
license = 'gplv2+'
inputs = ['logic']
outputs = ['iso7816']
tags = ['Embedded/industrial']
channels = (
{'id': 'clk', 'name': 'CLK', 'desc': 'clock'},
{'id': 'data', 'name': 'data', 'desc': 'data'},
)
options = (
{'id': 'clock_option', 'desc': 'Clock option',
'default': 'native', 'values': ('native', 'detect', 'sample_as_clock')},
{'id': 'protocol', 'desc': 'Protocol',
'default': 'auto', 'values': ('auto', 'T=0', 'T=1')},
{'id': 'starts_with_atr', 'desc': 'Starts with ATR',
'default': 'true', 'values': ('true', 'false')},
)
annotations = (
('warnings', 'Human-readable warnings'),
('byte', 'Byte'),
('atr', 'ATR (Answer to Reset)'),
('pps', 'PPS (Protocol and parameters selection)'),
('t0', 'T=0 packet'),
('t1', 'T=1 packet'),
('t1-iblock', 'T=1 I-Block'),
('t1-rblock', 'T=1 R-Block'),
('t1-sblock', 'T=1 S-Block'),
('apdu', 'APDU'),
)
annotation_rows = (
('warnings', 'Warnings', (0,)),
('byte', 'Bytes', (1,)),
('type', 'Type', (2,3,4,5,)),
('t1', 'T=1 Decode', (6,7,8,)),
('apdu', 'apdu', (9,)),
)
binary = (
('pcap', 'PCAP format'),
)
clock_rate = {
0: 372,
1: 372,
2: 558,
3: 744,
4: 1116,
5: 1488,
6: 1860,
9: 512,
10: 768,
11: 1024,
12: 1536,
13: 2048
}
baud_rate = {
0: 1,
1: 1,
2: 2,
3: 4,
4: 8,
5: 16,
6: 32,
7: 64,
8: 12,
9: 20
}
def __init__(self):
self.reset()
def log(self, *args):
print(args, file=sys.stderr)
def reset(self):
self.peeked_byte = None
self.peeked_samplenum = -1
self.wrote_pcap_header = False
self.samplerate = None
self.sample_as_clock = False
self.detect_clock = False
self.fi = self.clock_rate[0]
self.di = self.baud_rate[0]
self.ss = self.es = -1
self.state = 'FIND START'
self.bits = []
self.clock_skip = 372
self.lastSamplePositive = True
self.sampleOverflowCount = 0
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
if self.samplerate:
self.secs_per_sample = float(1) / float(self.samplerate)
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON)
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_binary = self.register(srd.OUTPUT_BINARY)
self.sample_as_clock = self.options['clock_option'] == "sample_as_clock"
self.detect_clock = self.options['clock_option'] == "detect"
if self.options['starts_with_atr'] == "true":
self.state = 'FIND START'
else:
self.state = 'DATA'
if (self.options['protocol'] == "T=0"):
self.hasT0 = True
self.hasT1 = False
elif (self.options['protocol'] == "T=1"):
self.hasT0 = False
self.hasT1 = True
def pcap_global_header(self):
# See https://wiki.wireshark.org/Development/LibpcapFileFormat.
h = b'\xd4\xc3\xb2\xa1' # Magic, indicate microsecond ts resolution
h += b'\x02\x00' # Major version 2
h += b'\x04\x00' # Minor version 4
h += b'\x00\x00\x00\x00' # Correction vs. UTC, seconds
h += b'\x00\x00\x00\x00' # Timestamp accuracy
h += b'\xff\xff\x00\x00' # Max packet len
h += b'\x01\x00\x00\x00' # Link layer
return h
def get_bytes(self, bits):
byte = 0
for bit in list(reversed(bits)):
byte = (byte << 1) | bit
return byte
def write_pcap_header(self):
if not self.wrote_pcap_header:
self.put(0, 0, self.out_binary, [0, self.pcap_global_header()])
self.wrote_pcap_header = True
def ts_from_samplenum(self, sample):
x = ctypes.c_uint32(sample).value;
ovrflow = ctypes.c_uint32(int(self.sampleOverflowCount * 0xFFFFFFFF * self.secs_per_sample)).value
ts = float(x) * self.secs_per_sample + ovrflow
return (int(ts), int((ts % 1.0) * 1e6))
def read_first_byte(self):
self.bits = []
self.ss = self.samplenum
self.clock_skip = 0
if (self.sample_as_clock):
pins = self.wait({1: 'r'})
self.clock_skip = self.samplenum - self.ss + 2
self.detected_clock_skip = self.clock_skip
self.wait({'skip': int(self.clock_skip / 3)})
self.bits.append(0)
elif (self.detect_clock):
while (True):
pins = self.wait({0: 'r'})
self.clock_skip += 1
if (pins[1] == 1):
self.bits.append(0)
for c in range(int(self.clock_skip / 2)):
self.wait({0: 'r'})
break;
self.detected_clock_skip = self.clock_skip
else:
self.clock_skip = 372
self.bits.append(0)
for c in range(int(self.clock_skip / 2 + self.clock_skip)):
self.wait({0: 'r'})
for x in range(9):
pins = self.wait({'skip': 0})
self.bits.append(pins[1])
if (self.sample_as_clock):
self.wait({'skip': self.clock_skip - 4})
else:
for c in range(int(self.clock_skip)):
self.wait({0: 'r'})
self.log("FIRSTBIT, clockskip:", self.clock_skip,"sample_as_clock: ", self.sample_as_clock)
self.es = self.samplenum
if (self.bits.count(1) % 2 != 0):
self.log("CHKSUM ERROR: " + str(pins[1]) + " sample: "+ str(self.samplenum))
self.put(self.ss, self.samplenum, self.out_ann, [0, ["CHKSUM ERROR bits={bits}".format(bits=self.bits)]])
byte = self.get_bytes(self.bits[1:9])
self.log(self.samplenum, self.bits[1:9], " : ", "0x{:02x}".format(byte))
self.put(self.ss, self.samplenum, self.out_ann, [1, [hex(byte)]])
return byte
def read_byte(self):
if (self.samplenum < 0 and self.lastSamplePositive):
self.lastSamplePositive = False
if (self.lastSamplePositive == False and self.samplenum > 0 ):
self.sampleOverflowCount += 1
self.lastSamplePositive = True
if (self.peeked_byte != None):
byte =self.peeked_byte
self.peeked_byte = None
self.peeked_samplenum = -1
return byte;
self.wait({1: 'f'})
self.sleep_cycles()
return self.read_byte_no_wait();
def peek_byte(self):
self.wait({1: 'f'})
self.sleep_cycles()
self.peeked_samplenum = self.samplenum
self.peeked_byte = self.read_byte_no_wait();
return self.peeked_byte;
def read_byte_no_wait(self):
self.bits = []
self.ss = self.samplenum
for x in range(10):
pins = self.wait({'skip': 0})
#if (x != 0 and x < 9):
#self.log("read_bit: " + str(pins[1]) + " sample: "+ str(self.samplenum))
self.bits.append(pins[1])
if (self.sample_as_clock):
self.wait({'skip': self.clock_skip - 4})
else:
for c in range(int(self.clock_skip)):
self.wait({0: 'r'})
self.es = self.samplenum
if (self.bits.count(1) % 2 != 0):
self.log(self.samplenum, "CHKSUM ERROR: ", pins[1], "bits: ", self.bits)
self.put(self.ss, self.samplenum, self.out_ann, [0, ["CHKSUM ERROR bits={bits}".format(bits=self.bits)]])
byte = self.get_bytes(self.bits[1:9])
self.log(self.samplenum, self.bits[1:9], " : ", "0x{:02x}".format(byte))
self.put(self.ss, self.samplenum, self.out_ann, [1, [hex(byte)]])
return byte
def sleep_cycles(self):
if (self.sample_as_clock):
self.wait({'skip': int(self.clock_skip / 3)})
else:
for c in range(int(self.clock_skip / 3)):
self.wait({0: 'r'})
def handle_atr(self, pins):
atr_start = self.samplenum;
self.log("START ATR:", self.samplenum);
self.state = 'ATR'
tA = []
tB = []
tC = []
tD = []
historicalBytes = []
self.ATR = []
if (self.peeked_byte != None):
atr_start = self.peeked_samplenum
byte = self.read_byte()
else:
byte = self.read_first_byte()
self.ATR.append(byte)
t0 = self.read_byte()
self.ATR.append(t0)
firstT0 = t0
self.hasT0 = False
self.hasT1 = False
self.hasT15 = False
while (firstT0 & 0b11110000):
if (firstT0 & 0b00010000):
byte = self.read_byte()
tA.append(byte)
self.ATR.append(byte)
if (firstT0 & 0b00100000):
byte = self.read_byte()
tB.append(byte)
self.ATR.append(byte)
if (firstT0 & 0b01000000):
byte = self.read_byte()
tC.append(byte)
self.ATR.append(byte)
if (firstT0 & 0b10000000):
byte = self.read_byte()
if ((byte & 0x0F) == 0):
self.hasT0 = True
elif ((byte & 0x0F) == 1):
self.hasT1 = True
elif ((byte & 0x0F) == 15):
self.hasT15 = True
else:
self.log("Invalid Protocol in ATR: ", "T=",(byte & 0x0F))
self.put(atr_start, self.samplenum, self.out_ann, [0, ["Invalid Protocol in ATR T={protocol}".format(protocol=(byte & 0x0F))]])
tD.append(byte)
self.ATR.append(byte)
firstT0 = byte
self.log("TD("+str(len(tD))+"): ", hex(firstT0), "T=",(byte & 0x0F))
else:
firstT0 = 0;
for _ in range(0, t0 & 0x0F):
byte = self.read_byte()
self.ATR.append(byte)
if (self.hasT1 == False and self.hasT0 == False):
self.hasT0 = True
if (self.hasT1 == True or self.hasT15):
byte = self.read_byte()
self.ATR.append(byte)
xor = 0
for i in range(1, len(self.ATR)): xor = xor ^ self.ATR[i]
if (xor != 0):
self.put(atr_start, self.samplenum, self.out_ann, [0, ["Invalid TCK in ATR, got={tck:02x} expected={xor:02x}".format(tck=byte,xor=xor)]])
self.log("Invalid TCK in ATR", hex(byte), hex(xor))
self.put(atr_start, self.samplenum, self.out_ann, [2, ["ATR", "ATR={atr}".format(atr=codecs.encode(bytes(self.ATR), 'hex'))]])
self.put(atr_start, self.samplenum, self.out_python, [0, self.ATR])
self.log("ENDATR", codecs.encode(bytes(self.ATR), 'hex'))
self.state = 'DATA'
if (self.options['protocol'] == "T=0"):
self.hasT0 = True
self.hasT1 = False
elif (self.options['protocol'] == "T=1"):
self.hasT0 = False
self.hasT1 = True
def t1_parse_block(self, es):
packet = []
lrc = 0;
nad = self.read_byte()
lrc = lrc ^ nad
self.put(es, self.samplenum, self.out_ann, [1, "NAD:" + hex(nad)])
sad = nad & 0x70
dad = nad & 0x07
self.log("T=1 NAD=", hex(nad), "SAD=", hex(sad), "DAD=", hex(dad))
pcb = self.read_byte()
lrc = lrc ^ pcb
isSBlock = False
isRBlock = False
isIBlock = False
if (pcb & 0b11000000 == 0b11000000):
# S-Block
isSBlock = True
self.put(es, self.samplenum, self.out_ann, [3, "PCB S " + hex(pcb)])
self.log("PCB S-Block", hex(pcb & 0b00111111))
elif (pcb & 0b10000000 == 0b10000000):
# R-Block
isRBlock = True
self.put(es, self.samplenum, self.out_ann, [3, "PCB R " + hex(pcb)])
self.log("PCB R-Block", hex(pcb & 0b00111111))
else:
# I-Block
isIBlock = True
self.put(es, self.samplenum, self.out_ann, [3, "PCB I " + hex(pcb)])
self.log("PCB I-Block", hex(pcb))
bLen = self.read_byte()
lrc = lrc ^ bLen
if (bLen > 0):
for _ in range(bLen):
byte = self.read_byte()
lrc = lrc ^ byte
if (isIBlock): packet.append(byte)
# CRC to implement
bLrc = self.read_byte()
lrc = lrc ^ bLrc
if (lrc != 0):
self.put(es, self.samplenum, self.out_ann, [0, ["Invalid checksum on T=1 block, , got={got:02x} expected={expected:02x}".format(got=lrc,expected=bLrc)]])
self.log("Invalid checksum on T=1 block", hex(lrc), hex(bLrc))
self.log("block_content", codecs.encode(bytes(packet), 'hex'))
if (isIBlock):
self.put(es, self.samplenum, self.out_ann, [6, ["I-Block", "I-Block len={len} isMultiBlock={multi}".format(len=bLen,multi=(pcb & 0b00100000 > 0))]])
if (isRBlock):
self.put(es, self.samplenum, self.out_ann, [7, ["R-Block", "R-Block flag={flag:02x}".format(flag=pcb & 0b00111111)]])
if (isSBlock):
self.put(es, self.samplenum, self.out_ann, [8, ["S-Block", "S-Block flag={flag:02x}".format(flag=pcb & 0b00111111)]])
if (isIBlock and pcb & 0b00100000): # m-flag
self.log("T=1 Multiblock flag", hex(pcb))
while (True):
isIBlock2,packet2 = self.t1_parse_block(self.samplenum)
if (isIBlock2):
packet = packet + packet2
break
return isIBlock,packet;
def handle_pps(self):
lrc = 0
ss = self.peeked_samplenum
pps = self.read_byte()
pps0 = self.read_byte()
pps1 = 0; pps2 = 0; pps3 = 0
if (pps0 & 0b00010000):
pps1 = self.read_byte()
lrc = lrc ^ pps1
if (pps0 & 0b00100000):
pps2 = self.read_byte()
lrc = lrc ^ pps2
if (pps0 & 0b01000000):
pps3 = self.read_byte()
lrc = lrc ^ pps3
pck = self.read_byte()
lrc = lrc ^ pps ^ pps0 ^ pck
if (lrc != 0):
self.put(ss, self.samplenum, self.out_ann, [0, ["INVALID Checksum on PPS Request, got={got:02x} expected={expected:02x}".format(got=pck,expected=(lrc ^ pps ^ pps0))]])
self.log("INVALID Checksum on PPS Request", hex(lrc))
r_lrc = 0
r_pps = self.read_byte()
r_pps1 = 0; r_pps2 = 0; r_pps3 = 0
if (r_pps != 0xFF):
self.put(ss, self.samplenum, self.out_ann, [0, ["PPS Request not confirmed"]])
self.log("PPS Request not confirmed", r_pps)
r_pps0 = self.read_byte()
if (r_pps0 & 0b00010000):
r_pps1 = self.read_byte()
r_lrc = r_lrc ^ r_pps1
if (r_pps0 & 0b00100000):
r_pps2 = self.read_byte()
r_lrc = r_lrc ^ r_pps2
if (r_pps0 & 0b01000000):
r_pps3 = self.read_byte()
r_lrc = r_lrc ^ r_pps3
r_pck = self.read_byte()
r_lrc = r_lrc ^ r_pps ^ r_pps0 ^ r_pck
if (r_lrc != 0):
self.put(ss, self.samplenum, self.out_ann, [0, ["INVALID Checksum on PPS Response, got={got:02x} expected={expected:02x}".format(got=r_pck,expected=(r_lrc ^ r_pps ^ r_pps0))]])
self.log("INVALID Checksum on PPS Response", hex(r_lrc))
if (pps0 == r_pps0 and pps1 == r_pps1 and pps2 == r_pps2 and pps3 == r_pps3):
if (self.detect_clock or self.sample_as_clock):
tmp_fi = self.clock_rate[int(pps1 >> 4)]
tmp_di = self.baud_rate[int(pps1 & 0x0F)]
tmp_clock_skip = int(tmp_fi / tmp_di)
self.log("Received PPS change: FI", tmp_fi, "DI", tmp_di, "clock_skip", tmp_clock_skip)
self.clock_skip = int(tmp_clock_skip * self.detected_clock_skip / 372)
self.fi = tmp_fi
self.di = tmp_di
self.log("PPS Success new settings (calculated): FI", self.fi, "DI", self.di, "clock_skip", self.clock_skip)
else:
self.fi = self.clock_rate[int(pps1 >> 4)]
self.di = self.baud_rate[int(pps1 & 0x0F)]
self.clock_skip = int(self.fi / self.di)
self.log("PPS Success new settings: FI", self.fi, "DI", self.di, "clock_skip", self.clock_skip)
else:
self.log("INVALID PPS. Request & Response not matching.", hex(r_lrc))
self.put(ss, self.samplenum, self.out_ann, [0, ["INVALID PPS. Request & Response not matching"]])
self.put(ss, self.samplenum, self.out_ann, [3, ["PPS", "PPS DI={di} FI={fi} clock_skip={clock_skip}".format(di=self.di,fi=self.fi,clock_skip=self.clock_skip)]])
def decode(self):
self.write_pcap_header();
while True:
# State machine.
if self.state == 'FIND START':
self.wait({1: 'h'})
self.handle_atr(self.wait({1: 'f'}))
elif self.state == 'DATA':
packet = [];
firstByte = self.peek_byte();
if (firstByte == 0xFF): # PPS Request
self.handle_pps();
continue;
elif (firstByte == 0x3b): # Probably ATR
self.handle_atr(self.wait({'skip': 0}))
continue;
es = self.peeked_samplenum;
if (self.hasT0):
bClass = self.read_byte()
packet.append(bClass); # class
bIns = self.read_byte();
packet.append(bIns); # instruction
packet.append(self.read_byte()); # param1
packet.append(self.read_byte()); # param2
dataLen = self.read_byte();
self.log("DATALEN: ", dataLen)
packet.append(dataLen); # param3
procedureByte = self.read_byte();
if (procedureByte == bIns):
for _ in range(0,dataLen):
packet.append(self.read_byte()); # payload
packet.append(self.read_byte()); # status0
packet.append(self.read_byte()); # status1
elif (procedureByte == 0x60):
packet.append(procedureByte); # status0
packet.append(self.read_byte()); # status1
elif (procedureByte & 0xF0 == 0x60 or procedureByte & 0xF0 == 0x90):
packet.append(procedureByte); # status0
packet.append(self.read_byte()); # status1
else:
self.put(es, self.samplenum, self.out_ann, [0, ["INVALID Procedure Byte"]])
self.log("INVALID Procedure Byte", hex(procedureByte))
self.put(es, self.samplenum, self.out_ann, [4, ["T=0"]])
self.put(es, self.samplenum, self.out_ann, [9, ["APDU", "APDU cls={cls:02x} ins={ins:02x}".format(cls=bClass,ins=bIns), "APDU cls={cls:02x} ins={ins:02x} p1={p1:02x} p2={p2:02x} p3={p3:02x} len={len} status={sw1:02x}{sw2:02x}".format(cls=bClass,ins=bIns,p1=packet[2],p2=packet[3],p3=packet[4],len=dataLen,sw1=packet[-2],sw2=packet[-1])]])
elif (self.hasT1):
isIBlock,packet = self.t1_parse_block(es)
if (isIBlock):
while (True):
isIBlock,packet2 = self.t1_parse_block(es)
if (isIBlock):
packet = packet + packet2
break
self.put(es, self.samplenum, self.out_ann, [4, ["T=1", "T=1 (reassembled)"]])
if (len(packet) >= 8):
self.put(es, self.samplenum, self.out_ann, [9, ["APDU", "APDU cls={cls:02x} ins={ins:02x}".format(cls=packet[0],ins=packet[1]), "APDU cls={cls:02x} ins={ins:02x} p1={p1:02x} p2={p2:02x} p3={p3:02x} len={len} status={sw1:02x}{sw2:02x}".format(cls=packet[0],ins=packet[1],p1=packet[2],p2=packet[3],p3=packet[4],len=len(packet) - 7,sw1=packet[-2],sw2=packet[-1])]])
if (len(packet) > 0):
ts = self.ts_from_samplenum(es)
pkt = pcap_udp_pkt(ts, packet)
self.put(es, self.samplenum, self.out_binary, [0, pkt.record_header()])
self.put(es, self.samplenum, self.out_binary, [0, pkt.packet()])
self.log("PACKETEND", codecs.encode(bytes(packet), 'hex'))
else:
break;