@@ -20,13 +20,14 @@ type Subscription struct {
20
20
startAt string
21
21
sessionHandle uintptr
22
22
channel string
23
+ query * string
23
24
bookmark Bookmark
24
25
}
25
26
26
27
// Open will open the subscription handle.
27
28
// It returns an error if the subscription handle is already open or if any step in the process fails.
28
29
// If the remote server is not reachable, it returns an error indicating the failure.
29
- func (s * Subscription ) Open (startAt string , sessionHandle uintptr , channel string , bookmark Bookmark ) error {
30
+ func (s * Subscription ) Open (startAt string , sessionHandle uintptr , channel string , query * string , bookmark Bookmark ) error {
30
31
if s .handle != 0 {
31
32
return fmt .Errorf ("subscription handle is already open" )
32
33
}
@@ -39,13 +40,24 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
39
40
_ = windows .CloseHandle (signalEvent )
40
41
}()
41
42
43
+ if channel != "" && query != nil {
44
+ return fmt .Errorf ("can not supply both query and channel" )
45
+ }
46
+
42
47
channelPtr , err := syscall .UTF16PtrFromString (channel )
43
48
if err != nil {
44
49
return fmt .Errorf ("failed to convert channel to utf16: %w" , err )
45
50
}
46
51
52
+ var queryPtr * uint16
53
+ if query != nil {
54
+ if queryPtr , err = syscall .UTF16PtrFromString (* query ); err != nil {
55
+ return fmt .Errorf ("failed to convert channel to utf16: %w" , err )
56
+ }
57
+ }
58
+
47
59
flags := s .createFlags (startAt , bookmark )
48
- subscriptionHandle , err := evtSubscribeFunc (sessionHandle , signalEvent , channelPtr , nil , bookmark .handle , 0 , 0 , flags )
60
+ subscriptionHandle , err := evtSubscribeFunc (sessionHandle , signalEvent , channelPtr , queryPtr , bookmark .handle , 0 , 0 , flags )
49
61
if err != nil {
50
62
return fmt .Errorf ("failed to subscribe to %s channel: %w" , channel , err )
51
63
}
@@ -55,6 +67,7 @@ func (s *Subscription) Open(startAt string, sessionHandle uintptr, channel strin
55
67
s .sessionHandle = sessionHandle
56
68
s .channel = channel
57
69
s .bookmark = bookmark
70
+ s .query = query
58
71
return nil
59
72
}
60
73
@@ -109,7 +122,7 @@ func (s *Subscription) readWithRetry(maxReads int) ([]Event, int, error) {
109
122
}
110
123
111
124
// reopen subscription with the same parameters
112
- if openErr := s .Open (s .startAt , s .sessionHandle , s .channel , s .bookmark ); openErr != nil {
125
+ if openErr := s .Open (s .startAt , s .sessionHandle , s .channel , s .query , s . bookmark ); openErr != nil {
113
126
return nil , maxReads , fmt .Errorf ("failed to reopen subscription during recovery: %w" , openErr )
114
127
}
115
128
0 commit comments