Skip to content

Commit 82f50b7

Browse files
Asynchronous function invocation support
1 parent 2309529 commit 82f50b7

File tree

10 files changed

+762
-149
lines changed

10 files changed

+762
-149
lines changed

cel/env.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -307,14 +307,13 @@ func (e *Env) ParseSource(src common.Source) (*Ast, *Issues) {
307307

308308
// Program generates an evaluable instance of the Ast within the environment (Env).
309309
func (e *Env) Program(ast *Ast, opts ...ProgramOption) (Program, error) {
310-
optSet := e.progOpts
311-
if len(opts) != 0 {
312-
mergedOpts := []ProgramOption{}
313-
mergedOpts = append(mergedOpts, e.progOpts...)
314-
mergedOpts = append(mergedOpts, opts...)
315-
optSet = mergedOpts
316-
}
317-
return newProgram(e, ast, optSet)
310+
return e.newProgram(ast, opts /* async= */, false)
311+
}
312+
313+
// AsyncProgram generates an evaluable instance of the Ast with support for asynchronous extension
314+
// functions.
315+
func (e *Env) AsyncProgram(ast *Ast, opts ...ProgramOption) (AsyncProgram, error) {
316+
return e.newProgram(ast, opts /* async= */, true)
318317
}
319318

320319
// SetFeature sets the given feature flag, as enumerated in options.go.
@@ -427,8 +426,8 @@ func (i *Issues) Err() error {
427426
if i == nil {
428427
return nil
429428
}
430-
if len(i.errs.GetErrors()) > 0 {
431-
return errors.New(i.errs.ToDisplayString())
429+
if len(i.Errors()) > 0 {
430+
return errors.New(i.String())
432431
}
433432
return nil
434433
}

cel/program.go

Lines changed: 149 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package cel
1616

1717
import (
18+
"context"
19+
"errors"
1820
"fmt"
1921

2022
"github.com/google/cel-go/common/types"
@@ -28,19 +30,31 @@ import (
2830
type Program interface {
2931
// Eval returns the result of an evaluation of the Ast and environment against the input vars.
3032
//
31-
// The vars value may either be an `interpreter.Activation` or a `map[string]interface{}`.
33+
// The argument value may either be an `interpreter.Activation` or a `map[string]interface{}`.
3234
//
3335
// If the `OptTrackState` or `OptExhaustiveEval` flags are used, the `details` response will
3436
// be non-nil. Given this caveat on `details`, the return state from evaluation will be:
3537
//
3638
// * `val`, `details`, `nil` - Successful evaluation of a non-error result.
3739
// * `val`, `details`, `err` - Successful evaluation to an error result.
3840
// * `nil`, `details`, `err` - Unsuccessful evaluation.
41+
Eval(interface{}) (ref.Val, *EvalDetails, error)
42+
}
43+
44+
// AsyncProgram is an evaluable view of an Ast which may contain asynchronous compute.
45+
type AsyncProgram interface {
46+
// AsyncEval returns the result of an evaluation of the Ast against a given input.
3947
//
40-
// An unsuccessful evaluation is typically the result of a series of incompatible `EnvOption`
41-
// or `ProgramOption` values used in the creation of the evaluation environment or executable
42-
// program.
43-
Eval(vars interface{}) (ref.Val, *EvalDetails, error)
48+
// The input arguments (apart from Context) and return values for AsyncEval mirror those from
49+
// the standard Eval call.
50+
AsyncEval(context.Context, interface{}) (ref.Val, *EvalDetails, error)
51+
}
52+
53+
// programWrapper embeds both the Program and AsyncProgram interface, but in practice only one
54+
// interface is exposed through the top-level 'cel' package exports.
55+
type programWrapper struct {
56+
Program
57+
AsyncProgram
4458
}
4559

4660
// NoVars returns an empty Activation.
@@ -92,17 +106,19 @@ func (ed *EvalDetails) State() interpreter.EvalState {
92106
// prog is the internal implementation of the Program interface.
93107
type prog struct {
94108
*Env
95-
evalOpts EvalOption
96-
decorators []interpreter.InterpretableDecorator
97-
defaultVars interpreter.Activation
98-
dispatcher interpreter.Dispatcher
99-
interpreter interpreter.Interpreter
100-
interpretable interpreter.Interpretable
101-
attrFactory interpreter.AttributeFactory
109+
async bool
110+
evalOpts EvalOption
111+
decorators []interpreter.InterpretableDecorator
112+
defaultVars interpreter.Activation
113+
dispatcher interpreter.Dispatcher
114+
interpreter *interpreter.Interpreter
115+
interpretable interpreter.Interpretable
116+
asyncInterpretable interpreter.AsyncInterpretable
117+
attrFactory interpreter.AttributeFactory
102118
}
103119

104120
// progFactory is a helper alias for marking a program creation factory function.
105-
type progFactory func(interpreter.EvalState) (Program, error)
121+
type progFactory func(interpreter.EvalState) (*programWrapper, error)
106122

107123
// progGen holds a reference to a progFactory instance and implements the Program interface.
108124
type progGen struct {
@@ -113,7 +129,16 @@ type progGen struct {
113129
// ProgramOption values.
114130
//
115131
// If the program cannot be configured the prog will be nil, with a non-nil error response.
116-
func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
132+
func (e *Env) newProgram(ast *Ast,
133+
opts []ProgramOption,
134+
async bool) (*programWrapper, error) {
135+
optSet := e.progOpts
136+
if len(opts) != 0 {
137+
mergedOpts := []ProgramOption{}
138+
mergedOpts = append(mergedOpts, e.progOpts...)
139+
mergedOpts = append(mergedOpts, opts...)
140+
optSet = mergedOpts
141+
}
117142
// Build the dispatcher, interpreter, and default program value.
118143
disp := interpreter.NewDispatcher()
119144

@@ -123,11 +148,12 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
123148
Env: e,
124149
decorators: []interpreter.InterpretableDecorator{},
125150
dispatcher: disp,
151+
async: async,
126152
}
127153

128154
// Configure the program via the ProgramOption values.
129155
var err error
130-
for _, opt := range opts {
156+
for _, opt := range optSet {
131157
if opt == nil {
132158
return nil, fmt.Errorf("program options should be non-nil")
133159
}
@@ -159,12 +185,13 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
159185
if p.evalOpts&OptExhaustiveEval == OptExhaustiveEval {
160186
// State tracking requires that each Eval() call operate on an isolated EvalState
161187
// object; hence, the presence of the factory.
162-
factory := func(state interpreter.EvalState) (Program, error) {
188+
factory := func(state interpreter.EvalState) (*programWrapper, error) {
163189
decs := append(decorators, interpreter.ExhaustiveEval(state))
164190
clone := &prog{
165191
evalOpts: p.evalOpts,
166192
defaultVars: p.defaultVars,
167193
Env: e,
194+
async: p.async,
168195
dispatcher: disp,
169196
interpreter: interp}
170197
return initInterpretable(clone, ast, decs)
@@ -174,7 +201,7 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
174201
// Enable state tracking last since it too requires the factory approach but is less
175202
// featured than the ExhaustiveEval decorator.
176203
if p.evalOpts&OptTrackState == OptTrackState {
177-
factory := func(state interpreter.EvalState) (Program, error) {
204+
factory := func(state interpreter.EvalState) (*programWrapper, error) {
178205
decs := append(decorators, interpreter.TrackState(state))
179206
clone := &prog{
180207
evalOpts: p.evalOpts,
@@ -191,31 +218,37 @@ func newProgram(e *Env, ast *Ast, opts []ProgramOption) (Program, error) {
191218

192219
// initProgGen tests the factory object by calling it once and returns a factory-based Program if
193220
// the test is successful.
194-
func initProgGen(factory progFactory) (Program, error) {
221+
func initProgGen(factory progFactory) (*programWrapper, error) {
195222
// Test the factory to make sure that configuration errors are spotted at config
196223
_, err := factory(interpreter.NewEvalState())
197224
if err != nil {
198225
return nil, err
199226
}
200-
return &progGen{factory: factory}, nil
227+
pg := &progGen{factory: factory}
228+
wrapper := &programWrapper{Program: pg, AsyncProgram: pg}
229+
return wrapper, nil
201230
}
202231

203232
// initIterpretable creates a checked or unchecked interpretable depending on whether the Ast
204233
// has been run through the type-checker.
205-
func initInterpretable(
206-
p *prog,
234+
func initInterpretable(p *prog,
207235
ast *Ast,
208-
decorators []interpreter.InterpretableDecorator) (Program, error) {
236+
decorators []interpreter.InterpretableDecorator) (*programWrapper, error) {
209237
var err error
210238
// Unchecked programs do not contain type and reference information and may be
211239
// slower to execute than their checked counterparts.
212240
if !ast.IsChecked() {
213-
p.interpretable, err =
214-
p.interpreter.NewUncheckedInterpretable(ast.Expr(), decorators...)
241+
if p.async {
242+
p.asyncInterpretable, err =
243+
p.interpreter.NewAsyncUncheckedInterpretable(ast.Expr(), decorators...)
244+
} else {
245+
p.interpretable, err =
246+
p.interpreter.NewUncheckedInterpretable(ast.Expr(), decorators...)
247+
}
215248
if err != nil {
216249
return nil, err
217250
}
218-
return p, nil
251+
return &programWrapper{Program: p, AsyncProgram: p}, nil
219252
}
220253
// When the AST has been checked it contains metadata that can be used to speed up program
221254
// execution.
@@ -224,16 +257,25 @@ func initInterpretable(
224257
if err != nil {
225258
return nil, err
226259
}
227-
p.interpretable, err = p.interpreter.NewInterpretable(checked, decorators...)
260+
if p.async {
261+
p.asyncInterpretable, err =
262+
p.interpreter.NewAsyncInterpretable(checked, decorators...)
263+
} else {
264+
p.interpretable, err = p.interpreter.NewInterpretable(checked, decorators...)
265+
}
228266
if err != nil {
229267
return nil, err
230268
}
231-
232-
return p, nil
269+
return &programWrapper{Program: p, AsyncProgram: p}, nil
233270
}
234271

235272
// Eval implements the Program interface method.
236273
func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error) {
274+
// In general this should never happen, since only one view (sync, async) is returned back to
275+
// the caller.
276+
if p.interpretable == nil {
277+
return nil, nil, errors.New("async program invoked synchronously")
278+
}
237279
// Configure error recovery for unexpected panics during evaluation. Note, the use of named
238280
// return values makes it possible to modify the error response during the recovery
239281
// function.
@@ -242,16 +284,39 @@ func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error)
242284
err = fmt.Errorf("internal error: %v", r)
243285
}
244286
}()
245-
// Build a hierarchical activation if there are default vars set.
246-
vars, err := interpreter.NewActivation(input)
287+
vars, err := p.vars(input)
288+
v = p.interpretable.Eval(vars)
289+
// The output of an internal Eval may have a value (`v`) that is a types.Err. This step
290+
// translates the CEL value to a Go error response. This interface does not quite match the
291+
// RPC signature which allows for multiple errors to be returned, but should be sufficient.
292+
if types.IsError(v) {
293+
err = v.Value().(error)
294+
}
295+
return
296+
}
297+
298+
// AsyncEval implements the AsyncProgram interface method.
299+
func (p *prog) AsyncEval(ctx context.Context,
300+
input interface{}) (v ref.Val, det *EvalDetails, err error) {
301+
// In general this should never happen, since only one view (sync, async) is returned back to
302+
// the caller.
303+
if p.asyncInterpretable == nil {
304+
return nil, nil, errors.New("sync program invoked asynchronously")
305+
}
306+
// Configure error recovery for unexpected panics during evaluation. Note, the use of named
307+
// return values makes it possible to modify the error response during the recovery
308+
// function.
309+
defer func() {
310+
if r := recover(); r != nil {
311+
err = fmt.Errorf("internal error: %v", r)
312+
}
313+
}()
314+
asyncVars, err := p.asyncVars(input)
247315
if err != nil {
248316
return
249317
}
250-
if p.defaultVars != nil {
251-
vars = interpreter.NewHierarchicalActivation(p.defaultVars, vars)
252-
}
253-
v = p.interpretable.Eval(vars)
254-
// The output of an internal Eval may have a value (`v`) that is a types.Err. This step
318+
v = p.asyncInterpretable.AsyncEval(ctx, asyncVars)
319+
// The output of an internal AsyncEval may have a value (`v`) that is a types.Err. This step
255320
// translates the CEL value to a Go error response. This interface does not quite match the
256321
// RPC signature which allows for multiple errors to be returned, but should be sufficient.
257322
if types.IsError(v) {
@@ -260,6 +325,30 @@ func (p *prog) Eval(input interface{}) (v ref.Val, det *EvalDetails, err error)
260325
return
261326
}
262327

328+
// asyncVars creates an AsyncActivation suitable for tracking async invocations in a manner which
329+
// can be used to orchestrate async calls needed to complete expression evaluation.
330+
func (p *prog) asyncVars(input interface{}) (*interpreter.AsyncActivation, error) {
331+
vars, err := p.vars(input)
332+
if err != nil {
333+
return nil, err
334+
}
335+
return interpreter.NewAsyncActivation(vars), nil
336+
}
337+
338+
// vars creates an Activation from the input, and if applicable extends the set of default values
339+
// configured via ProgramOptions.
340+
func (p *prog) vars(input interface{}) (interpreter.Activation, error) {
341+
vars, err := interpreter.NewActivation(input)
342+
if err != nil {
343+
return nil, err
344+
}
345+
// Build a hierarchical activation if there are default vars set.
346+
if p.defaultVars != nil {
347+
return interpreter.NewHierarchicalActivation(p.defaultVars, vars), nil
348+
}
349+
return vars, nil
350+
}
351+
263352
// Eval implements the Program interface method.
264353
func (gen *progGen) Eval(input interface{}) (ref.Val, *EvalDetails, error) {
265354
// The factory based Eval() differs from the standard evaluation model in that it generates a
@@ -283,3 +372,28 @@ func (gen *progGen) Eval(input interface{}) (ref.Val, *EvalDetails, error) {
283372
}
284373
return v, det, nil
285374
}
375+
376+
// AsyncEval implements the AsyncProgram interface method.
377+
func (gen *progGen) AsyncEval(ctx context.Context,
378+
input interface{}) (ref.Val, *EvalDetails, error) {
379+
// The factory based AsyncEval() differs from the standard evaluation model in that it
380+
// generates a new EvalState instance for each call to ensure that unique evaluations yield
381+
// unique stateful results.
382+
state := interpreter.NewEvalState()
383+
det := &EvalDetails{state: state}
384+
385+
// Generate a new instance of the interpretable using the factory configured during the call to
386+
// newProgram(). It is incredibly unlikely that the factory call will generate an error given
387+
// the factory test performed within the Program() call.
388+
p, err := gen.factory(state)
389+
if err != nil {
390+
return nil, det, err
391+
}
392+
393+
// Evaluate the input, returning the result and the 'state' within EvalDetails.
394+
v, _, err := p.AsyncEval(ctx, input)
395+
if err != nil {
396+
return v, det, err
397+
}
398+
return v, det, nil
399+
}

common/types/ref/provider.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,11 @@ type FieldTester func(target interface{}) bool
101101

102102
// FieldGetter is used to get the field value from an input object, if set.
103103
type FieldGetter func(target interface{}) (interface{}, error)
104+
105+
// Resolver abstracts variable and type identifier resolution behind a single interface
106+
// method.
107+
type Resolver interface {
108+
// ResolveName returns the value associated with the given fully qualified name, if
109+
// present.
110+
ResolveName(name string) (interface{}, bool)
111+
}

interpreter/activation.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ type Activation interface {
3737

3838
// EmptyActivation returns a variable free activation.
3939
func EmptyActivation() Activation {
40-
// This call cannot fail.
41-
a, _ := NewActivation(map[string]interface{}{})
42-
return a
40+
return emptyActivation
4341
}
4442

4543
// NewActivation returns an activation based on a map-based binding where the map keys are
@@ -197,6 +195,9 @@ func (v *varActivation) ResolveName(name string) (interface{}, bool) {
197195
}
198196

199197
var (
198+
// emptyActivation is a singleton activation which provides no input
199+
emptyActivation = &mapActivation{bindings: map[string]interface{}{}}
200+
200201
// pool of var activations to reduce allocations during folds.
201202
varActivationPool = &sync.Pool{
202203
New: func() interface{} {

0 commit comments

Comments
 (0)