Skip to content

Commit 49abec3

Browse files
authored
improvements to produce/consume exceptions (#839)
* improvements to produce/consume exceptions * review changes
1 parent 18a3ffe commit 49abec3

File tree

16 files changed

+144
-211
lines changed

16 files changed

+144
-211
lines changed

src/Confluent.Kafka/Consumer.cs

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ public void StoreOffset(ConsumeResult<TKey, TValue> result)
422422
/// `enable.auto.offset.store` must be set to "false" when using this API.
423423
/// </remarks>
424424
/// <param name="offset">
425-
/// The offset to be commited.
425+
/// The offset to be committed.
426426
/// </param>
427427
/// <exception cref="Confluent.Kafka.KafkaException">
428428
/// Thrown if the request failed.
@@ -1014,7 +1014,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10141014
new SerializationContext(MessageComponentType.Key, topic));
10151015
}
10161016
}
1017-
catch (Exception exception)
1017+
catch (Exception ex)
10181018
{
10191019
throw new ConsumeException(
10201020
new ConsumeResult<byte[], byte[]>
@@ -1030,7 +1030,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10301030
IsPartitionEOF = false
10311031
},
10321032
new Error(ErrorCode.Local_KeyDeserialization),
1033-
exception);
1033+
ex);
10341034
}
10351035

10361036
V val;
@@ -1046,7 +1046,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10461046
new SerializationContext(MessageComponentType.Value, topic));
10471047
}
10481048
}
1049-
catch (Exception exception)
1049+
catch (Exception ex)
10501050
{
10511051
throw new ConsumeException(
10521052
new ConsumeResult<byte[], byte[]>
@@ -1062,7 +1062,7 @@ private ConsumeResult<K, V> ConsumeImpl<K,V>(
10621062
IsPartitionEOF = false
10631063
},
10641064
new Error(ErrorCode.Local_ValueDeserialization),
1065-
exception);
1065+
ex);
10661066
}
10671067

10681068
return new ConsumeResult<K, V>
@@ -1100,19 +1100,35 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11001100
};
11011101
}
11021102

1103-
TKey key = keyDeserializer != null
1104-
? keyDeserializer.Deserialize(rawResult.Key, rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic))
1105-
: Task.Run(async () => await asyncKeyDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Key), rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic)))
1106-
.ConfigureAwait(continueOnCapturedContext: false)
1107-
.GetAwaiter()
1108-
.GetResult();
1103+
TKey key;
1104+
try
1105+
{
1106+
key = keyDeserializer != null
1107+
? keyDeserializer.Deserialize(rawResult.Key, rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic))
1108+
: Task.Run(async () => await asyncKeyDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Key), rawResult.Key == null, new SerializationContext(MessageComponentType.Key, rawResult.Topic)))
1109+
.ConfigureAwait(continueOnCapturedContext: false)
1110+
.GetAwaiter()
1111+
.GetResult();
1112+
}
1113+
catch (Exception ex)
1114+
{
1115+
throw new ConsumeException(rawResult, new Error(ErrorCode.Local_KeyDeserialization), ex);
1116+
}
11091117

1110-
TValue val = valueDeserializer != null
1111-
? valueDeserializer.Deserialize(rawResult.Value, rawResult.Value == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic))
1112-
: Task.Run(async () => await asyncValueDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Value), rawResult == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic)))
1113-
.ConfigureAwait(continueOnCapturedContext: false)
1114-
.GetAwaiter()
1115-
.GetResult();
1118+
TValue val;
1119+
try
1120+
{
1121+
val = valueDeserializer != null
1122+
? valueDeserializer.Deserialize(rawResult.Value, rawResult.Value == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic))
1123+
: Task.Run(async () => await asyncValueDeserializer.DeserializeAsync(new ReadOnlyMemory<byte>(rawResult.Value), rawResult == null, new SerializationContext(MessageComponentType.Value, rawResult.Topic)))
1124+
.ConfigureAwait(continueOnCapturedContext: false)
1125+
.GetAwaiter()
1126+
.GetResult();
1127+
}
1128+
catch (Exception ex)
1129+
{
1130+
throw new ConsumeException(rawResult, new Error(ErrorCode.Local_ValueDeserialization), ex);
1131+
}
11161132

