// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package disksort
import (
// CoGBK buffers all input and continues on FinishBundle. Use with small single-bundle data only.
type CoGBK struct {
UID exec.UnitID
Edge *graph.MultiEdge
Out exec.Node
wEnc exec.WindowEncoder
wDec exec.WindowDecoder
keyEnc exec.ElementEncoder
keyDec exec.ElementDecoder
valEnc []exec.ElementEncoder
valDec []exec.ElementDecoder
sorter disksort.Interface
func (n *CoGBK) ID() exec.UnitID {
return n.UID
func iterValueLess(a, b interface{}) bool {
av := a.(*iterValue)
bv := b.(*iterValue)
c := bytes.Compare(av.Key, bv.Key)
if c == 0 {
return av.Index < bv.Index
return c < 0
func (n *CoGBK) Up(ctx context.Context) error {
n.wEnc = exec.MakeWindowEncoder(n.Edge.Input[0].From.WindowingStrategy().Fn.Coder())
n.wDec = exec.MakeWindowDecoder(n.Edge.Input[0].From.WindowingStrategy().Fn.Coder())
n.keyEnc = exec.MakeElementEncoder(n.Edge.Input[0].From.Coder.Components[0])
n.keyDec = exec.MakeElementDecoder(n.Edge.Input[0].From.Coder.Components[0])
for _, input := range n.Edge.Input {
n.valEnc = append(n.valEnc, exec.MakeElementEncoder(input.From.Coder.Components[1]))
n.valDec = append(n.valDec, exec.MakeElementDecoder(input.From.Coder.Components[1]))
s, err := disksort.NewMergeSorter(disksort.MergeOptions{
Name: fmt.Sprintf("beam.%s.%s", n.Edge.Name(), n.Edge.Scope()),
MaxInMemory: 1024 * 1024,
MaxBytesInMemory: 1024 * 1024 * 256,
CompressShards: true,
Marshaler: iterValueMarshaler{},
Lesser: sortutil.LesserFunc(iterValueLess),
if err != nil {
return fmt.Errorf("error creating MergeSorter: %v", err)
n.sorter = s
return nil
func (n *CoGBK) StartBundle(ctx context.Context, id string, data exec.DataContext) error {
return n.Out.StartBundle(ctx, id, data)
func (n *CoGBK) ProcessElement(ctx context.Context, elm *exec.FullValue, _ ...exec.ReStream) error {
index := elm.Elm.(int) // index of Inputs
value := elm.Elm2.(*exec.FullValue) // actual KV<K,V>
fullVal := &exec.FullValue{Elm: value.Elm2, Timestamp: value.Timestamp} // strip K from KV<K,V>
var buf bytes.Buffer
if err := n.valEnc[index].Encode(fullVal, &buf); err != nil {
return fmt.Errorf("failed to encode val %v for CoGBK: %v", elm, err)
val := append([]byte{}, buf.Bytes()...)
// Encode KV per window
for _, w := range elm.Windows {
ws := []typex.Window{w}
fullKey := &exec.FullValue{Elm: value.Elm, Timestamp: value.Timestamp, Windows: ws} // strip V from KV<K,V>
if err := n.keyEnc.Encode(fullKey, &buf); err != nil {
return fmt.Errorf("failed to encode key %v for CoGBK: %v", elm, err)
if err := n.wEnc.Encode(ws, &buf); err != nil {
return fmt.Errorf("failed to encode window %v for CoGBK: %v", w, err)
key := append([]byte{}, buf.Bytes()...)
if err := n.sorter.Add(&iterValue{
Key: key,
Index: index,
Value: val,
}); err != nil {
return err
return nil
type iterStream struct {
n *CoGBK
iter disksort.Iterator
key []byte
idx int
buffer *iterValue // first element of this/next shard
prev *iterStream
opened bool
func (i *iterStream) Open() (exec.Stream, error) {
// TODO(schroederc): add disksort support for reiteration
if i.opened {
return nil, fmt.Errorf("disksort CoGBK does not support ReStreams")
i.opened = true
return i, nil
func (i *iterStream) Read() (*exec.FullValue, error) {
iv, err :=
if err != nil {
return nil, err
val, err := i.n.valDec[i.idx].Decode(bytes.NewBuffer(iv.Value))
if err != nil {
return nil, fmt.Errorf("error decoding value: %v", err)
return val, nil
func (i *iterStream) next() (*iterValue, error) {
if i.prev != nil {
for {
_, err :=
if err == io.EOF {
} else if err != nil {
return nil, fmt.Errorf("previous iterator error: %v", err)
i.buffer = i.prev.buffer
i.prev = nil
if i.iter == nil {
return nil, io.EOF
} else if i.buffer != nil {
if bytes.Equal(i.buffer.Key, i.key) && i.buffer.Index == i.idx {
iv := i.buffer
i.buffer = nil
return iv, nil
return nil, io.EOF
next, err := i.iter.Next()
if err == io.EOF {
return nil, io.EOF
} else if err != nil {
return nil, fmt.Errorf("error reading CoGBK[%v] element for key %q: %v", i.n.ID(), string(i.key), err)
iv := next.(*iterValue)
if bytes.Equal(iv.Key, i.key) && iv.Index == i.idx {
return iv, nil
i.buffer = iv
return nil, io.EOF
func (i *iterStream) Close() error {
i.iter = nil
return nil
// iterValue is one encoded record: the encoded key bytes, the index of the
// input it came from, and the encoded value bytes.
type iterValue struct {
	Key   []byte
	Index int
	Value []byte
}

// Size returns an upper bound on the marshaled size of v: both payloads plus
// three maximum-length varints.
func (v *iterValue) Size() int { return len(v.Key) + len(v.Value) + 3*binary.MaxVarintLen64 }

// String returns a short description of v for logs; the value is summarized by
// its length only.
func (v *iterValue) String() string {
	return fmt.Sprintf("key=%q index=%d len(val)=%d", string(v.Key), v.Index, len(v.Value))
}

// iterValueMarshaler converts *iterValue records to and from the layout
//
//	varint(len(Key)) Key varint(Index) varint(len(Value)) Value
type iterValueMarshaler struct{}

// Marshal encodes v, which must be a *iterValue. Any other type yields an
// error rather than a panic.
func (iterValueMarshaler) Marshal(v interface{}) ([]byte, error) {
	iv, ok := v.(*iterValue)
	if !ok {
		return nil, fmt.Errorf("error marshaling: expected *iterValue, got %T", v)
	}
	buf := make([]byte, 3*binary.MaxVarintLen64+len(iv.Key)+len(iv.Value))
	i := binary.PutVarint(buf, int64(len(iv.Key)))
	i += copy(buf[i:], iv.Key)
	i += binary.PutVarint(buf[i:], int64(iv.Index))
	i += binary.PutVarint(buf[i:], int64(len(iv.Value)))
	i += copy(buf[i:], iv.Value)
	return buf[:i], nil
}

// Unmarshal decodes a record produced by Marshal and returns it as a
// *iterValue. Zero-length keys and values are accepted; negative or oversized
// length prefixes and truncated records produce an error.
func (iterValueMarshaler) Unmarshal(rec []byte) (interface{}, error) {
	r := bytes.NewReader(rec)

	key, err := readSizedField(r, "key")
	if err != nil {
		return nil, err
	}
	idx, err := binary.ReadVarint(r)
	if err != nil {
		return nil, fmt.Errorf("error reading index: %v", err)
	}
	value, err := readSizedField(r, "value")
	if err != nil {
		return nil, err
	}
	return &iterValue{Key: key, Index: int(idx), Value: value}, nil
}

// readSizedField reads a varint length followed by that many bytes from r.
// name is used only in error messages.
func readSizedField(r *bytes.Reader, name string) ([]byte, error) {
	size, err := binary.ReadVarint(r)
	if err != nil {
		return nil, fmt.Errorf("error reading %s size: %v", name, err)
	}
	// Reject lengths that cannot be satisfied before allocating: a negative
	// size would panic in make and a huge one would allocate needlessly.
	if size < 0 || size > int64(r.Len()) {
		return nil, fmt.Errorf("error reading %s: invalid size %d with %d bytes remaining", name, size, r.Len())
	}
	b := make([]byte, size)
	// io.ReadFull, unlike bytes.Reader.Read, does not report io.EOF for a
	// zero-length read at the end of the record.
	if _, err := io.ReadFull(r, b); err != nil {
		return nil, fmt.Errorf("error reading %s: %v", name, err)
	}
	return b, nil
}
// FinishBundle reads the records back from the sorter and, for each group
// start it encounters, decodes the key and window and calls Out.ProcessElement
// with one iterStream per input. When the iterator reports io.EOF it is closed
// and Out.FinishBundle is called.
//
// NOTE(review): this function is incomplete in the reviewed text — the
// assignment "next, iterErr =" has no right-hand side and the loop exits and
// closing braces are not visible. Confirm against the original file before
// relying on the comments near the end.
func (n *CoGBK) FinishBundle(ctx context.Context) error {
iter, err := n.sorter.Iterator()
if err != nil {
return fmt.Errorf("error creating disksort.Iterator: %v", err)
// NOTE(review): totalKeys is only declared and logged in the visible code;
// no increment is visible — confirm it is counted somewhere.
var totalKeys int
// Prime the loop with the first record; later iterations get their first
// record from the "advance" step at the bottom.
next, iterErr := iter.Next()
for {
if iterErr == io.EOF {
// No more records: release the iterator and finish the downstream bundle.
if err := iter.Close(); err != nil {
return fmt.Errorf("error closing disksort.Iterator: %v", err)
if *verbose {
log.Printf("CoGBK = %d", totalKeys)
return n.Out.FinishBundle(ctx)
} else if iterErr != nil {
return fmt.Errorf("error reading disksort.Iterator: %v", iterErr)
// iv is the first record of the group handled in this iteration.
iv := next.(*iterValue)
next = nil
// The key bytes hold the encoded key followed by the encoded window, so both
// are decoded from the same buffer, in that order.
buf := bytes.NewBuffer(iv.Key)
fullKey, err := n.keyDec.Decode(buf)
if err != nil {
return fmt.Errorf("error decoding key %q: %v", string(iv.Key), err)
ws, err := n.wDec.Decode(buf)
if err != nil {
return fmt.Errorf("error decoding key window %q: %v", string(iv.Key), err)
fullKey.Windows = ws
// Build one stream per input for this key.
values := make([]exec.ReStream, len(n.Edge.Input))
for i := 0; i < len(values); i++ {
is := &iterStream{n: n, key: iv.Key, idx: i, iter: iter}
if i == iv.Index {
// The record already read is this stream's first element.
is.buffer = iv
} else if i < iv.Index {
// Inputs before the first record's index get a stream without an
// iterator, which reads as immediately ended.
is.iter = nil // already closed
} else {
// Later inputs are chained to the stream before them.
is.prev = values[i-1].(*iterStream)
values[i] = is
iv = nil
if err := n.Out.ProcessElement(ctx, fullKey, values...); err != nil {
return err
// Advance to next key
last := values[len(values)-1].(*iterStream)
for {
// NOTE(review): the right-hand side of the next statement is missing in the
// reviewed text; presumably it reads from the last stream — confirm.
next, iterErr =
if iterErr == io.EOF {
// A record left in the last stream's buffer becomes the next record to
// process, and the EOF is cleared.
if last.buffer != nil {
next = last.buffer
iterErr = nil
} else if iterErr != nil {
func (n *CoGBK) Down(ctx context.Context) error {
return nil
func (n *CoGBK) String() string {
return fmt.Sprintf("CoGBK. Out:%v", n.Out.ID())