datastore: enable to set query batch size (#90)
* datastore: enable to set query batch size
This adds q.BatchSize() method to avoid the error that claims too many
datastore.next() calls due to the lack of specifying the `count`
property (= batch size).
Fixes #88
* Disallow negative values for BatchSize
* Deal with negative value for t.limit
and clean up to be more readable
* Add comment about possible values for t.count
* Use the same `if` block as in `Run()` for `count`
* Make BatchSize restrict to be greater than zero
diff --git a/datastore/query.go b/datastore/query.go
index cf7bf56..c1ea4ad 100644
--- a/datastore/query.go
+++ b/datastore/query.go
@@ -87,6 +87,7 @@
eventual bool
limit int32
offset int32
+ count int32
start *pb.CompiledCursor
end *pb.CompiledCursor
@@ -241,6 +242,19 @@
return q
}
+// BatchSize returns a derivative query to fetch the supplied number of results
+// at once. This value should be greater than zero, and equal to or less than
+// the Limit.
+func (q *Query) BatchSize(size int) *Query {
+ q = q.clone()
+ if size <= 0 || size > math.MaxInt32 {
+ q.err = errors.New("datastore: query batch size overflow")
+ return q
+ }
+ q.count = int32(size)
+ return q
+}
+
// Start returns a derivative query with the given start point.
func (q *Query) Start(c Cursor) *Query {
q = q.clone()
@@ -325,6 +339,9 @@
if q.offset != 0 {
dst.Offset = proto.Int32(q.offset)
}
+ if q.count != 0 {
+ dst.Count = proto.Int32(q.count)
+ }
dst.CompiledCursor = q.start
dst.EndCompiledCursor = q.end
dst.Compile = proto.Bool(true)
@@ -394,7 +411,7 @@
if !res.GetMoreResults() {
break
}
- if err := callNext(c, res, newQ.offset-n, 0); err != nil {
+ if err := callNext(c, res, newQ.offset-n, q.count); err != nil {
return 0, err
}
}
@@ -409,15 +426,15 @@
// callNext issues a datastore_v3/Next RPC to advance a cursor, such as that
// returned by a query with more results.
-func callNext(c context.Context, res *pb.QueryResult, offset, limit int32) error {
+func callNext(c context.Context, res *pb.QueryResult, offset, count int32) error {
if res.Cursor == nil {
return errors.New("datastore: internal error: server did not return a cursor")
}
req := &pb.NextRequest{
Cursor: res.Cursor,
}
- if limit >= 0 {
- req.Count = proto.Int32(limit)
+ if count >= 0 {
+ req.Count = proto.Int32(count)
}
if offset != 0 {
req.Offset = proto.Int32(offset)
@@ -523,6 +540,7 @@
t := &Iterator{
c: c,
limit: q.limit,
+ count: q.count,
q: q,
prevCC: q.start,
}
@@ -536,9 +554,15 @@
return t
}
offset := q.offset - t.res.GetSkippedResults()
+ var count int32
+ if t.count > 0 && (t.limit < 0 || t.count < t.limit) {
+ count = t.count
+ } else {
+ count = t.limit
+ }
for offset > 0 && t.res.GetMoreResults() {
t.prevCC = t.res.CompiledCursor
- if err := callNext(t.c, &t.res, offset, t.limit); err != nil {
+ if err := callNext(t.c, &t.res, offset, count); err != nil {
t.err = err
break
}
@@ -566,6 +590,9 @@
// limit is the limit on the number of results this iterator should return.
// A negative value means unlimited.
limit int32
+ // count is the number of results this iterator should fetch at once. This
+ // should be equal to or greater than zero.
+ count int32
// q is the original query which yielded this iterator.
q *Query
// prevCC is the compiled cursor that marks the end of the previous batch
@@ -605,7 +632,13 @@
return nil, nil, t.err
}
t.prevCC = t.res.CompiledCursor
- if err := callNext(t.c, &t.res, 0, t.limit); err != nil {
+ var count int32
+ if t.count > 0 && (t.limit < 0 || t.count < t.limit) {
+ count = t.count
+ } else {
+ count = t.limit
+ }
+ if err := callNext(t.c, &t.res, 0, count); err != nil {
t.err = err
return nil, nil, t.err
}
diff --git a/datastore/query_test.go b/datastore/query_test.go
index f1b9de8..45e5313 100644
--- a/datastore/query_test.go
+++ b/datastore/query_test.go
@@ -464,7 +464,7 @@
},
{
desc: "standard query",
- query: NewQuery("kind").Order("-I").Filter("I >", 17).Filter("U =", "Dave").Limit(7).Offset(42),
+ query: NewQuery("kind").Order("-I").Filter("I >", 17).Filter("U =", "Dave").Limit(7).Offset(42).BatchSize(5),
want: &pb.Query{
Kind: proto.String("kind"),
Filter: []*pb.Query_Filter{
@@ -497,6 +497,7 @@
},
Limit: proto.Int32(7),
Offset: proto.Int32(42),
+ Count: proto.Int32(5),
},
},
{