11171133
return new ConsumeResult<TKey, TValue>
11181134
{
@@ -1140,10 +1156,18 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11401156
/// The consume result.
11411157
/// </returns>
11421158
/// <remarks>
1143-
/// OnPartitionsAssigned/Revoked and OnOffsetsCommitted events may
1159+
/// The partitions assigned/revoked and offsets committed handlers may
11441160
/// be invoked as a side-effect of calling this method (on the same
11451161
/// thread).
11461162
/// </remarks>
1163+
/// <exception cref="ConsumeException">
1164+
/// Thrown when a call to this method is unsuccessful for any reason
1165+
/// (except cancellation by user). Inspect the Error property of the
1166+
/// exception for detailed information.
1167+
/// </exception>
1168+
/// <exception cref="OperationCanceledException">
1169+
/// Thrown on cancellation.
1170+
/// </exception>
11471171
public ConsumeResult<TKey, TValue> Consume(CancellationToken cancellationToken = default(CancellationToken))
11481172
{
11491173
while (true)
@@ -1172,10 +1196,14 @@ private ConsumeResult<TKey, TValue> ConsumeViaBytes(int millisecondsTimeout)
11721196
/// The consume result.
11731197
/// </returns>
11741198
/// <remarks>
1175-
/// OnPartitionsAssigned/Revoked and OnOffsetsCommitted events may
1199+
/// The partitions assigned/revoked and offsets committed handlers may
11761200
/// be invoked as a side-effect of calling this method (on the same
11771201
/// thread).
11781202
/// </remarks>
1203+
/// <exception cref="ConsumeException">
1204+
/// Thrown when a call to this method is unsuccessful for any reason.
1205+
/// Inspect the Error property of the exception for detailed information.
1206+
/// </exception>
11791207
public ConsumeResult<TKey, TValue> Consume(TimeSpan timeout)
11801208
=> (keyDeserializer != null && valueDeserializer != null)
11811209
? ConsumeImpl<TKey, TValue>(timeout.TotalMillisecondsAsInt(), keyDeserializer, valueDeserializer) // fast path for simple case

src/Confluent.Kafka/DeserializationException.cs

Lines changed: 0 additions & 52 deletions
This file was deleted.

src/Confluent.Kafka/Deserializers.cs

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -39,28 +39,17 @@ public string Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationCon
3939
return null;
4040
}
4141

42-
try
43-
{
44-
#if NETCOREAPP2_1
45-
return Encoding.UTF8.GetString(data);
46-
#else
47-
return Encoding.UTF8.GetString(data.ToArray());
48-
#endif
49-
}
50-
catch (Exception e)
51-
{
52-
throw new DeserializationException("Error occured deserializing UTF8 string value", e);
53-
}
42+
#if NETCOREAPP2_1
43+
return Encoding.UTF8.GetString(data);
44+
#else
45+
return Encoding.UTF8.GetString(data.ToArray());
46+
#endif
5447
}
5548
}
5649

5750
/// <summary>
5851
/// Null value deserializer.
5952
/// </summary>
60-
/// <remarks>
61-
/// Unexpected input will result in a
62-
/// <see cref="DeserializationException" />.
63-
/// </remarks>
6453
public static IDeserializer<Null> Null = new NullDeserializer();
6554

