Skip to content

Commit cca984f

Browse files
committed
add support for WriteBatchWithIndex
1 parent d57c2c4 commit cca984f

File tree

8 files changed

+845
-31
lines changed

8 files changed

+845
-31
lines changed

RocksDbSharp/Iterator.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,17 @@ public void Dispose()
4242
}
4343
}
4444

45+
/// <summary>
46+
/// Detach the iterator from its handle but don't dispose the handle
47+
/// </summary>
48+
/// <returns></returns>
49+
public IntPtr Detach()
50+
{
51+
var r = handle;
52+
handle = IntPtr.Zero;
53+
return r;
54+
}
55+
4556
public bool Valid()
4657
{
4758
return Native.Instance.rocksdb_iter_valid(handle);

RocksDbSharp/Native.Marshaled.cs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,33 @@ public void rocksdb_writebatch_put(IntPtr writeOptions, string key, string val,
362362
}
363363
}
364364

365+
public void rocksdb_writebatch_wi_put(IntPtr writeOptions, string key, string val, Encoding encoding)
366+
{
367+
unsafe
368+
{
369+
if (encoding == null)
370+
encoding = Encoding.UTF8;
371+
fixed (char* k = key, v = val)
372+
{
373+
int klength = key.Length;
374+
int vlength = val.Length;
375+
int bklength = encoding.GetByteCount(k, klength);
376+
int bvlength = encoding.GetByteCount(v, vlength);
377+
var buffer = Marshal.AllocHGlobal(bklength + bvlength);
378+
byte* bk = (byte*)buffer.ToPointer();
379+
encoding.GetBytes(k, klength, bk, bklength);
380+
byte* bv = bk + bklength;
381+
encoding.GetBytes(v, vlength, bv, bvlength);
382+
383+
rocksdb_writebatch_wi_put(writeOptions, bk, (ulong)bklength, bv, (ulong)bvlength);
384+
#if DEBUG
385+
Zero(bk, bklength);
386+
#endif
387+
Marshal.FreeHGlobal(buffer);
388+
}
389+
}
390+
}
391+
365392
public void rocksdb_iter_seek(
366393
IntPtr iter,
367394
string key,
@@ -463,6 +490,16 @@ public byte[] rocksdb_writebatch_data(IntPtr wbHandle)
463490
return data;
464491
}
465492

