| package events |
| |
| import ( |
| "context" |
| "fmt" |
| "sync" |
| "sync/atomic" |
| "time" |
| ) |
| |
| var txCounter int64 // replace this with something that won't break |
| |
| // nextTXID provides the next transaction identifier. |
| func nexttxID() int64 { |
| // TODO(stevvooe): Need to coordinate this with existing transaction logs. |
| // For now, this is a toy, but not a racy one. |
| return atomic.AddInt64(&txCounter, 1) |
| } |
| |
| type transaction struct { |
| ctx context.Context |
| id int64 |
| parent *transaction // if nil, no parent transaction |
| finish sync.Once |
| start, end time.Time // informational |
| } |
| |
| // begin creates a sub-transaction. |
| func (tx *transaction) begin(ctx context.Context, poster Poster) *transaction { |
| id := nexttxID() |
| |
| child := &transaction{ |
| ctx: ctx, |
| id: id, |
| parent: tx, |
| start: time.Now(), |
| } |
| |
| // post the transaction started event |
| poster.Post(ctx, child.makeTransactionEvent("begin")) // transactions are really just events |
| |
| return child |
| } |
| |
| // commit writes out the transaction. |
| func (tx *transaction) commit(poster Poster) { |
| tx.finish.Do(func() { |
| tx.end = time.Now() |
| poster.Post(tx.ctx, tx.makeTransactionEvent("commit")) |
| }) |
| } |
| |
| func (tx *transaction) rollback(poster Poster, cause error) { |
| tx.finish.Do(func() { |
| tx.end = time.Now() |
| event := tx.makeTransactionEvent("rollback") |
| event = fmt.Sprintf("%s error=%q", event, cause.Error()) |
| poster.Post(tx.ctx, event) |
| }) |
| } |
| |
| func (tx *transaction) makeTransactionEvent(status string) Event { |
| // TODO(stevvooe): obviously, we need more structure than this. |
| event := fmt.Sprintf("%v %v", status, tx.id) |
| if tx.parent != nil { |
| event += " parent=" + fmt.Sprint(tx.parent.id) |
| } |
| |
| return event |
| } |
| |
| type txKey struct{} |
| |
| func getTx(ctx context.Context) (*transaction, bool) { |
| tx := ctx.Value(txKey{}) |
| if tx == nil { |
| return nil, false |
| } |
| |
| return tx.(*transaction), true |
| } |
| |
| // WithTx returns a new context with an event transaction, such that events |
| // posted to the underlying context will be committed to the event log as a |
| // group, organized by a transaction id, when commit is called. |
| func WithTx(pctx context.Context) (ctx context.Context, commit func(), rollback func(err error)) { |
| poster := G(pctx) |
| parent, _ := getTx(pctx) |
| tx := parent.begin(pctx, poster) |
| |
| return context.WithValue(pctx, txKey{}, tx), func() { |
| tx.commit(poster) |
| }, func(err error) { |
| tx.rollback(poster, err) |
| } |
| } |