6655
private class NullDeserializer : IDeserializer<Null>
@@ -69,7 +58,7 @@ public Null Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationConte
6958
{
7059
if (!isNull)
7160
{
72-
throw new DeserializationException("Deserializer<Null> may only be used to deserialize data that is null.");
61+
throw new ArgumentException("Deserializer<Null> may only be used to deserialize data that is null.");
7362
}
7463

7564
return null;
@@ -98,12 +87,12 @@ public long Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationConte
9887
{
9988
if (isNull)
10089
{
101-
throw new DeserializationException($"Null data encountered deserializing Int64 value.");
90+
throw new ArgumentNullException($"Null data encountered deserializing Int64 value.");
10291
}
10392

10493
if (data.Length != 8)
10594
{
106-
throw new DeserializationException($"Deserializer<Long> encountered data of length {data.Length}. Expecting data length to be 8.");
95+
throw new ArgumentException($"Deserializer<Long> encountered data of length {data.Length}. Expecting data length to be 8.");
10796
}
10897

10998
// network byte order -> big endian -> most significant byte in the smallest address.
@@ -130,12 +119,12 @@ public int Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContex
130119
{
131120
if (isNull)
132121
{
133-
throw new DeserializationException($"Null data encountered deserializing an Int32 value");
122+
throw new ArgumentNullException($"Null data encountered deserializing Int32 value");
134123
}
135124

136125
if (data.Length != 4)
137126
{
138-
throw new DeserializationException($"Deserializer<Int32> encountered data of length {data.Length}. Expecting data length to be 4.");
127+
throw new ArgumentException($"Deserializer<Int32> encountered data of length {data.Length}. Expecting data length to be 4.");
139128
}
140129

141130
// network byte order -> big endian -> most significant byte in the smallest address.
@@ -158,12 +147,12 @@ public float Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationCont
158147
{
159148
if (isNull)
160149
{
161-
throw new DeserializationException($"Null data encountered deserializing an float value.");
150+
throw new ArgumentNullException($"Null data encountered deserializing float value.");
162151
}
163152

164153
if (data.Length != 4)
165154
{
166-
throw new DeserializationException($"Deserializer<float> encountered data of length {data.Length}. Expecting data length to be 4.");
155+
throw new ArgumentException($"Deserializer<float> encountered data of length {data.Length}. Expecting data length to be 4.");
167156
}
168157

169158
// network byte order -> big endian -> most significant byte in the smallest address.
@@ -182,18 +171,11 @@ public float Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationCont
182171
}
183172
else
184173
{
185-
try
186-
{
187-
#if NETCOREAPP2_1
188-
return BitConverter.ToSingle(data);
189-
#else
190-
return BitConverter.ToSingle(data.ToArray(), 0);
191-
#endif
192-
}
193-
catch (Exception e)
194-
{
195-
throw new DeserializationException("Error occured deserializing float value.", e);
196-
}
174+
#if NETCOREAPP2_1
175+
return BitConverter.ToSingle(data);
176+
#else
177+
return BitConverter.ToSingle(data.ToArray(), 0);
178+
#endif
197179
}
198180
}
199181
}
@@ -209,12 +191,12 @@ public double Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationCon
209191
{
210192
if (isNull)
211193
{
212-
throw new DeserializationException($"Null data encountered deserializing an double value.");
194+
throw new ArgumentNullException($"Null data encountered deserializing double value.");
213195
}
214196

215197
if (data.Length != 8)
216198
{
217-
throw new DeserializationException($"Deserializer<double> encountered data of length {data.Length}. Expecting data length to be 8.");
199+
throw new ArgumentException($"Deserializer<double> encountered data of length {data.Length}. Expecting data length to be 8.");
218200
}
219201

220202
// network byte order -> big endian -> most significant byte in the smallest address.
@@ -237,18 +219,11 @@ public double Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationCon
237219
}
238220
else
239221
{
240-
try
241-
{
242-
#if NETCOREAPP2_1
243-
return BitConverter.ToDouble(data);
244-
#else
245-
return BitConverter.ToDouble(data.ToArray(), 0);
246-
#endif
247-
}
248-
catch (Exception e)
249-
{
250-
throw new DeserializationException("Error occured deserializing double value.", e);
251-
}
222+
#if NETCOREAPP2_1
223+
return BitConverter.ToDouble(data);
224+
#else
225+
return BitConverter.ToDouble(data.ToArray(), 0);
226+
#endif
252227
}
253228
}
254229
}

0 commit comments

Comments
 (0)