493+
public byte[] rocksdb_writebatch_wi_data(IntPtr wbHandle)
494+
{
495+
var resultPtr = rocksdb_writebatch_wi_data(wbHandle, out ulong size);
496+
var data = new byte[size];
497+
Marshal.Copy(resultPtr, data, 0, (int)size);
498+
// Do not free this memory because it is owned by the write batch and will be freed when it is disposed
499+
// rocksdb_free(resultPtr);
500+
return data;
501+
}
502+
466503
public int rocksdb_writebatch_data(IntPtr wbHandle, byte[] buffer, int offset, int length)
467504
{
468505
var resultPtr = rocksdb_writebatch_data(wbHandle, out ulong size);
@@ -479,6 +516,22 @@ public int rocksdb_writebatch_data(IntPtr wbHandle, byte[] buffer, int offset, i
479516
return (int)size;
480517
}
481518

519+
public int rocksdb_writebatch_wi_data(IntPtr wbHandle, byte[] buffer, int offset, int length)
520+
{
521+
var resultPtr = rocksdb_writebatch_wi_data(wbHandle, out ulong size);
522+
bool fits = (int)size <= length;
523+
if (!fits)
524+
{
525+
// Do not free this memory because it is owned by the write batch and will be freed when it is disposed
526+
// rocksdb_free(resultPtr);
527+
return -1;
528+
}
529+
Marshal.Copy(resultPtr, buffer, offset, (int)size);
530+
// Do not free this memory because it is owned by the write batch and will be freed when it is disposed
531+
// rocksdb_free(resultPtr);
532+
return (int)size;
533+
}
534+
482535
public string rocksdb_property_value_string(IntPtr db, string propname)
483536
{
484537
return MarshalNullTermAsciiStr(rocksdb_property_value(db, propname));
@@ -522,5 +575,126 @@ public unsafe void rocksdb_sstfilewriter_add(
522575
}
523576
}
524577
}
578+
579+
public string rocksdb_writebatch_wi_get_from_batch(
580+
IntPtr wb,
581+
IntPtr options,
582+
string key,
583+
out IntPtr errptr,
584+
ColumnFamilyHandle cf = null,
585+
Encoding encoding = null)
586+
{
587+
if (encoding == null)
588+
encoding = Encoding.UTF8;
589+
unsafe
590+
{
591+
fixed (char* k = key)
592+
{
593+
int klength = key.Length;
594+
int bklength = encoding.GetByteCount(k, klength);
595+
var buffer = Marshal.AllocHGlobal(bklength);
596+
byte* bk = (byte*)buffer.ToPointer();
597+
encoding.GetBytes(k, klength, bk, bklength);
598+
599+
var resultPtr = cf == null
600+
? rocksdb_writebatch_wi_get_from_batch(wb, options, bk, (ulong)bklength, out ulong bvlength, out errptr)
601+
: rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, bk, (ulong)bklength, out bvlength, out errptr);
602+
#if DEBUG
603+
Zero(bk, bklength);
604+
#endif
605+
Marshal.FreeHGlobal(buffer);
606+
607+
if (errptr != IntPtr.Zero)
608+
return null;
609+
if (resultPtr == IntPtr.Zero)
610+
return null;
611+
612+
return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding);
613+
}
614+
}
615+
}
616+
617+
public byte[] rocksdb_writebatch_wi_get_from_batch(
618+
IntPtr wb,
619+
IntPtr options,
620+
byte[] key,
621+
ulong keyLength,
622+
out IntPtr errptr,
623+
ColumnFamilyHandle cf = null)
624+
{
625+
var resultPtr = cf == null
626+
? rocksdb_writebatch_wi_get_from_batch(wb, options, key, keyLength, out ulong valueLength, out errptr)
627+
: rocksdb_writebatch_wi_get_from_batch_cf(wb, options, cf.Handle, key, keyLength, out valueLength, out errptr);
628+
if (errptr != IntPtr.Zero)
629+
return null;
630+
if (resultPtr == IntPtr.Zero)
631+
return null;
632+
var result = new byte[valueLength];
633+
Marshal.Copy(resultPtr, result, 0, (int)valueLength);
634+
rocksdb_free(resultPtr);
635+
return result;
636+
}
637+
638+
public string rocksdb_writebatch_wi_get_from_batch_and_db(
639+
IntPtr wb,
640+
IntPtr db,
641+
IntPtr read_options,
642+
string key,
643+
out IntPtr errptr,
644+
ColumnFamilyHandle cf = null,
645+
Encoding encoding = null)
646+
{
647+
if (encoding == null)
648+
encoding = Encoding.UTF8;
649+
unsafe
650+
{
651+
fixed (char* k = key)
652+
{
653+
int klength = key.Length;
654+
int bklength = encoding.GetByteCount(k, klength);
655+
var buffer = Marshal.AllocHGlobal(bklength);
656+
byte* bk = (byte*)buffer.ToPointer();
657+
encoding.GetBytes(k, klength, bk, bklength);
658+
659+
var resultPtr = cf == null
660+
? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, bk, (ulong)bklength, out ulong bvlength, out errptr)
661+
: rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, bk, (ulong)bklength, out bvlength, out errptr);
662+
#if DEBUG
663+
Zero(bk, bklength);
664+
#endif
665+
Marshal.FreeHGlobal(buffer);
666+
667+
if (errptr != IntPtr.Zero)
668+
return null;
669+
if (resultPtr == IntPtr.Zero)
670+
return null;
671+
672+
return MarshalAndFreeRocksDbString(resultPtr, (long)bvlength, encoding);
673+
}
674+
}
675+
}
676+
677+
public byte[] rocksdb_writebatch_wi_get_from_batch_and_db(
678+
IntPtr wb,
679+
IntPtr db,
680+
IntPtr read_options,
681+
byte[] key,
682+
ulong keyLength,
683+
out IntPtr errptr,
684+
ColumnFamilyHandle cf = null)
685+
{
686+
var resultPtr = cf == null
687+
? rocksdb_writebatch_wi_get_from_batch_and_db(wb, db, read_options, key, keyLength, out ulong valueLength, out errptr)
688+
: rocksdb_writebatch_wi_get_from_batch_and_db_cf(wb, db, read_options, cf.Handle, key, keyLength, out valueLength, out errptr);
689+
if (errptr != IntPtr.Zero)
690+
return null;
691+
if (resultPtr == IntPtr.Zero)
692+
return null;
693+
var result = new byte[valueLength];
694+
Marshal.Copy(resultPtr, result, 0, (int)valueLength);
695+
rocksdb_free(resultPtr);
696+
return result;
697+
}
698+
525699
}
526700
}

RocksDbSharp/Native.Raw.cs

