import json
import logging
import os
import threading
import time
logger = logging.getLogger(__name__)
class Denylist(object):
def __init__(self, path):
self._denylist_lock = threading.RLock()
self._path = path
def Read(self):
"""Reads the denylist from the denylist file.
A dict containing bad devices.
with self._denylist_lock:
denylist = dict()
if not os.path.exists(self._path):
return denylist
with open(self._path, 'r') as f:
denylist = json.load(f)
except (IOError, ValueError) as e:
logger.warning('Unable to read denylist: %s', str(e))
if not isinstance(denylist, dict):
logger.warning('Ignoring %s: %s (a dict was expected instead)',
self._path, denylist)
denylist = dict()
return denylist
def Write(self, denylist):
"""Writes the provided denylist to the denylist file.
denylist: list of bad devices to write to the denylist file.
with self._denylist_lock:
with open(self._path, 'w') as f:
json.dump(denylist, f)
def Extend(self, devices, reason='unknown'):
"""Adds devices to denylist file.
devices: list of bad devices to be added to the denylist file.
reason: string specifying the reason for denylist (eg: 'unauthorized')
timestamp = time.time()
event_info = {
'timestamp': timestamp,
'reason': reason,
device_dicts = {device: event_info for device in devices}'Adding %s to denylist %s for reason: %s', ','.join(devices),
self._path, reason)
with self._denylist_lock:
denylist = self.Read()
def Reset(self):
"""Erases the denylist file if it exists."""'Resetting denylist %s', self._path)
with self._denylist_lock:
if os.path.exists(self._path):