blob: c1c0fdfd2aa6b9d666824fea641db916d7989656 [file] [log] [blame]
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2014 Google, Inc
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
import sigrokdecode as srd
import traceback
from zlib import crc32
NO_EOP = -1
HARD_RESET = -2
# Control Message type
CTRL_TYPES = {
0 : "reserved",
1 : "GOOD_CRC",
2 : "GOTO_MIN",
3 : "ACCEPT",
4 : "REJECT",
5 : "PING",
6 : "PS_RDY",
7 : "GET_SOURCE_CAP",
8 : "GET_SINK_CAP",
9 : "DR_SWAP",
10 : "PR_SWAP",
11 : "VCONN_SWAP",
12 : "WAIT",
13 : "SOFT_RESET",
14 : "reserved",
15 : "reserved"
}
# Data message type
DATA_TYPES = {
1 : "SOURCE_CAP",
2 : "REQUEST",
3 : "BIST",
4 : "SINK_CAP",
15 : "VDM"
}
DEC4B5B = [
0x10, # Error /* 00000 */,
0x10, # Error /* 00001 */,
0x10, # Error /* 00010 */,
0x10, # Error /* 00011 */,
0x10, # Error /* 00100 */,
0x10, # Error /* 00101 */,
0x13, # Sync-3 /* 00110 */,
0x14, # RST-1 /* 00111 K-code: Hard Reset #1 */,
0x10, # Error /* 01000 */,
0x01, # 1 = 0001 /* 01001 */,
0x04, # 4 = 0100 /* 01010 */,
0x05, # 5 = 0101 /* 01011 */,
0x10, # Error /* 01100 */,
0x16, # EOP /* 01101 K-code: EOP End Of Packet */,
0x06, # 6 = 0110 /* 01110 */,
0x07, # 7 = 0111 /* 01111 */,
0x10, # Error /* 10000 */,
0x12, # Sync-2 /* 10001 K-code: Startsynch #2 */,
0x08, # 8 = 1000 /* 10010 */,
0x09, # 9 = 1001 /* 10011 */,
0x02, # 2 = 0010 /* 10100 */,
0x03, # 3 = 0011 /* 10101 */,
0x0A, # A = 1010 /* 10110 */,
0x0B, # B = 1011 /* 10111 */,
0x11, # Sync-1 /* 11000 K-code: Startsynch #1 */,
0x15, # RST-2 /* 11001 K-code: Hard Reset #2 */,
0x0C, # C = 1100 /* 11010 */,
0x0D, # D = 1101 /* 11011 */,
0x0E, # E = 1110 /* 11100 */,
0x0F, # F = 1111 /* 11101 */,
0x00, # 0 = 0000 /* 11110 */,
0x10, # Error /* 11111 */,
]
SYM_ERR=0x10
SYNC1=0x11
SYNC2=0x12
SYNC3=0x13
RST1=0x14
RST2=0x15
EOP=0x16
SYNC_CODES=[SYNC1, SYNC2, SYNC3]
HRST_CODES=[RST1, RST1, RST1, RST2]
# different SOPs: SOP, SOP', SOP", SOP'_Debug, SOP"_Debug
SOP = [SYNC1, SYNC1, SYNC1, SYNC2, "SOP" ]
SOPP = [SYNC1, SYNC1, SYNC3, SYNC3, "SOP'"]
SOPPP = [SYNC1, SYNC3, SYNC1, SYNC3, 'SOP"']
SOPPDebug = [SYNC1, RST2, RST2, SYNC3, "SOP'_Debug"]
SOPPPDebug = [SYNC1, RST2, SYNC3, SYNC2, 'SOP"_Debug']
START_OF_PACKETS = [SOP, SOPP, SOPPP, SOPPDebug, SOPPPDebug]
CableReset = [ RST1, SYNC1, RST1, SYNC3, "Cable Reset"]
HardReset = [ RST1, RST1, RST1, RST2, "Hard Reset" ]
SYM_NAME = [
['0x0', '0'],
['0x1', '1'],
['0x2', '2'],
['0x3', '3'],
['0x4', '4'],
['0x5', '5'],
['0x6', '6'],
['0x7', '7'],
['0x8', '8'],
['0x9', '9'],
['0xA', 'A'],
['0xB', 'B'],
['0xC', 'C'],
['0xD', 'D'],
['0xE', 'E'],
['0xF', 'F'],
['ERROR', 'X'],
['SYNC-1', 'S1'],
['SYNC-2', 'S2'],
['SYNC-3', 'S3'],
['RST-1', 'R1'],
['RST-2', 'R2'],
['EOP', '#'],
]
RDO_FLAGS = {
(1 << 24) : "no_suspend",
(1 << 25) : "comm_cap",
(1 << 26) : "cap_mismatch",
(1 << 27) : "give_back"
}
PDO_TYPE=["", "BATT:", "VAR:", "<bad>"]
PDO_FLAGS = {
(1 << 29) : "dual_role_power",
(1 << 28) : "suspend",
(1 << 27) : "ext",
(1 << 26) : "comm_cap",
(1 << 25) : "dual_role_data"
}
BIST_MODES = {
0 : "Receiver",
1 : "Transmit",
2 : "Counters",
3 : "Carrier 0",
4 : "Carrier 1",
5 : "Carrier 2",
6 : "Carrier 3",
7 : "Eye",
}
VDM_CMDS = {
1 : "Disc Ident",
2 : "Disc SVID",
3 : "Disc Mode",
4 : "Enter Mode",
5 : "Exit Mode",
6 : "Attention",
# 16..31 : SVID Specific Commands
# DisplayPort Commands
16 : "DP Status",
17 : "DP Configure",
}
VDM_ACK = ["REQ", "ACK", "NAK", "BSY"]
class Decoder(srd.Decoder):
api_version = 2
id = 'usb_pd_packet'
name = 'usb_pd_packet'
longname = 'USB Power Delivery protocol'
desc = 'USB Power Delivery protocol'
license = 'gplv2+'
inputs = ['pd_packet']
outputs = ['usb_pd']
options = (
{'id':'address_format', 'desc':'Displayed slave address format', 'default':'shifted'},
)
annotations = (
('type', 'Packet Type'),
('Preamble', 'Preamble'),
('SOP', 'Start of Packet'),
('Head', 'Header'),
('Data', 'Data'),
('CRC', 'Checksum'),
('EOP', 'End Of Packet'),
('Sym', '4b5b symbols'),
('warnings', 'Warnings'),
('src', 'Source Message'),
('snk', 'Sink Message'),
('payload', 'Payload'),
('text', 'Plain text'),
)
annotation_rows = (
('4B5B', 'symbols', (7,)),
('Phase', 'parts', (1,2,3,4,5,6,)),
('payload', 'Payload', (11,)),
('type', 'Type', (0,9,10,)),
('warnings', 'Warnings', (8,)),
('text', 'Full text', (12,)),
)
def get_request(self, rdo):
pos = (rdo >> 28) & 7
op_ma = ((rdo >> 10) & 0x3ff) * 10
max_ma = (rdo & 0x3ff) * 10
flags = ""
for f in RDO_FLAGS.keys():
if rdo & f:
flags+=" "+RDO_FLAGS[f]
return "[%d]%d/%d mA%s" % (pos,op_ma,max_ma,flags)
def get_source_cap(self, pdo):
t = (pdo >> 30) & 3
if t == 0:
mv = ((pdo >> 10) & 0x3ff) * 50
ma = ((pdo >> 0) & 0x3ff) * 10
p = "%.1fV %.1fA" % (mv/1000.0,ma/1000.0)
elif t == 1:
minv = ((pdo >> 10) & 0x3ff) * 50
maxv = ((pdo >> 20) & 0x3ff) * 50
mw = ((pdo >> 0) & 0x3ff) * 250
p = "%.1f/%.1fV %.1fW" % (minv/1000.0,maxv/1000.0,mw/1000.0)
elif t == 2:
minv = ((pdo >> 10) & 0x3ff) * 50
maxv = ((pdo >> 20) & 0x3ff) * 50
ma = ((pdo >> 0) & 0x3ff) * 10
p = "%.1f/%.1fV %.1fA" % (minv/1000.0,maxv/1000.0,ma/1000.0)
else:
p = ""
flags = ""
for f in PDO_FLAGS.keys():
if pdo & f:
flags+=" "+PDO_FLAGS[f]
return "%s%s%s" % (PDO_TYPE[t],p,flags)
def get_sink_cap(self, pdo):
t = (pdo >> 30) & 3
if t == 0:
mv = ((pdo >> 10) & 0x3ff) * 50
ma = ((pdo >> 0) & 0x3ff) * 10
p = "%.1fV %.1fA" % (mv/1000.0,ma/1000.0)
elif t == 1:
minv = ((pdo >> 10) & 0x3ff) * 50
maxv = ((pdo >> 20) & 0x3ff) * 50
mw = ((pdo >> 0) & 0x3ff) * 250
p = "%.1f/%.1fV %.1fW" % (minv/1000.0,maxv/1000.0,mw/1000.0)
elif t == 2:
minv = ((pdo >> 10) & 0x3ff) * 50
maxv = ((pdo >> 20) & 0x3ff) * 50
ma = ((pdo >> 0) & 0x3ff) * 10
p = "%.1f/%.1fV %.1fA" % (minv/1000.0,maxv/1000.0,ma/1000.0)
else:
p = ""
flags = ""
for f in PDO_FLAGS.keys():
if pdo & f:
flags+=" "+PDO_FLAGS[f]
return "%s%s%s" % (PDO_TYPE[t],p,flags)
def get_vdm(self, idx, data):
if idx == 0: #VDM header
vid = data >> 16
struct = data & (1 << 15)
txt = "VDM"
if struct: #Structured VDM
cmd = data & 0x1f
src = data & (1 << 5)
ack = (data >> 6) & 3
pos = (data >> 8) & 7
ver = (data >> 13) & 3
txt="%s:" % (VDM_ACK[ack])
txt+="%s" %(VDM_CMDS[cmd]) if cmd in VDM_CMDS else "cmd? "
txt+=" pos %d" % (pos) if pos else ""
else: #Unstructured VDM
txt="unstruct [%04x]" % (data & 0x7fff)
txt+=" SVID:%04x" % (vid)
else: #VDM payload
txt = "VDO:%08x" % (data)
return txt
def get_bist(self, idx, data):
mode = data >> 28
counter = data & 0xffff
mode_name =BIST_MODES[mode] if mode in BIST_MODES else "INVALID"
if mode == 2:
mode_name = "Counter[= %d]" % (counter)
# TODO check all 0 bits are 0 / emit warnings
return "mode %s" % (mode_name) if idx == 0 else "invalid BRO"
def putpayload(self, s0, s1, idx):
t = self.head_type()
txt = "???"
if t == 2:
txt = self.get_request(self.data[idx])
elif t == 1:
txt = self.get_source_cap(self.data[idx])
elif t == 4:
txt = self.get_sink_cap(self.data[idx])
elif t == 15:
txt = self.get_vdm(idx, self.data[idx])
elif t == 3:
txt = self.get_bist(idx, self.data[idx])
self.putx(s0, s1, [11, [txt, txt]])
self.text += " - " + txt
def puthead(self, with_payload = False):
ann_type = 9 if self.head_power_role() else 10
role = "SRC" if self.head_power_role() else "SNK"
if self.head_data_role() != self.head_power_role():
role += "/DFP" if self.head_data_role() else "/UFP"
t = self.head_type()
if self.head_count() == 0:
shortm = CTRL_TYPES[t]
else:
shortm = DATA_TYPES[t] if t in DATA_TYPES else "DAT???"
longm = "{:s}[{:d}]:{:s}".format(role, self.head_id(), shortm)
self.putx(0, -1, [ann_type, [longm, shortm]])
self.text += longm
def head_id(self):
return (self.head >> 9) & 7
def head_power_role(self):
return (self.head >> 8) & 1
def head_data_role(self):
return (self.head >> 5) & 1
def head_rev(self):
return ((self.head >> 6) & 3) + 1
def head_type(self):
return self.head & 0xF
def head_count(self):
return (self.head >> 12) & 7
def putx(self, s0, s1, data):
self.put(self.edges[s0], self.edges[s1], self.out_ann, data)
def putwarn(self, longm, shortm):
self.putx(0, -1, [8, [longm, shortm]])
# def usb_crc32(self, s):
# return (crc32(s, 0) & 0xFFFFFFFF) ^ 0xFFFFFFFF
def rec_sym(self, i, sym):
self.putx(i, i+5, [7, SYM_NAME[sym]])
def get_sym(self, i, rec=True):
v = self.bits[i] | (self.bits[i+1] << 1) | (self.bits[i+2] << 2) | (self.bits[i+3]<<3) | (self.bits[i+4] << 4)
sym = DEC4B5B[v]
if rec:
self.rec_sym(i, sym)
return sym
def get_short(self):
i = self.idx
k = [self.get_sym(i), self.get_sym(i+5), self.get_sym(i+10), self.get_sym(i+15)]
# TODO check bad symbols
val = k[0] | (k[1] << 4) | (k[2] << 8) | (k[3] << 12)
self.idx += 20
return val
def get_word(self):
lo = self.get_short()
hi = self.get_short()
return lo | (hi << 16)
def is_hard_reset(self, sym_array):
first = 1 if sym_array[0]==HardReset[0] else 0
second = 1 if sym_array[1]==HardReset[1] else 0
third = 1 if sym_array[2]==HardReset[2] else 0
fourth = 1 if sym_array[3]==HardReset[3] else 0
return first + second + third + fourth >= 3
def is_cable_reset(self, sym_array):
first = 1 if sym_array[0]==CableReset[0] else 0
second = 1 if sym_array[1]==CableReset[1] else 0
third = 1 if sym_array[2]==CableReset[2] else 0
fourth = 1 if sym_array[3]==CableReset[3] else 0
return first + second + third + fourth >= 3
def get_start_of_packet(self, sym_array):
for i in range(5):
first = 1 if sym_array[0]==START_OF_PACKETS[i][0] else 0
second = 1 if sym_array[1]==START_OF_PACKETS[i][1] else 0
third = 1 if sym_array[2]==START_OF_PACKETS[i][2] else 0
fourth = 1 if sym_array[3]==START_OF_PACKETS[i][3] else 0
if first + second + third + fourth >= 3:
return START_OF_PACKETS[i][4]
else:
return None
def scan_eop(self):
for i in range(len(self.bits) - 19):
k = [self.get_sym(i, rec=False), self.get_sym(i+5, rec=False),
self.get_sym(i+10, rec=False), self.get_sym(i+15, rec=False)]
hard_reset = self.is_hard_reset(k)
cable_reset = self.is_cable_reset(k)
start_of_packet = self.get_start_of_packet(k)
# We have an interesting symbol sequence
if hard_reset or cable_reset or start_of_packet!=None:
# annotate the preamble
self.putx(0, i, [1, ['Preamble', '...']])
# annotate each symbol
self.rec_sym(i, k[0])
self.rec_sym(i+5, k[1])
self.rec_sym(i+10, k[2])
self.rec_sym(i+15, k[3])
if hard_reset:
self.text += "HRST"
return HARD_RESET
elif cable_reset:
self.putx(i, i+20, [2, [CableReset[4], 'CRST']])
else:
self.putx(i, i+20, [2, [start_of_packet, 'S']])
return i+20
self.putx(0, len(self.bits), [1, ['Junk???', 'XXX']])
self.text += "Junk???"
self.putwarn("No start of packet found","XXX")
return NO_EOP
def __init__(self, **kwargs):
self.samplerate = None
self.idx = 0
self.packet_seq = 0
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
print(value)
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON)
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_binary = self.register(srd.OUTPUT_BINARY)
self.out_bitrate = self.register(srd.OUTPUT_META,
meta=(int, 'Bitrate', 'Bitrate from Start bit to Stop bit'))
def decode(self, ss, es, data):
try:
self.decode_(ss, es, data)
except IndexError:
print(traceback.format_exc())
def decode_(self, ss, es, data):
self.tstamp = data['TIMESTAMP']
self.bits = data['BITS']
self.edges = data['EDGES']
self.data = []
self.idx = 0
self.samplenum = 0
if len(self.edges) < 50:
return # Not a real PD packet
self.packet_seq += 1
self.text = "#%d (%8.6fms): " % (self.packet_seq, self.tstamp*1000)
self.idx = self.scan_eop()
if self.idx < 0:
# Full text trace of the issue
self.putx(0, self.idx, [12, [self.text, 'TXT']])
return # No real packet : ABORT
# Packet header
self.head = self.get_short()
self.putx(self.idx-20, self.idx, [3, ['H:%04x' % (self.head), 'HD']])
self.puthead()
# Decode data payload
for i in range(self.head_count()):
self.data.append(self.get_word())
self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, self.data[i]), 'D%d' % (i)]])
self.putpayload(self.idx-40, self.idx, i)
# CRC check
self.crc = self.get_word()
# TODO check CRC
#self.putwarn("Bad CRC %08x != %08x" % (self.crc, TODO) ,"CRC!")
self.putx(self.idx-40, self.idx, [5, ['CRC:%08x' % (self.crc), 'CRC']])
# End of Packet
self.eop = self.get_sym(self.idx)
self.idx += 5
if self.eop == EOP:
self.putx(self.idx-5, self.idx, [6, ['EOP', 'E']])
else:
self.putwarn("No EOP","EOP!")
# Full text trace
self.putx(0, self.idx, [12, [self.text, 'TXT']])