Merge pull request #12504 from jtattermusch/bump_1_6_1

Bump version to 1.6.1
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
index 28c30e5..237f430 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/completion_queue.pyx.pxi
@@ -41,9 +41,8 @@
     cdef object user_tag = None
     cdef Call operation_call = None
     cdef CallDetails request_call_details = None
-    cdef Metadata request_metadata = None
+    cdef object request_metadata = None
     cdef Operations batch_operations = None
-    cdef Operation batch_operation = None
     if event.type == GRPC_QUEUE_TIMEOUT:
       return Event(
           event.type, False, None, None, None, None, False, None)
@@ -63,14 +62,8 @@
         operation_call = tag.operation_call
         request_call_details = tag.request_call_details
         if tag.request_metadata is not None:
-          request_metadata = tag.request_metadata
-          request_metadata._claim_slice_ownership()
+          request_metadata = tuple(tag.request_metadata)
         batch_operations = tag.batch_operations
-        if tag.batch_operations is not None:
-          for op in batch_operations.operations:
-            batch_operation = <Operation>op
-            if batch_operation._received_metadata is not None:
-              batch_operation._received_metadata._claim_slice_ownership()
         if tag.is_new_request:
           # Stuff in the tag not explicitly handled by us needs to live through
           # the life of the call
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
index 98d7a98..57816f1 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi
@@ -76,7 +76,7 @@
       plugin_callback (callable): Callback accepting a service URL (str/bytes)
-        and callback object (accepting a Metadata,
+        and callback object (accepting a Metadata,
         grpc_status_code, and a str/bytes error message). This argument
         when called should be non-blocking and eventually call the callback
         object with the appropriate status code/details and metadata (if
@@ -129,8 +129,7 @@
   def python_callback(
       Metadata metadata, grpc_status_code status,
       bytes error_details):
-    cb(user_data, metadata.c_metadata_array.metadata,
-       metadata.c_metadata_array.count, status, error_details)
+    cb(user_data, metadata.c_metadata, metadata.c_count, status, error_details)
     called_flag[0] = True
   cdef CredentialsMetadataPlugin self = <CredentialsMetadataPlugin>state
   cdef AuthMetadataContext cy_context = AuthMetadataContext()
@@ -139,8 +138,8 @@
     self.plugin_callback(cy_context, python_callback)
   except Exception as error:
     if not called_flag[0]:
-      cb(user_data, Metadata([]).c_metadata_array.metadata,
-         0, StatusCode.unknown, traceback.format_exc().encode())
+      cb(user_data, NULL, 0, StatusCode.unknown,
+         traceback.format_exc().encode())
 cdef void plugin_destroy_c_plugin_state(void *state) with gil:
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 5950bfa..28cf114 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -59,6 +59,7 @@
   grpc_slice grpc_slice_malloc(size_t length) nogil
   grpc_slice grpc_slice_from_copied_string(const char *source) nogil
   grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t len) nogil
+  grpc_slice grpc_slice_copy(grpc_slice s) nogil
   # Declare functions for function-like macros (because Cython)...
   void *grpc_slice_start_ptr "GRPC_SLICE_START_PTR" (grpc_slice s) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 8ace6ae..9c40ebf 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -37,7 +37,7 @@
   cdef Server shutting_down_server
   cdef Call operation_call
   cdef CallDetails request_call_details
-  cdef Metadata request_metadata
+  cdef MetadataArray request_metadata
   cdef Operations batch_operations
   cdef bint is_new_request
@@ -51,7 +51,7 @@
   # For Server.request_call
   cdef readonly bint is_new_request
   cdef readonly CallDetails request_call_details
-  cdef readonly Metadata request_metadata
+  cdef readonly object request_metadata
   # For server calls
   cdef readonly Call operation_call
@@ -92,15 +92,20 @@
 cdef class Metadata:
+  cdef grpc_metadata *c_metadata
+  cdef readonly size_t c_count
+cdef class MetadataArray:
   cdef grpc_metadata_array c_metadata_array
-  cdef void _claim_slice_ownership(self)
 cdef class Operation:
   cdef grpc_op c_op
   cdef ByteBuffer _received_message
-  cdef Metadata _received_metadata
+  cdef MetadataArray _received_metadata
   cdef grpc_status_code _received_status_code
   cdef grpc_slice _status_details
   cdef int _received_cancelled
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 1b2ddd2..0a2a6ee 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -238,7 +238,7 @@
   def __cinit__(self, grpc_completion_type type, bint success,
                 object tag, Call operation_call,
                 CallDetails request_call_details,
-                Metadata request_metadata,
+                object request_metadata,
                 bint is_new_request,
                 Operations batch_operations):
     self.type = type
@@ -437,48 +437,79 @@
 cdef class _MetadataIterator:
   cdef size_t i
-  cdef Metadata metadata
+  cdef size_t _length
+  cdef object _metadatum_indexable
-  def __cinit__(self, Metadata metadata not None):
+  def __cinit__(self, length, metadatum_indexable):
+    self._length = length
+    self._metadatum_indexable = metadatum_indexable
     self.i = 0
-    self.metadata = metadata
   def __iter__(self):
     return self
   def __next__(self):
-    if self.i < len(self.metadata):
-      result = self.metadata[self.i]
+    if self.i < self._length:
+      result = self._metadatum_indexable[self.i]
       self.i = self.i + 1
       return result
     else:
       raise StopIteration
+# TODO: Eliminate this; just use an
+# ordinary sequence of pairs of bytestrings all the way down to the
+# grpc_call_start_batch call.
 cdef class Metadata:
+  """Metadata being passed from application to core."""
   def __cinit__(self, metadata_iterable):
+    metadata_sequence = tuple(metadata_iterable)
+    cdef size_t count = len(metadata_sequence)
     with nogil:
-      grpc_metadata_array_init(&self.c_metadata_array)
-    metadata = list(metadata_iterable)
-    for metadatum in metadata:
-      if not isinstance(metadatum, Metadatum):
-        raise TypeError("expected list of Metadatum")
-    self.c_metadata_array.count = len(metadata)
-    self.c_metadata_array.capacity = len(metadata)
-    with nogil:
-      self.c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
-          self.c_metadata_array.count*sizeof(grpc_metadata)
-      )
-    for i in range(self.c_metadata_array.count):
-      (<Metadatum>metadata[i])._copy_metadatum(&self.c_metadata_array.metadata[i])
+      self.c_metadata = <grpc_metadata *>gpr_malloc(
+          count * sizeof(grpc_metadata))
+      self.c_count = count
+    for index, metadatum in enumerate(metadata_sequence):
+      self.c_metadata[index].key = grpc_slice_copy(
+          (<Metadatum>metadatum).c_metadata.key)
+      self.c_metadata[index].value = grpc_slice_copy(
+          (<Metadatum>metadatum).c_metadata.value)
   def __dealloc__(self):
     with nogil:
-      # this frees the allocated memory for the grpc_metadata_array (although
-      # it'd be nice if that were documented somewhere...)
-      # TODO(atash): document this in the C core
+      for index in range(self.c_count):
+        grpc_slice_unref(self.c_metadata[index].key)
+        grpc_slice_unref(self.c_metadata[index].value)
+      gpr_free(self.c_metadata)
+      grpc_shutdown()
+  def __len__(self):
+    return self.c_count
+  def __getitem__(self, size_t index):
+    if index < self.c_count:
+      key = _slice_bytes(self.c_metadata[index].key)
+      value = _slice_bytes(self.c_metadata[index].value)
+      return Metadatum(key, value)
+    else:
+      raise IndexError()
+  def __iter__(self):
+    return _MetadataIterator(self.c_count, self)
+cdef class MetadataArray:
+  """Metadata being passed from core to application."""
+  def __cinit__(self):
+    with nogil:
+      grpc_init()
+      grpc_metadata_array_init(&self.c_metadata_array)
+  def __dealloc__(self):
+    with nogil:
@@ -493,21 +524,7 @@
     return Metadatum(key=key, value=value)
   def __iter__(self):
-    return _MetadataIterator(self)
-  cdef void _claim_slice_ownership(self):
-    cdef grpc_metadata_array new_c_metadata_array
-    grpc_metadata_array_init(&new_c_metadata_array)
-    new_c_metadata_array.metadata = <grpc_metadata *>gpr_malloc(
-        self.c_metadata_array.count*sizeof(grpc_metadata))
-    new_c_metadata_array.count = self.c_metadata_array.count
-    for i in range(self.c_metadata_array.count):
-      new_c_metadata_array.metadata[i].key = _copy_slice(
-          self.c_metadata_array.metadata[i].key)
-      new_c_metadata_array.metadata[i].value = _copy_slice(
-          self.c_metadata_array.metadata[i].value)
-    grpc_metadata_array_destroy(&self.c_metadata_array)
-    self.c_metadata_array = new_c_metadata_array
+    return _MetadataIterator(self.c_metadata_array.count, self)
 cdef class Operation:
@@ -547,14 +564,13 @@
     if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
         self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
       raise TypeError("self must be an operation receiving metadata")
-    return self._received_metadata
-  @property
-  def received_metadata_or_none(self):
-    if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
-        self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
-      return None
-    return self._received_metadata
+    # TODO: Drop the "all Cython
+    # objects must be legitimate for use from Python at any time" policy in
+    # place today, shift the policy toward "Operation objects are only usable
+    # while their calls are active", and move this making-a-copy-because-this-
+    # data-needs-to-live-much-longer-than-the-call-from-which-it-arose to the
+    # lowest Python layer.
+    return tuple(self._received_metadata)
   def received_status_code(self):
@@ -601,9 +617,8 @@
   cdef Operation op = Operation()
   op.c_op.flags = flags
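-  op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count
-  op.c_op.data.send_initial_metadata.metadata = (
-      metadata.c_metadata_array.metadata)
+  op.c_op.data.send_initial_metadata.count = metadata.c_count
+  op.c_op.data.send_initial_metadata.metadata = metadata.c_metadata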
   op.is_valid = True
   return op
@@ -631,9 +646,8 @@
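   op.c_op.flags = flags
-  op.c_op.data.send_status_from_server.trailing_metadata_count = (
-      metadata.c_metadata_array.count)
-  op.c_op.data.send_status_from_server.trailing_metadata = (
-      metadata.c_metadata_array.metadata)
+  op.c_op.data.send_status_from_server.trailing_metadata_count = (
+      metadata.c_count)
+  op.c_op.data.send_status_from_server.trailing_metadata = metadata.c_metadata
   op.c_op.data.send_status_from_server.status = code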
   op._status_details = _slice_from_bytes(details)
@@ -646,7 +660,7 @@
   cdef Operation op = Operation()
   op.c_op.flags = flags
-  op._received_metadata = Metadata([])
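+  op._received_metadata = MetadataArray()
   op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
       &op._received_metadata.c_metadata_array)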
   op.is_valid = True
@@ -669,7 +683,7 @@
   cdef Operation op = Operation()
   op.c_op.flags = flags
-  op._received_metadata = Metadata([])
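+  op._received_metadata = MetadataArray()
   op.c_op.data.receive_status_on_client.trailing_metadata = (
       &op._received_metadata.c_metadata_array)
   op.c_op.data.receive_status_on_client.status = (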
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
index dd276fd..b8db274 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/server.pyx.pxi
@@ -44,7 +44,7 @@
     cdef OperationTag operation_tag = OperationTag(tag)
     operation_tag.operation_call = Call()
     operation_tag.request_call_details = CallDetails()
-    operation_tag.request_metadata = Metadata([])
+    operation_tag.request_metadata = MetadataArray()
     operation_tag.references.extend([self, call_queue, server_queue])
     operation_tag.is_new_request = True
     operation_tag.batch_operations = Operations([])