datastore: enable to set query batch size (#90)

* datastore: enable to set query batch size

This adds q.BatchSize() method to avoid the error that claims too many
datastore.next() calls due to the lack of specifying the `count`
property (= batch size).

Fixes #88

* Disallow negative values for BatchSize

* Deal with negative value for t.limit

and clean up to be more readable

* Add comment about possible values for t.count

* Use the same `if` block as in `Run()` for `count`

* Make BatchSize restrict to be greater than zero
diff --git a/datastore/query.go b/datastore/query.go
index cf7bf56..c1ea4ad 100644
--- a/datastore/query.go
+++ b/datastore/query.go
@@ -87,6 +87,7 @@
 	eventual bool
 	limit    int32
 	offset   int32
+	count    int32
 	start    *pb.CompiledCursor
 	end      *pb.CompiledCursor
 
@@ -241,6 +242,19 @@
 	return q
 }
 
+// BatchSize returns a derivative query to fetch the supplied number of results
+// at once. This value should be greater than zero, and equal to or less than
+// the Limit.
+func (q *Query) BatchSize(size int) *Query {
+	q = q.clone()
+	if size <= 0 || size > math.MaxInt32 {
+		q.err = errors.New("datastore: query batch size overflow")
+		return q
+	}
+	q.count = int32(size)
+	return q
+}
+
 // Start returns a derivative query with the given start point.
 func (q *Query) Start(c Cursor) *Query {
 	q = q.clone()
@@ -325,6 +339,9 @@
 	if q.offset != 0 {
 		dst.Offset = proto.Int32(q.offset)
 	}
+	if q.count != 0 {
+		dst.Count = proto.Int32(q.count)
+	}
 	dst.CompiledCursor = q.start
 	dst.EndCompiledCursor = q.end
 	dst.Compile = proto.Bool(true)
@@ -394,7 +411,7 @@
 		if !res.GetMoreResults() {
 			break
 		}
-		if err := callNext(c, res, newQ.offset-n, 0); err != nil {
+		if err := callNext(c, res, newQ.offset-n, q.count); err != nil {
 			return 0, err
 		}
 	}
@@ -409,15 +426,15 @@
 
 // callNext issues a datastore_v3/Next RPC to advance a cursor, such as that
 // returned by a query with more results.
-func callNext(c context.Context, res *pb.QueryResult, offset, limit int32) error {
+func callNext(c context.Context, res *pb.QueryResult, offset, count int32) error {
 	if res.Cursor == nil {
 		return errors.New("datastore: internal error: server did not return a cursor")
 	}
 	req := &pb.NextRequest{
 		Cursor: res.Cursor,
 	}
-	if limit >= 0 {
-		req.Count = proto.Int32(limit)
+	if count >= 0 {
+		req.Count = proto.Int32(count)
 	}
 	if offset != 0 {
 		req.Offset = proto.Int32(offset)
@@ -523,6 +540,7 @@
 	t := &Iterator{
 		c:      c,
 		limit:  q.limit,
+		count:  q.count,
 		q:      q,
 		prevCC: q.start,
 	}
@@ -536,9 +554,15 @@
 		return t
 	}
 	offset := q.offset - t.res.GetSkippedResults()
+	var count int32
+	if t.count > 0 && (t.limit < 0 || t.count < t.limit) {
+		count = t.count
+	} else {
+		count = t.limit
+	}
 	for offset > 0 && t.res.GetMoreResults() {
 		t.prevCC = t.res.CompiledCursor
-		if err := callNext(t.c, &t.res, offset, t.limit); err != nil {
+		if err := callNext(t.c, &t.res, offset, count); err != nil {
 			t.err = err
 			break
 		}
@@ -566,6 +590,9 @@
 	// limit is the limit on the number of results this iterator should return.
 	// A negative value means unlimited.
 	limit int32
+	// count is the number of results this iterator should fetch at once. This
+	// should be equal to or greater than zero.
+	count int32
 	// q is the original query which yielded this iterator.
 	q *Query
 	// prevCC is the compiled cursor that marks the end of the previous batch
@@ -605,7 +632,13 @@
 			return nil, nil, t.err
 		}
 		t.prevCC = t.res.CompiledCursor
-		if err := callNext(t.c, &t.res, 0, t.limit); err != nil {
+		var count int32
+		if t.count > 0 && (t.limit < 0 || t.count < t.limit) {
+			count = t.count
+		} else {
+			count = t.limit
+		}
+		if err := callNext(t.c, &t.res, 0, count); err != nil {
 			t.err = err
 			return nil, nil, t.err
 		}
diff --git a/datastore/query_test.go b/datastore/query_test.go
index f1b9de8..45e5313 100644
--- a/datastore/query_test.go
+++ b/datastore/query_test.go
@@ -464,7 +464,7 @@
 		},
 		{
 			desc:  "standard query",
-			query: NewQuery("kind").Order("-I").Filter("I >", 17).Filter("U =", "Dave").Limit(7).Offset(42),
+			query: NewQuery("kind").Order("-I").Filter("I >", 17).Filter("U =", "Dave").Limit(7).Offset(42).BatchSize(5),
 			want: &pb.Query{
 				Kind: proto.String("kind"),
 				Filter: []*pb.Query_Filter{
@@ -497,6 +497,7 @@
 				},
 				Limit:  proto.Int32(7),
 				Offset: proto.Int32(42),
+				Count:  proto.Int32(5),
 			},
 		},
 		{