ci_results_archiver: Add afe_special_tasks query to afe_connection

Add queries and methods for afe_special_tasks, host_labels and
host_attributes. Update fake_afe_connection for testing.

BUG=None
TEST=Ran bin/run_lint, run_tests and run_yapf

Change-Id: I06a4e7dfadfa7f3813d85e6f5dc68071fa8d2464
Reviewed-on: https://chromium-review.googlesource.com/1398961
Commit-Ready: Abe Smith <abesmith@chromium.org>
Tested-by: Abe Smith <abesmith@chromium.org>
Reviewed-by: Garry Wang <xianuowang@chromium.org>
diff --git a/ci_results_archiver/utils/afe_connection.py b/ci_results_archiver/utils/afe_connection.py
index e6753b4..d9faec7 100644
--- a/ci_results_archiver/utils/afe_connection.py
+++ b/ci_results_archiver/utils/afe_connection.py
@@ -71,6 +71,51 @@
 ;
 """
 
+# MySQL query used to retrieve AFE special tasks.
+_AFE_SPECIAL_TASKS_QUERY = """
+SELECT
+  t.id AS afe_special_task_id,
+  t.host_id,
+  h.hostname,
+  h.status,
+  t.task,
+  t.is_active,
+  t.is_complete,
+  t.time_started,
+  t.time_finished,
+  t.success
+FROM afe_special_tasks AS t
+  INNER JOIN afe_hosts AS h ON h.id = t.host_id
+WHERE t.id >= %(afe_special_task_id_start)s
+  AND h.invalid = 0
+ORDER BY t.id ASC
+LIMIT %(limit)s
+;
+"""
+
+# MySQL query used to retrieve AFE host labels.
+_AFE_HOSTS_LABELS_QUERY = """
+SELECT
+  hosts_labels.host_id,
+  label.name
+FROM afe_hosts_labels AS hosts_labels
+  INNER JOIN afe_labels AS label ON label.id = hosts_labels.label_id
+WHERE hosts_labels.host_id IN %(afe_host_ids)s
+;
+"""
+
+# MySQL query used to retrieve AFE host attributes.
+_AFE_HOST_ATTRIBUTES_QUERY = """
+SELECT
+  host_id,
+  attribute,
+  value
+FROM afe_host_attributes
+WHERE host_id IN %(afe_host_ids)s
+  AND attribute IN ('HWID', 'serial_number', 'servo_host', 'servo_port')
+;
+"""
+
 
 class AfeConnection(object):
   """Connection to AFE DB.
@@ -139,3 +184,63 @@
         query=_AFE_JOB_DEPENDENCY_LABELS_QUERY,
         params={'afe_job_ids': afe_job_ids},
         description='Dumping AFE job dependency labels')
+
+  def QuerySpecialTasks(self, afe_special_task_id_start, limit):
+    """Retrieves AFE special tasks from the AFE database.
+
+    Rows are returned by the ascending order of the ID.
+
+    Args:
+      afe_special_task_id_start: Minimum AFE special task ID.
+      limit: Maximum number of rows returned.
+
+    Returns:
+      A list of dictionaries representing AFE special task rows.
+
+    Raises:
+      MySQLdb.Error: On MySQL errors.
+    """
+    return self._mysql_wrapper.RunQuery(
+        query=_AFE_SPECIAL_TASKS_QUERY,
+        params={
+            'afe_special_task_id_start': afe_special_task_id_start,
+            'limit': limit
+        },
+        description='Dumping AFE special tasks')
+
+  def QueryHostAttributes(self, afe_host_ids):
+    """Retrieves AFE host attribute rows from the AFE database.
+
+    Only the following attributes are returned for a host: HWID, serial_number,
+    servo_host, servo_port.
+
+    Args:
+      afe_host_ids: A list AFE host ids to retrieve host attributes of.
+
+    Returns:
+      A list of dictionaries representing AFE host attribute rows.
+
+    Raises:
+      MySQLdb.Error: On MySQL errors.
+    """
+    return self._mysql_wrapper.RunQuery(
+        query=_AFE_HOST_ATTRIBUTES_QUERY,
+        params={'afe_host_ids': afe_host_ids},
+        description='Dumping AFE host attributes')
+
+  def QueryHostLabels(self, afe_host_ids):
+    """Retrieves AFE host label rows from the AFE database.
+
+    Args:
+      afe_host_ids: A list of AFE host ids to retrieve host labels of.
+
+    Returns:
+      A list of dictionaries representing AFE host label rows.
+
+    Raises:
+      MySQLdb.Error: On MySQL errors.
+    """
+    return self._mysql_wrapper.RunQuery(
+        query=_AFE_HOSTS_LABELS_QUERY,
+        params={'afe_host_ids': afe_host_ids},
+        description='Dumping AFE host labels')
diff --git a/ci_results_archiver/utils/test/fake_afe_connection.py b/ci_results_archiver/utils/test/fake_afe_connection.py
index 7abc01d..48f9077 100644
--- a/ci_results_archiver/utils/test/fake_afe_connection.py
+++ b/ci_results_archiver/utils/test/fake_afe_connection.py
@@ -15,13 +15,25 @@
     jobs: List of job dictionaries.
     job_keyvals: List of job keyval dictionaries.
     job_dependency_labels: List of job dependency label dictionaries.
+    special_tasks: List of special task dictionaries.
+    host_attributes: List of host attribute dictionaries.
+    host_labels: List of host label dictionaries.
   """
 
-  def __init__(self, jobs=None, job_keyvals=None, job_dependency_labels=None):
+  def __init__(self,
+               jobs=None,
+               job_keyvals=None,
+               job_dependency_labels=None,
+               special_tasks=None,
+               host_attributes=None,
+               host_labels=None):
     """Constructor."""
     self.jobs = jobs or []
     self.job_keyvals = job_keyvals or []
     self.job_dependency_labels = job_dependency_labels or []
+    self.special_tasks = special_tasks or []
+    self.host_attributes = host_attributes or []
+    self.host_labels = host_labels or []
 
   def QueryJobs(self, afe_job_id_start, limit):
     """Retrieves AFE job rows from the local database."""
@@ -45,3 +57,29 @@
         if job_dependency_label['afe_job_id'] in afe_job_ids
     ]
     return job_dependency_labels
+
+  def QuerySpecialTasks(self, afe_special_task_id_start, limit):
+    """Retrieve AFE special agent task rows from the local database."""
+    special_tasks = [
+        special_task for special_task in self.special_tasks
+        if special_task['afe_special_task_id'] >= afe_special_task_id_start
+    ]
+    special_tasks.sort(
+        key=lambda special_task: special_task['afe_special_task_id'])
+    return special_tasks[:limit]
+
+  def QueryHostAttributes(self, afe_host_ids):
+    """Retrieve AFE host attribute rows from the local database."""
+    host_attributes = [
+        host_attribute for host_attribute in self.host_attributes
+        if host_attribute['host_id'] in afe_host_ids
+    ]
+    return host_attributes
+
+  def QueryHostLabels(self, afe_host_ids):
+    """Retrieve AFE host label rows from the local database."""
+    host_labels = [
+        host_label for host_label in self.host_labels
+        if host_label['host_id'] in afe_host_ids
+    ]
+    return host_labels