Lines changed: 70 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,19 @@ public abstract void rocksdb_writebatch_wi_destroy(
532532
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
533533
public abstract void rocksdb_writebatch_wi_clear(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
534534
public abstract int rocksdb_writebatch_wi_count(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
535+
public abstract void rocksdb_writebatch_wi_put(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
536+
/*(const char*)*/ byte[] key,
537+
/*(size_t)*/ ulong klen,
538+
/*(const char*)*/ byte[] val,
539+
/*(size_t)*/ ulong vlen);
535540
public abstract unsafe void rocksdb_writebatch_wi_put(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
536541
/*(const char*)*/ byte* key,
537542
/*(size_t)*/ ulong klen,
538543
/*(const char*)*/ byte* val,
539544
/*(size_t)*/ ulong vlen);
545+
public abstract void rocksdb_writebatch_wi_put_cf(
546+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
547+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte[] val, /*(size_t)*/ ulong vlen);
540548
public abstract unsafe void rocksdb_writebatch_wi_put_cf(
541549
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
542550
/*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen);
@@ -549,11 +557,19 @@ public abstract void rocksdb_writebatch_wi_putv_cf(
549557
int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes,
550558
int num_values, /*(const char* const*)*/ IntPtr values_list,
551559
/*(const size_t*)*/ IntPtr values_list_sizes);
560+
public abstract void rocksdb_writebatch_wi_merge(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
561+
/*(const char*)*/ byte[] key,
562+
/*(size_t)*/ ulong klen,
563+
/*(const char*)*/ byte[] val,
564+
/*(size_t)*/ ulong vlen);
552565
public abstract unsafe void rocksdb_writebatch_wi_merge(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
553566
/*(const char*)*/ byte* key,
554567
/*(size_t)*/ ulong klen,
555568
/*(const char*)*/ byte* val,
556569
/*(size_t)*/ ulong vlen);
570+
public abstract void rocksdb_writebatch_wi_merge_cf(
571+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
572+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte[] val, /*(size_t)*/ ulong vlen);
557573
public abstract unsafe void rocksdb_writebatch_wi_merge_cf(
558574
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
559575
/*(const char*)*/ byte* key, /*(size_t)*/ ulong klen, /*(const char*)*/ byte* val, /*(size_t)*/ ulong vlen);
@@ -566,9 +582,15 @@ public abstract void rocksdb_writebatch_wi_mergev_cf(
566582
int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes,
567583
int num_values, /*(const char* const*)*/ IntPtr values_list,
568584
/*(const size_t*)*/ IntPtr values_list_sizes);
585+
public abstract void rocksdb_writebatch_wi_delete(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
586+
/*(const char*)*/ byte[] key,
587+
/*(size_t)*/ ulong klen);
569588
public abstract unsafe void rocksdb_writebatch_wi_delete(/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
570589
/*(const char*)*/ byte* key,
571590
/*(size_t)*/ ulong klen);
591+
public abstract void rocksdb_writebatch_wi_delete_cf(
592+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
593+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong klen);
572594
public abstract unsafe void rocksdb_writebatch_wi_delete_cf(
573595
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
574596
/*(const char*)*/ byte* key, /*(size_t)*/ ulong klen);
@@ -578,9 +600,16 @@ public abstract void rocksdb_writebatch_wi_deletev(
578600
public abstract void rocksdb_writebatch_wi_deletev_cf(
579601
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
580602
int num_keys, /*(const char* const*)*/ IntPtr keys_list, /*(const size_t*)*/ IntPtr keys_list_sizes);
603+
public abstract void rocksdb_writebatch_wi_delete_range(
604+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte[] start_key, /*(size_t)*/ ulong start_key_len,
605+
/*(const char*)*/ byte[] end_key, /*(size_t)*/ ulong end_key_len);
581606
public abstract unsafe void rocksdb_writebatch_wi_delete_range(
582607
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte* start_key, /*(size_t)*/ ulong start_key_len,
583608
/*(const char*)*/ byte* end_key, /*(size_t)*/ ulong end_key_len);
609+
public abstract void rocksdb_writebatch_wi_delete_range_cf(
610+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
611+
/*(const char*)*/ byte[] start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte[] end_key,
612+
/*(size_t)*/ ulong end_key_len);
584613
public abstract unsafe void rocksdb_writebatch_wi_delete_range_cf(
585614
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
586615
/*(const char*)*/ byte* start_key, /*(size_t)*/ ulong start_key_len, /*(const char*)*/ byte* end_key,
@@ -594,53 +623,83 @@ public abstract void rocksdb_writebatch_wi_delete_rangev_cf(
594623
int num_keys, /*(const char* const*)*/ IntPtr start_keys_list,
595624
/*(const size_t*)*/ IntPtr start_keys_list_sizes, /*(const char* const*)*/ IntPtr end_keys_list,
596625
/*(const size_t*)*/ IntPtr end_keys_list_sizes);
626+
public abstract void rocksdb_writebatch_wi_put_log_data(
627+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ byte[] blob, /*(size_t)*/ ulong len);
597628
public abstract void rocksdb_writebatch_wi_put_log_data(
598629
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(const char*)*/ IntPtr blob, /*(size_t)*/ ulong len);
599630
public abstract void rocksdb_writebatch_wi_iterate(
600631
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
601632
/*(void*)*/ IntPtr state,
602-
/*(void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen))*/ IntPtr put,
603-
/*(void (*deleted)(void*, const char* k, size_t klen))*/ IntPtr deleted);
633+
/*(void (*put)(void*, const char* k, size_t klen, const char* v, size_t vlen))*/ WriteBatchIteratePutCallback put,
634+
/*(void (*deleted)(void*, const char* k, size_t klen))*/ WriteBatchIterateDeleteCallback deleted);
604635
public abstract /*(const char*)*/ IntPtr rocksdb_writebatch_wi_data(
605636
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b,
606637
/*(size_t*)*/ out ulong size);
607638
public abstract void rocksdb_writebatch_wi_set_save_point(
608639
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b);
609640
public abstract void rocksdb_writebatch_wi_rollback_to_save_point(
610-
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(char**)*/ IntPtr errptr);
641+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr b, /*(char**)*/ out IntPtr errptr);
642+
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch(
643+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
644+
/*(const rocksdb_options_t*)*/ IntPtr options,
645+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
646+
/*(size_t*)*/ out ulong vallen,
647+
/*(char**)*/ out IntPtr errptr);
611648
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch(
612649
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
613650
/*(const rocksdb_options_t*)*/ IntPtr options,
614651
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
615652
/*(size_t*)*/ out ulong vallen,
616-
/*(char**)*/ IntPtr errptr);
653+
/*(char**)*/ out IntPtr errptr);
654+
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_cf(
655+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
656+
/*(const rocksdb_options_t*)*/ IntPtr options,
657+
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
658+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
659+
/*(size_t*)*/ out ulong vallen,
660+
/*(char**)*/ out IntPtr errptr);
617661
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_cf(
618662
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
619663
/*(const rocksdb_options_t*)*/ IntPtr options,
620664
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
621665
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
622666
/*(size_t*)*/ out ulong vallen,
623-
/*(char**)*/ IntPtr errptr);
667+
/*(char**)*/ out IntPtr errptr);
668+
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db(
669+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
670+
/*(rocksdb_t*)*/ IntPtr db,
671+
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
672+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
673+
/*(size_t*)*/ out ulong vallen,
674+
/*(char**)*/ out IntPtr errptr);
624675
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db(
625676
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
626677
/*(rocksdb_t*)*/ IntPtr db,
627-
/*(const rocksdb_readoptions_t*)*/ IntPtr options,
678+
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
628679
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
629680
/*(size_t*)*/ out ulong vallen,
630-
/*(char**)*/ IntPtr errptr);
681+
/*(char**)*/ out IntPtr errptr);
682+
public abstract /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db_cf(
683+
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
684+
/*(rocksdb_t*)*/ IntPtr db,
685+
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
686+
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
687+
/*(const char*)*/ byte[] key, /*(size_t)*/ ulong keylen,
688+
/*(size_t*)*/ out ulong vallen,
689+
/*(char**)*/ out IntPtr errptr);
631690
public abstract unsafe /*(char*)*/ IntPtr rocksdb_writebatch_wi_get_from_batch_and_db_cf(
632691
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
633692
/*(rocksdb_t*)*/ IntPtr db,
634-
/*(const rocksdb_readoptions_t*)*/ IntPtr options,
693+
/*(const rocksdb_readoptions_t*)*/ IntPtr read_options,
635694
/*(rocksdb_column_family_handle_t*)*/ IntPtr column_family,
636695
/*(const char*)*/ byte* key, /*(size_t)*/ ulong keylen,
637696
/*(size_t*)*/ out ulong vallen,
638-
/*(char**)*/ IntPtr errptr);
697+
/*(char**)*/ out IntPtr errptr);
639698
public abstract void rocksdb_write_writebatch_wi(
640699
/*(rocksdb_t*)*/ IntPtr db,
641-
/*(const rocksdb_writeoptions_t*)*/ IntPtr options,
700+
/*(const rocksdb_writeoptions_t*)*/ IntPtr write_options,
642701
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
643-
/*(char**)*/ IntPtr errptr);
702+
/*(char**)*/ out IntPtr errptr);
644703
public abstract /*(rocksdb_iterator_t*)*/ IntPtr rocksdb_writebatch_wi_create_iterator_with_base(
645704
/*(rocksdb_writebatch_wi_t*)*/ IntPtr wbwi,
646705
/*(rocksdb_iterator_t*)*/ IntPtr base_iterator);

0 commit comments

Comments
 (0)