blob: fad2144361bae7331b2e4544daf7e5d64cf9df0e [file] [log] [blame]
#!/usr/bin/env python2
#
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Output pull socket plugin.
Transmits events to an input pull socket plugin running on another Instalog
node.
See socket_common.py for protocol definition.
"""
from __future__ import print_function
from six.moves import xrange
import socket
import instalog_common # pylint: disable=unused-import
from instalog import plugin_base
from instalog.plugins import output_socket
from instalog.plugins import socket_common
from instalog.utils.arg_utils import Arg
_DEFAULT_BATCH_SIZE = 500
_DEFAULT_TIMEOUT = 5
_DEFAULT_HOSTNAME = '0.0.0.0'
_ACCEPT_TIMEOUT = 1
_ACCEPT_LOG_INTERVAL = 60 # interval = _ACCEPT_TIMEOUT * _ACCEPT_LOG_INTERVAL =
# 60s
# TODO(chuntsen): Encryption and authentication
class OutputPullSocket(plugin_base.OutputPlugin):
ARGS = [
Arg('batch_size', int, 'How many events to queue before transmitting.',
default=_DEFAULT_BATCH_SIZE),
Arg('timeout', (int, float), 'Timeout to transmit without full batch.',
default=_DEFAULT_TIMEOUT),
Arg('hostname', (str, unicode), 'Hostname that server should bind to.',
default=_DEFAULT_HOSTNAME),
Arg('port', int, 'Port that server should bind to.',
default=socket_common.DEFAULT_PULL_PORT)
]
def __init__(self, *args, **kwargs):
self._sock = None
self._accept_sock = None
super(OutputPullSocket, self).__init__(*args, **kwargs)
def GetSocket(self):
"""Accepts a socket from input pull socket."""
self._accept_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._accept_sock.settimeout(_ACCEPT_TIMEOUT)
self._accept_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.debug('Socket created')
# Bind socket.
try:
self._accept_sock.bind((self.args.hostname, self.args.port))
except socket.error as msg:
self.exception('Bind failed. Error %d: %s' % (msg[0], msg[1]))
raise
self.debug('Socket bind complete')
try:
# Queue up to 1 requests.
self._accept_sock.listen(1)
self.debug('Socket now listening on %s:%d...',
self.args.hostname, self.args.port)
self._sock, addr = self._accept_sock.accept()
self._sock.settimeout(socket_common.SOCKET_TIMEOUT)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
socket_common.SOCKET_BUFFER_SIZE)
self.info('Connected with %s:%d' % (addr[0], addr[1]))
self._accept_sock.shutdown(socket.SHUT_RDWR)
self._accept_sock.close()
# Receive qing.
# We use recvfrom because we need to control unittest mock.
received_char, _unused_addr = self._sock.recvfrom(1)
self.debug('Received a char: %s', received_char)
if not received_char == socket_common.QING:
self.info('Invalid qing: %s', received_char)
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
return False
# Send qong.
self._sock.sendall(socket_common.QING_RESPONSE)
return True
except Exception:
return False
def Main(self):
"""Main Thread of the plugin."""
while not self.IsStopping():
# Since we need to know the number of events being sent before beginning
# the transmission, cache events in memory before making the connection.
events = []
event_stream = self.NewStream()
if not event_stream:
self.Sleep(1)
continue
for event in event_stream.iter(timeout=self.args.timeout,
count=self.args.batch_size):
events.append(event)
# If no events are available, don't bother sending an empty transmission.
if not events:
self.debug('No events available for transmission')
event_stream.Commit()
continue
# Accepts a connection only when we have some events. Or input pull socket
# will connect, wait event number and time out.
success = False
while not success:
for _unused_i in xrange(_ACCEPT_LOG_INTERVAL):
success = self.GetSocket()
if self.IsStopping():
event_stream.Abort()
return
if success:
break
if not success:
self.warning('No connection when listening')
sender = output_socket.OutputSocketSender(
self.logger.name, self._sock, self)
if sender.ProcessRequest(events):
event_stream.Commit()
else:
event_stream.Abort()
if __name__ == '__main__':
plugin_base.main()