Skip to content

Commit de35880

Browse files
committed
feat: csv supports null and default values
Signed-off-by: OxalisCu <[email protected]>
1 parent 75f84ba commit de35880

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

pymilvus/bulk_writer/buffer.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def _persist_parquet(self, local_path: str, **kwargs):
289289

290290
def _persist_csv(self, local_path: str, **kwargs):
291291
sep = self._config.get("sep", ",")
292-
# nullkey is not supported in csv now
292+
nullkey = self._config.get("nullkey", "")
293293

294294
header = list(self._buffer.keys())
295295
data = pd.DataFrame(columns=header)
@@ -307,17 +307,14 @@ def _persist_csv(self, local_path: str, **kwargs):
307307
# 2. or convert arr into a string using json.dumps(arr) first and then add it to df
308308
# I choose method 2 here
309309
if field_schema.dtype in {
310-
DataType.JSON,
311-
DataType.ARRAY,
312310
DataType.SPARSE_FLOAT_VECTOR,
313311
DataType.BINARY_VECTOR,
314312
DataType.FLOAT_VECTOR,
315313
}:
316-
dt = np.dtype("str")
317314
arr = []
318315
for val in v:
319316
arr.append(json.dumps(val))
320-
data[k] = pd.Series(arr, dtype=dt)
317+
data[k] = pd.Series(arr, dtype=np.dtype("str"))
321318
elif field_schema.dtype in {DataType.FLOAT16_VECTOR, DataType.BFLOAT16_VECTOR}:
322319
# special process for float16 vector, the self._buffer stores bytes for
323320
# float16 vector, convert the bytes to float list
@@ -330,19 +327,31 @@ def _persist_csv(self, local_path: str, **kwargs):
330327
for val in v:
331328
arr.append(json.dumps(np.frombuffer(val, dtype=dt).tolist()))
332329
data[k] = pd.Series(arr, dtype=np.dtype("str"))
330+
elif field_schema.dtype in {
331+
DataType.JSON,
332+
DataType.ARRAY,
333+
}:
334+
arr = []
335+
for val in v:
336+
if val is None:
337+
arr.append(nullkey)
338+
else:
339+
arr.append(json.dumps(val))
340+
data[k] = pd.Series(arr, dtype=np.dtype("str"))
333341
elif field_schema.dtype in {DataType.BOOL}:
334-
dt = np.dtype("str")
335-
arr = ["true" if x else "false" for x in v]
336-
data[k] = pd.Series(arr, dtype=dt)
337-
elif field_schema.dtype.name in NUMPY_TYPE_CREATOR:
338-
dt = NUMPY_TYPE_CREATOR[field_schema.dtype.name]
339-
data[k] = pd.Series(v, dtype=dt)
342+
arr = []
343+
for val in v:
344+
if val is not None:
345+
arr.append("true" if val else "false")
346+
data[k] = pd.Series(arr, dtype=np.dtype("str"))
340347
else:
341-
data[k] = pd.Series(v)
348+
data[k] = pd.Series(v, dtype=NUMPY_TYPE_CREATOR[field_schema.dtype.name])
342349

343350
file_path = Path(local_path + ".csv")
344351
try:
345-
data.to_csv(file_path, sep=sep, index=False)
352+
# pd.Series will convert None to np.nan,
353+
# so we can use 'na_rep=nullkey' to replace NaN with nullkey
354+
data.to_csv(file_path, sep=sep, na_rep=nullkey, index=False)
346355
except Exception as e:
347356
self._throw(f"Failed to persist file {file_path}, error: {e}")
348357

pymilvus/bulk_writer/bulk_writer.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,20 @@ def _verify_row(self, row: dict):
199199
self._throw(f"The field '{field.name}' is missed in the row")
200200

201201
dtype = DataType(field.dtype)
202+
203+
# deal with null (None)
204+
if field.nullable and row[field.name] is None:
205+
if (
206+
field.default_value is not None
207+
and field.default_value.WhichOneof("data") is not None
208+
):
209+
# set default value
210+
data_type = field.default_value.WhichOneof("data")
211+
row[field.name] = getattr(field.default_value, data_type)
212+
else:
213+
# skip field check if the field is null
214+
continue
215+
202216
if dtype in {
203217
DataType.BINARY_VECTOR,
204218
DataType.FLOAT_VECTOR,

0 commit comments

Comments (0)