blob: f8ec7d91f1da13123527382d908ea1599421c32e [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for adb."""
from __future__ import absolute_import
import io
import logging
import struct
import sys
import unittest
import common_mock
from adb import adb_commands
from adb import adb_protocol
BANNER = b'blazetest'
LOCAL_ID = 16
REMOTE_ID = 2
def _ConvertCommand(command):
return sum(c << (i * 8) for i, c in enumerate(bytearray(command)))
def _MakeHeader(command, arg0, arg1, data):
command = _ConvertCommand(command)
magic = command ^ 0xFFFFFFFF
checksum = adb_protocol._CalculateChecksum(data)
return struct.pack('<6I', command, arg0, arg1, len(data), checksum, magic)
def _MakeSyncHeader(command, *int_parts):
command = _ConvertCommand(command)
return struct.pack('<%dI' % (len(int_parts) + 1), command, *int_parts)
def _MakeWriteSyncPacket(command, data='', size=None):
return _MakeSyncHeader(command, size or len(data)) + data
class BaseAdbTest(unittest.TestCase):
def setUp(self):
super(BaseAdbTest, self).setUp()
self.usb = common_mock.MockUsb()
def tearDown(self):
try:
self.usb.Close()
finally:
super(BaseAdbTest, self).tearDown()
def _ExpectWrite(self, command, arg0, arg1, data):
self.usb.ExpectWrite(_MakeHeader(command, arg0, arg1, data))
self.usb.ExpectWrite(data)
if command == b'WRTE':
self._ExpectRead(b'OKAY', REMOTE_ID, LOCAL_ID)
def _ExpectRead(self, command, arg0, arg1, data=''):
self.usb.ExpectRead(_MakeHeader(command, arg0, arg1, data))
if data:
self.usb.ExpectRead(data)
if command == b'WRTE':
self._ExpectWrite(b'OKAY', LOCAL_ID, REMOTE_ID, '')
def _ExpectConnection(self):
self._ExpectWrite(b'CNXN', 0x01000000, 256*1024, b'host::%s\0' % BANNER)
self._ExpectRead(b'CNXN', 0x01000000, 4096, b'device::\0')
def _ExpectOpen(self, service):
self._ExpectWrite(b'OPEN', LOCAL_ID, 0, service)
self._ExpectRead(b'OKAY', REMOTE_ID, LOCAL_ID)
def _ExpectClose(self):
self._ExpectRead(b'CLSE', REMOTE_ID, LOCAL_ID)
# TODO(maruel): The new adb_protocol doesn't bother sending a CLSE back.
# self._ExpectWrite(b'CLSE', LOCAL_ID, REMOTE_ID, '')
def _Connect(self):
return adb_commands.AdbCommands.Connect(
self.usb, BANNER, rsa_keys=[], auth_timeout_ms=0)
class AdbTest(BaseAdbTest):
def _ExpectCommand(self, service, command, *responses):
self._ExpectConnection()
self._ExpectOpen(b'%s:%s\0' % (service, command))
for response in responses:
self._ExpectRead(b'WRTE', REMOTE_ID, LOCAL_ID, response)
self._ExpectClose()
def testSmallResponseShell(self):
command = 'keepin it real'
response = 'word.'
self._ExpectCommand(b'shell', command, response)
cmd = self._Connect()
self.assertEqual(response, cmd.Shell(command))
cmd.Close()
def testBigResponseShell(self):
command = 'keepin it real big'
responses = ['other stuff, ', 'and some words.'] * 50
self._ExpectCommand(b'shell', command, *responses)
cmd = self._Connect()
self.assertEqual(''.join(responses), cmd.Shell(command))
cmd.Close()
def testStreamingResponseShell(self):
command = 'keepin it real big'
# expect multiple lines
responses = ['other stuff, ', 'and some words.']
self._ExpectCommand(b'shell', command, *responses)
cmd = self._Connect()
actual = ''.join(cmd.StreamingShell(command))
self.assertEqual(''.join(responses), actual)
cmd.Close()
def testReboot(self):
self._ExpectCommand(b'reboot', b'', b'')
cmd = self._Connect()
cmd.Reboot()
cmd.Close()
def testRebootBootloader(self):
self._ExpectCommand(b'reboot', b'bootloader', b'')
cmd = self._Connect()
cmd.RebootBootloader()
cmd.Close()
def testRemount(self):
self._ExpectCommand(b'remount', b'', b'')
cmd = self._Connect()
cmd.Remount()
cmd.Close()
def testRoot(self):
self._ExpectCommand(b'root', b'', b'')
cmd = self._Connect()
cmd.Root()
cmd.Close()
class FilesyncAdbTest(BaseAdbTest):
def _ExpectClose(self):
# TODO(maruel): There's an inconsistency between sync protocol and raw adb
# protocol.
self._ExpectWrite(b'CLSE', LOCAL_ID, REMOTE_ID, '')
self._ExpectRead(b'CLSE', REMOTE_ID, LOCAL_ID)
def _ExpectSyncCommand(self, write_commands, read_commands):
self._ExpectConnection()
self._ExpectOpen(b'sync:\0')
while write_commands or read_commands:
if write_commands:
command = write_commands.pop(0)
self._ExpectWrite(b'WRTE', LOCAL_ID, REMOTE_ID, command)
if read_commands:
command = read_commands.pop(0)
self._ExpectRead(b'WRTE', REMOTE_ID, LOCAL_ID, command)
self._ExpectClose()
def testPush(self):
filedata = 'alo there, govnah'
mtime = 100
send = [
_MakeWriteSyncPacket(b'SEND', '/data,33272'),
_MakeWriteSyncPacket(b'DATA', filedata),
_MakeWriteSyncPacket(b'DONE', size=mtime),
]
data = 'OKAY\0\0\0\0'
self._ExpectSyncCommand([''.join(send)], [data])
self._Connect().Push(io.BytesIO(filedata), '/data', mtime=mtime)
def testPull(self):
filedata = "g'ddayta, govnah"
recv = _MakeWriteSyncPacket(b'RECV', '/data')
data = [
_MakeWriteSyncPacket(b'DATA', filedata),
_MakeWriteSyncPacket(b'DONE'),
]
self._ExpectSyncCommand([recv], [''.join(data)])
self.assertEqual(filedata, self._Connect().Pull('/data'))
if __name__ == '__main__':
if '-v' in sys.argv:
logging.basicConfig(level=logging.DEBUG) # pragma: no cover
adb_protocol._LOG.setLevel(logging.DEBUG) # pragma: no cover
else:
logging.basicConfig(level=logging.ERROR)
unittest.main()