## blob: f0f6e23677481b11ab120c911c65ab0e26e275c8 [file] [log] [blame]
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2014 Google, Inc
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
import sigrokdecode as srd
# Duration, in microseconds, of one unit interval (UI) at a 600 kHz data rate.
UI_US = 1e6 / 600e3
# Interval length, in microseconds, used to tell a half-1 from a 0: the
# midpoint between one UI and two UI.
THRESHOLD_US = 0.5 * (UI_US + 2 * UI_US)
class Decoder(srd.Decoder):
    # Biphase Mark Code (BMC) decoder for the USB Power Delivery CC wire.
    #
    # The decoder measures the distance between consecutive edges on the CC
    # channel. One interval longer than `threshold` is a 0, two consecutive
    # shorter intervals are a 1, and an interval longer than `maxbit` closes
    # the current packet.
    api_version = 2
    id = 'usb_pd_bmc'
    name = 'pd_bmc'
    longname = 'Biphase Mark Decoder for USB Power Delivery baseband protocol'
    desc = 'Power Delivery protocol over CC wire on USB type-C'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = ['pd_packet']
    channels = (
        {'id': 'cc', 'name': 'CC', 'desc': 'Control channel'},
    )
    options = ()
    annotations = (
        ('packet', 'USB-PD packet'),
        ('bits', 'decoded BMC'),
        ('warnings', 'Human-readable warning'),
    )
    annotation_rows = (
        ('bits', 'Bits', (1,)),
        ('title', 'packet', (0,)),
        ('warnings', 'Warnings', (2,)),
    )

    def __init__(self, **kwargs):
        """Set up the decoder state; the samplerate arrives via metadata()."""
        self.samplerate = None
        # Sample number of the edge currently being handled, and of the
        # edge handled just before it.
        self.samplenum = 0
        self.previous = 0
        # Last pin state seen, used to detect changes between samples.
        # NOTE(review): compared with == against every `pins` value coming
        # from `data`; confirm the two types compare as intended.
        self.oldpins = [0]
        # Packet under construction: sequence number, first edge, decoded
        # bits and the sample number at which each bit starts.
        self.seq = 0
        self.startsample = None
        self.packet = []
        self.edges = []
        # True after the first short interval of a 1; start_one remembers
        # the sample number where that interval began.
        self.half_one = False
        self.start_one = 0
        self.tstamp = 0

    def us2samples(self, us):
        """Convert a duration in microseconds to a whole number of samples.

        Raises Exception when the samplerate has not been set yet.
        """
        rate = self.samplerate
        if rate is None:
            raise Exception("Need the samplerate.")
        return int(us * rate / 1000000)

    def metadata(self, key, value):
        """Store the samplerate and derive the sample-count thresholds."""
        if key != srd.SRD_CONF_SAMPLERATE:
            return
        self.samplerate = value
        # A 0 lasts 2 UI; a gap more than 1.5 times that is treated as idle.
        self.maxbit = self.us2samples(3 * UI_US)
        # Interval length separating a half-1 from a 0.
        self.threshold = self.us2samples(THRESHOLD_US)

    def start(self):
        """Register the output streams used by the put* helpers."""
        self.out_python = self.register(srd.OUTPUT_PYTHON)
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_binary = self.register(srd.OUTPUT_BINARY)
        self.out_bitrate = self.register(
            srd.OUTPUT_META,
            meta=(int, 'Bitrate', 'Bitrate during the packet'))

    def putbitx(self, start, data):
        """Emit an annotation from `start` up to the current sample."""
        self.put(start, self.samplenum, self.out_ann, data)

    def putx(self, data):
        """Emit an annotation covering the packet (first edge to last edge)."""
        self.put(self.startsample, self.previous, self.out_ann, data)

    def putp(self, data):
        """Emit Python output covering the packet (first edge to last edge)."""
        self.put(self.startsample, self.previous, self.out_python, data)

    def putb(self, data):
        """Emit binary output covering the packet (first edge to last edge)."""
        self.put(self.startsample, self.previous, self.out_binary, data)

    def _emit_packet(self):
        """Export the packet collected so far and reset for the next one."""
        timestamp = float(self.startsample) / self.samplerate
        # TODO: bitrate reporting (out_bitrate) and binary export (putb)
        # are registered but not produced yet.
        # Record the last edge of the packet.
        self.edges.append(self.previous)
        self.putx([0, ['BMC %d' % (self.seq), 'PD']])
        self.putp({
            'TIMESTAMP': timestamp,
            'BITS': self.packet,
            'EDGES': self.edges,
        })
        # The edge that ended the idle period starts the next packet.
        self.startsample = self.samplenum
        self.packet = []
        self.edges = []
        self.half_one = False
        self.start_one = 0
        self.seq += 1

    def _consume_interval(self, elapsed):
        """Fold one edge-to-edge interval of `elapsed` samples into the packet."""
        long_gap = elapsed > self.threshold
        if self.half_one:
            if long_gap:
                # A long interval right after a lone short one is not valid
                # BMC: flag it and record a 0 as a placeholder.
                duration_us = float(elapsed) * 1000000 / self.samplerate
                self.putbitx(self.start_one,
                             [2, ['invalid duration %2.2fus' % (duration_us),
                                  '%d' % (elapsed)]])
                self.putbitx(self.start_one, [1, ['X', 'X']])
                # TODO: try to recover.
                self.packet.append(0)
                self.edges.append(self.previous)
            else:
                # Second short interval: the 1 is complete.
                self.packet.append(1)
                self.putbitx(self.start_one, [1, ['1', '1']])
                self.edges.append(self.start_one)
            self.half_one = False
        elif long_gap:
            # A single long interval is a 0.
            self.packet.append(0)
            self.putbitx(self.previous, [1, ['0', '0']])
            self.edges.append(self.previous)
        else:
            # First short interval: wait for the second half of the 1.
            self.half_one = True
            self.start_one = self.previous

    def decode(self, ss, es, data):
        """Decode the (samplenum, pins) pairs in `data` into bits and packets.

        Raises Exception when no samplerate has been provided.
        """
        if self.samplerate is None:
            raise Exception("Cannot decode without samplerate.")
        for samplenum, pins in data:
            self.samplenum = samplenum
            # Only samples where the pin state changed (edges) matter.
            if self.oldpins == pins:
                continue
            self.oldpins = pins
            # Unpacking checks that exactly one channel value is present.
            (_level,) = pins
            # First edge of a packet: just remember where it starts.
            # NOTE(review): this test is also true for sample number 0,
            # not only for None.
            if not self.startsample:
                self.startsample = self.samplenum
                self.previous = self.samplenum
                continue
            elapsed = self.samplenum - self.previous
            if elapsed > self.maxbit:
                # Long idle period: treat it as the end of the packet.
                self._emit_packet()
            else:
                self._consume_interval(elapsed)
            self.previous = self.samplenum