Skip to content

Commit 82a09c0

Browse files
authored
Merge pull request #36 from kjellwinblad/kjell/timeout_nif
Add support for timeouts in the NIF backend
2 parents 2908a24 + d4e890c commit 82a09c0

File tree

12 files changed

+314
-38
lines changed

12 files changed

+314
-38
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,21 @@ application is loaded (the jq Erlang application is loaded automatically when
5050
* `jq_filter_program_lru_cache_max_size` (default value = 500) - Sets the size of
5151
the LRU caches that are holding compiled JQ programs to prevent frequent
5252
expensive recompilation of the same program.
53-
* `jq_implementation_module` (default value = `jq_port`) - Sets the implementation
53+
* `jq_implementation_module` (default value = `jq_nif`) - Sets the implementation
5454
that will be used. The options are:
5555
* `jq_port` - This implementation uses a port program to interact with jq.
5656
This is the most safe option as a bug in jq cannot
5757
cause the Erlang VM to crash or leak memory.
5858
* `jq_nif` - This implementation uses a NIF library to interact with jq. This
5959
option is faster than the `jq_port` option but it is also less safe even
6060
though we are currently not not aware of any problems with this option.
61+
One can use the function `jq:set_implementation_module/2` to change the
62+
implementation while the application is running.
6163
* `jq_port_nr_of_jq_port_servers` (default value =
6264
`erlang:system_info(schedulers)`) (only relevant for the `jq_port` option) -
6365
Use this option to set how many port programs that will handle jq requests.
6466
Higher values can lead to better performance (due to parallelism) at the
65-
expense of increased memory usage and cache locality.
67+
expense of increased memory usage and worse cache locality.
6668
* `jq_port_restart_period` (default value = 1000000) (only relevant for the
6769
`jq_port` option) - Use this option to set how many `jq:process_json/2` calls
6870
a port program can process before it is restarted. This is a safety option

c_src/Makefile

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ NIF_MODULE := jq_nif
1313
JQURL := https://github.com/emqx/jqc.git
1414
JQSRC_DIR := libs/jqc
1515
JQSRC := $(JQSRC_DIR)/src/jv.c
16+
JQSRC_VERSION := 73c51aa79b67b99c575ef8923aa0c03cd87a7ac1
1617
RDSURL := https://github.com/emqx/c_reusable_data_structures.git
1718
RDSSRC_DIR := libs/c_reusable_data_structures
1819
RDSSRC := $(RDSSRC_DIR)/Makefile
20+
RDSSRC_VERSION := 77c7a96416065a839417e5078fce9ba9a14982dc
1921
LIBJQ_DIR ?= $(JQSRC_DIR)/.libs
2022
LIBJQ_PREFIX := /usr/local
2123
EXT_LIBS := ext_libs
@@ -104,7 +106,9 @@ PORT_OBJECTS = $(addsuffix .o, $(basename $(PORT_SOURCES)))
104106
COMPILE_C = $(c_verbose) $(CC) $(CFLAGS) $(CPPFLAGS) -c
105107
COMPILE_CPP = $(cpp_verbose) $(CXX) $(CXXFLAGS) $(CPPFLAGS) -c
106108

107-
.PHONY: clean
109+
.PHONY: clean check_jqc_version all check_rds_version
110+
111+
all: check_jqc_version check_rds_version $(C_SRC_OUTPUT)
108112

109113
ifeq ($(JQ_MEMSAN_DEBUG), 1)
110114
$(C_SRC_OUTPUT): $(OBJECTS) port_nif_common.h
@@ -131,16 +135,25 @@ $(ERL_PORT_PROGRAM): $(PORT_OBJECTS)
131135
%.o: %.c $(LIBJQ_NAME) $(JQERLMODSRC) $(RDSSRC) $(PORT_SOURCES)
132136
$(COMPILE_C) $(OUTPUT_OPTION) $<
133137

138+
check_jqc_version:
139+
( cd libs/jqc && \
140+
[ `git rev-parse --verify HEAD` = '$(JQSRC_VERSION)' ] || (cd .. && rm -rf jqc))
141+
142+
134143
$(JQSRC):
135144
git clone -b jq-1.6-emqx --single-branch "$(JQURL)" "$(JQSRC_DIR)"
136-
((cd "$(JQSRC_DIR)" && git checkout 51df83ccf4808867eee79a011c71a93a121be1b6) || \
145+
((cd "$(JQSRC_DIR)" && git checkout $(JQSRC_VERSION)) || \
137146
(echo "Failed to check out jq commit" && \
138147
rm -r "$(JQSRC_DIR)" && \
139148
false))
140149

150+
check_rds_version:
151+
( cd libs/c_reusable_data_structures && \
152+
[ `git rev-parse --verify HEAD` = '$(RDSSRC_VERSION)' ] || (cd .. && rm -rf c_reusable_data_structures))
153+
141154
$(RDSSRC):
142155
git clone -b master --single-branch "$(RDSURL)" "$(RDSSRC_DIR)"
143-
((cd "$(RDSSRC_DIR)" && git checkout 77c7a96416065a839417e5078fce9ba9a14982dc) || \
156+
((cd "$(RDSSRC_DIR)" && git checkout $(RDSSRC_VERSION)) || \
144157
(echo "Failed to check out c_reusable_data_structures commit" && \
145158
rm -r "$(RDSSRC_DIR)" && \
146159
false))

c_src/erlang_jq_nif.c

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#include <erl_nif.h>
2+
#include "jq.h"
23
#include "jv.h"
34
#include "port_nif_common.h"
45

6+
#include <stdatomic.h>
57
#include <stdbool.h>
68
#include <setjmp.h>
79

@@ -96,6 +98,9 @@ typedef struct {
9698
JQStateCacheEntry_lru_ptr_dynarr caches;
9799
// Lock protecting the above field from concurrent modifications
98100
ErlNifMutex * lock;
101+
// NIF resource that holds a jq state that is used when timing out
102+
// NIF calls
103+
ErlNifResourceType * jq_state_holder_resource_type;
99104
} module_private_data;
100105

101106

@@ -162,6 +167,9 @@ jq_state* get_jq_state(
162167
JQStateCacheEntry_lru_get(cache, new_entry);
163168
if (cache_entry != NULL) {
164169
// Return jq_state from cache
170+
// It is important that we reset the cancel state so
171+
// that the jq execution don't get canceled immediately
172+
jq_reset_cancel_state(cache_entry->state);
165173
return cache_entry->state;
166174
}
167175
} else {
@@ -178,6 +186,7 @@ jq_state* get_jq_state(
178186
erljq_free(error_message);
179187
return NULL;
180188
}
189+
jq_reset_cancel_state(jq);
181190
if (cache != NULL) {
182191
// Add new entry to cache
183192
char * filter_program_string = erljq_alloc(erl_jq_filter.size);
@@ -192,7 +201,7 @@ jq_state* get_jq_state(
192201
}
193202

194203

195-
// Process the JSON obejct value using the compiled filter program in the
204+
// Process the JSON object value using the compiled filter program in the
196205
// given jq_state
197206
static int process_json(
198207
jq_state *jq,
@@ -233,6 +242,14 @@ static int process_json(
233242
(char*)enif_make_new_binary(env, binsz, error_msg_bin_ptr);
234243
memcpy(bin_data, error_message, binsz);
235244
erljq_free(error_message);
245+
// We might have results in the result strings (e.g., when there is a
246+
// timeout) so we need to free them as well
247+
size_t nr_of_result_objects = String_dynarr_size(&result_strings);
248+
for (size_t i = 0; i < nr_of_result_objects; i++) {
249+
String result = String_dynarr_item_at(&result_strings, i);
250+
erljq_free(result.string);
251+
}
252+
String_dynarr_destroy(&result_strings);
236253
}
237254
return res;
238255
}
@@ -253,9 +270,66 @@ static ERL_NIF_TERM make_ok_return(ErlNifEnv* env, ERL_NIF_TERM result) {
253270
return enif_make_tuple2(env, enif_make_atom(env, "ok"), result);
254271
}
255272

273+
typedef struct {
274+
ErlNifMutex* lock;
275+
volatile atomic_bool consumed_by_process_json_nif;
276+
jq_state* state;
277+
} JQCancelableStateHolderResource;
278+
279+
static ERL_NIF_TERM
280+
create_jq_resource_nif(
281+
ErlNifEnv* env,
282+
int argc,
283+
const ERL_NIF_TERM argv[]) {
284+
module_private_data* module_data = enif_priv_data(env);
285+
JQCancelableStateHolderResource* cancelable_state_holder =
286+
enif_alloc_resource(module_data->jq_state_holder_resource_type,
287+
sizeof(JQCancelableStateHolderResource));
288+
cancelable_state_holder->state = NULL;
289+
static char* lock_name = "jq_nif_resource_lock";
290+
cancelable_state_holder->lock = enif_mutex_create(lock_name);
291+
atomic_init(&cancelable_state_holder->consumed_by_process_json_nif, false);
292+
ERL_NIF_TERM term = enif_make_resource(env, cancelable_state_holder);
293+
enif_release_resource(cancelable_state_holder);
294+
return term;
295+
}
296+
297+
static ERL_NIF_TERM
298+
cancel_jq_resource_nif(
299+
ErlNifEnv* env,
300+
int argc,
301+
const ERL_NIF_TERM argv[]) {
302+
module_private_data* module_data = enif_priv_data(env);
303+
JQCancelableStateHolderResource* cancelable_state_holder;
304+
if (enif_get_resource(
305+
env,
306+
argv[0],
307+
module_data->jq_state_holder_resource_type,
308+
(void**)&cancelable_state_holder)) {
309+
if (!atomic_load(&cancelable_state_holder->consumed_by_process_json_nif)) {
310+
// The resource has not been passed to process_json_nif yet. We
311+
// schedule ourselves out and will try again later
312+
enif_consume_timeslice(env, 100);
313+
return enif_make_atom(env, "retry");
314+
}
315+
enif_mutex_lock(cancelable_state_holder->lock);
316+
if (cancelable_state_holder->state != NULL) {
317+
// jq_cancel is a function to cancel the jq execution from
318+
// another thread. This function only exists in the our
319+
// patched version of the jq library.
320+
jq_cancel(cancelable_state_holder->state);
321+
}
322+
enif_mutex_unlock(cancelable_state_holder->lock);
323+
} else {
324+
return enif_make_badarg(env);
325+
}
326+
return enif_make_atom(env, "ok");
327+
}
328+
256329
// NIF function taking a binaries for a filter program and a JSON text
257330
// and returning the result of processing the JSON text with the
258-
// filter program
331+
// filter program. The third argument can be a NIF resource that other
332+
// threads can use to cancel the execution
259333
static ERL_NIF_TERM process_json_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
260334
ERL_NIF_TERM error_msg_bin;
261335
// ----------------------------- init --------------------------------------
@@ -264,7 +338,25 @@ static ERL_NIF_TERM process_json_nif(ErlNifEnv* env, int argc, const ERL_NIF_TER
264338
int ret = JQ_ERROR_UNKNOWN;
265339
ERL_NIF_TERM ret_term;
266340
int dumpopts = 512; // JV_PRINT_SPACE1
267-
341+
JQCancelableStateHolderResource* cancelable_state_holder = NULL;
342+
bool cancelable_state_holder_opened = false;
343+
if (argc > 2) {
344+
// Third argument is a NIF resource for canceling
345+
// the jq execution from another thread
346+
module_private_data* module_data = enif_priv_data(env);
347+
if (!enif_get_resource(
348+
env,
349+
argv[2],
350+
module_data->jq_state_holder_resource_type,
351+
(void**)&cancelable_state_holder)) {
352+
ret = JQ_ERROR_BADARG;
353+
const char* error_message =
354+
"Expected a resource as third argument";
355+
error_msg_bin =
356+
make_error_msg_bin(env, error_message, strlen(error_message));
357+
goto out;
358+
}
359+
}
268360
// --------------------------- read args -----------------------------------
269361
ErlNifBinary erl_jq_filter;
270362
ErlNifBinary erl_json_text;
@@ -280,13 +372,22 @@ static ERL_NIF_TERM process_json_nif(ErlNifEnv* env, int argc, const ERL_NIF_TER
280372
if (setjmp(nomem_handling_jmp_buf)) {
281373
// Give badarg exception as this seems more reasonable than allocating and
282374
// returning an error tuple when we have run out of memory
375+
if (cancelable_state_holder != NULL && !cancelable_state_holder_opened) {
376+
atomic_store(&cancelable_state_holder->consumed_by_process_json_nif, true);
377+
}
283378
return enif_make_badarg(env);
284379
}
285380
// --------- get jq state and compile filter program if not cached ---------
286381
jq = get_jq_state(env, &error_msg_bin, &ret, erl_jq_filter, &remove_jq_object);
287382
if (jq == NULL) {
288383
goto out;
384+
} else if (cancelable_state_holder != NULL) {
385+
cancelable_state_holder->state = jq;
386+
// jq state can be timed out after the following unlock call
387+
atomic_store(&cancelable_state_holder->consumed_by_process_json_nif, true);
388+
cancelable_state_holder_opened = true;
289389
}
390+
290391
ERL_NIF_TERM ret_list = enif_make_list(env, 0);
291392
ret = process_json(
292393
jq,
@@ -296,8 +397,17 @@ static ERL_NIF_TERM process_json_nif(ErlNifEnv* env, int argc, const ERL_NIF_TER
296397
0,
297398
dumpopts,
298399
&error_msg_bin);
400+
if (cancelable_state_holder != NULL) {
401+
// We don't want any timeouts after this point
402+
enif_mutex_lock(cancelable_state_holder->lock);
403+
cancelable_state_holder->state = NULL;
404+
enif_mutex_unlock(cancelable_state_holder->lock);
405+
}
299406

300407
out:// ----------------------------- release -----------------------------------
408+
if (cancelable_state_holder != NULL && !cancelable_state_holder_opened) {
409+
atomic_store(&cancelable_state_holder->consumed_by_process_json_nif, true);
410+
}
301411
switch (ret) {
302412
default:
303413
assert(0 && "invalid ret");
@@ -306,6 +416,7 @@ static ERL_NIF_TERM process_json_nif(ErlNifEnv* env, int argc, const ERL_NIF_TER
306416
case JQ_ERROR_BADARG:
307417
case JQ_ERROR_COMPILE:
308418
case JQ_ERROR_PARSE:
419+
case JQ_ERROR_TIMEOUT:
309420
case JQ_ERROR_PROCESS: {
310421
ret_term = make_error_return(env, ret, error_msg_bin);
311422
break;
@@ -386,6 +497,12 @@ static int get_int_config(
386497
return 0;
387498
}
388499

500+
501+
static void jq_state_holder_resource_dtor(ErlNifEnv* caller_env, void* obj) {
502+
JQCancelableStateHolderResource* cancelable_state_holder = obj;
503+
enif_mutex_destroy(cancelable_state_holder->lock);
504+
}
505+
389506
static int load_helper(
390507
ErlNifEnv* caller_env,
391508
void** priv_data,
@@ -436,6 +553,14 @@ static int load_helper(
436553
erljq_free(data);
437554
return 1;
438555
}
556+
static const char * jq_state_holder_resource_type_name = "jq_state_holder_resource_type";
557+
data->jq_state_holder_resource_type = enif_open_resource_type(
558+
caller_env,
559+
NULL,
560+
jq_state_holder_resource_type_name,
561+
jq_state_holder_resource_dtor,
562+
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
563+
NULL);
439564
JQStateCacheEntry_lru_ptr_dynarr_init(&data->caches);
440565
*priv_data = data;
441566
return 0;
@@ -478,6 +603,9 @@ static ErlNifFunc nif_funcs[] = {
478603
for a much longer time than is should).
479604
*/
480605
{"process_json", 2, process_json_nif, ERL_NIF_DIRTY_JOB_CPU_BOUND},
606+
{"create_jq_resource", 0, create_jq_resource_nif, 0},
607+
{"cancel_jq_resource", 1, cancel_jq_resource_nif, 0},
608+
{"process_json_with_jq_resource", 3, process_json_nif, ERL_NIF_DIRTY_JOB_CPU_BOUND},
481609
{"set_filter_program_lru_cache_max_size", 1, set_filter_program_lru_cache_max_size_nif, 0},
482610
{"get_filter_program_lru_cache_max_size", 0, get_filter_program_lru_cache_max_size_nif, 0}
483611
};

c_src/erlang_jq_port.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,14 @@ static bool handle_process_json() {
271271
return false;
272272
} else {
273273
const char* error_str = "error";
274+
// We might have results in the result strings. This can happen
275+
// when we get a result out from jq before an error happens
276+
size_t nr_of_result_objects = String_dynarr_size(&result_strings);
277+
for (size_t i = 0; i < nr_of_result_objects; i++) {
278+
String result = String_dynarr_item_at(&result_strings, i);
279+
erljq_free(result.string);
280+
}
281+
String_dynarr_destroy(&result_strings);
274282
if (write_packet((byte*)error_str, strlen(error_str)) <= 0) {
275283
goto error_on_write_out_1;
276284
}

c_src/port_nif_common.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ char* err_tags[] = {
1313
"jq_err_badarg", // 3
1414
"jq_err_compile", // 4
1515
"jq_err_parse", // 5
16-
"jq_err_process" // 6
16+
"jq_err_process", // 6
17+
"timeout" // 7
1718
};
1819
static const char* ERR_MSG_COULD_NOT_INIT = "jq_init: Could not initialize jq";
1920
static const char* ERR_MSG_COMPILATION_FAILED = "Compilation of jq filter failed";
@@ -143,23 +144,38 @@ int process_json_common(
143144
String_dynarr_push(result_strings, port_res_str);
144145
jv_free(res_jv_str);
145146
}
146-
147-
if (jv_invalid_has_msg(jv_copy(result))) {
147+
if (jq_canceled(jq)) {
148+
// Use define for format because the compiler complains if its
149+
// not a literal
150+
#define ERL_JQ_ERR_MSG \
151+
"jq program canceled as it took too long time to execute"
152+
size_t binsz =
153+
snprintf(NULL, 0, ERL_JQ_ERR_MSG) + 1;
154+
char* bin_data = erljq_alloc(binsz);
155+
snprintf(bin_data, binsz, ERL_JQ_ERR_MSG);
156+
#undef ERL_JQ_ERR_MSG
157+
*error_msg_wb = bin_data;
158+
ret = JQ_ERROR_TIMEOUT;
159+
} else if (jv_invalid_has_msg(jv_copy(result))) {
148160
// Uncaught jq exception
149161
jv msg = jv_invalid_get_msg(jv_copy(result));
150162
if (jv_get_kind(msg) == JV_KIND_STRING) {
163+
#define ERL_JQ_ERR_MSG "jq error: %s\n"
151164
size_t binsz =
152-
snprintf(NULL, 0, "jq error: %s\n", jv_string_value(msg)) + 1;
165+
snprintf(NULL, 0, ERL_JQ_ERR_MSG, jv_string_value(msg)) + 1;
153166
char* bin_data = erljq_alloc(binsz);
154-
snprintf(bin_data, binsz, "jq error: %s\n", jv_string_value(msg));
167+
snprintf(bin_data, binsz, ERL_JQ_ERR_MSG, jv_string_value(msg));
168+
#undef ERL_JQ_ERR_MSG
155169
*error_msg_wb = bin_data;
156170
} else {
157171
msg = jv_dump_string(msg, 0);
158-
size_t binsz = snprintf(NULL, 0, "jq error (not a string): %s\n",
172+
#define ERL_JQ_ERR_MSG "jq error (not a string): %s\n"
173+
size_t binsz = snprintf(NULL, 0, ERL_JQ_ERR_MSG,
159174
jv_string_value(msg)) + 1;
160175
char* bin_data = erljq_alloc(binsz);
161-
snprintf(bin_data, binsz, "jq error (not a string): %s\n",
176+
snprintf(bin_data, binsz, ERL_JQ_ERR_MSG,
162177
jv_string_value(msg));
178+
#undef ERL_JQ_ERR_MSG
163179
*error_msg_wb = bin_data;
164180
}
165181
ret = JQ_ERROR_PROCESS;

c_src/port_nif_common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ enum {
1919
JQ_ERROR_BADARG = 3,
2020
JQ_ERROR_COMPILE = 4,
2121
JQ_ERROR_PARSE = 5,
22-
JQ_ERROR_PROCESS = 6
22+
JQ_ERROR_PROCESS = 6,
23+
JQ_ERROR_TIMEOUT = 7
2324
};
2425

2526
extern char* err_tags[];

0 commit comments

Comments
 (0)