Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.avro.{ Conversions, LogicalTypes, Schema => SchemaAvro }
import zio.prelude.NonEmptyMap
import zio.schema.{ Fallback, FieldSet, Schema, StandardType, TypeId }
import zio.stream.ZPipeline
import zio.{ Chunk, Unsafe, ZIO }
import zio.{ Chunk, Unsafe }

object AvroCodec {

Expand All @@ -49,9 +49,8 @@ object AvroCodec {
encoded
}

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] = ZPipeline.mapChunks { chunk =>
chunk.flatMap(encode)
}
override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(_.flatMap(encode))

override def decode(whole: Chunk[Byte]): Either[DecodeError, A] = {
val datumReader = new GenericDatumReader[Any](avroSchema)
Expand All @@ -60,11 +59,8 @@ object AvroCodec {
decodeValue(decoded, schema)
}

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] = ZPipeline.mapChunksZIO { chunk =>
ZIO.fromEither(
decode(chunk).map(Chunk(_))
)
}
override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.mapChunksEither(bytes => decode(bytes).map(Chunk.single))

override def encodeGenericRecord(value: A)(implicit schema: Schema[A]): GenericData.Record =
encodeValue(value, schema).asInstanceOf[GenericData.Record]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import zio.prelude.NonEmptyMap
import zio.schema.Schema.GenericRecord
import zio.schema._
import zio.schema.annotation.{ rejectExtraFields, _ }
import zio.schema.codec.DecodeError.ReadError
import zio.schema.codec.JsonCodec.JsonDecoder.schemaDecoder
import zio.stream.{ ZChannel, ZPipeline }
import zio.{ Cause, Chunk, ChunkBuilder, ZIO, ZNothing }

object JsonCodec {

private val streamEncoderSeparator: Chunk[Byte] = Chunk.single('\n'.toByte)

@deprecated(
"""Use JsonCodec.Configuration instead.
JsonCodec.Configuration makes it now possible to configure en-/decoding of empty collection and nulls (Options) independently.""",
Expand Down Expand Up @@ -169,27 +170,23 @@ JsonCodec.Configuration makes it now possible to configure en-/decoding of empty
new BinaryCodec[A] {
override def decode(whole: Chunk[Byte]): Either[DecodeError, A] =
jsonCodec
.decodeJson(
new String(whole.toArray, JsonEncoder.CHARSET)
)
.decodeJson(new String(whole.toArray, JsonEncoder.CHARSET))
.left
.map(failure => DecodeError.ReadError(Cause.empty, failure))

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.fromChannel(
ZPipeline.utfDecode.channel.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage))
ZPipeline.utfDecode.channel.mapError(cce => DecodeError.ReadError(Cause.fail(cce), cce.getMessage))
) >>> splitOnJsonBoundary >>>
ZPipeline.mapZIO { (s: String) =>
ZIO
.fromEither(jsonCodec.decodeJson(s))
.mapError(failure => DecodeError.ReadError(Cause.empty, failure))
ZPipeline.mapEitherChunked { (s: String) =>
jsonCodec.decodeJson(s).left.map(failure => DecodeError.ReadError(Cause.empty, failure))
}

override def encode(value: A): Chunk[Byte] =
JsonEncoder.charSequenceToByteChunk(jsonCodec.encodeJson(value, None))

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(Chunk.single('\n'.toByte)).flattenChunks
ZPipeline.mapChunks[A, Chunk[Byte]](_.map(encode)).intersperse(streamEncoderSeparator).flattenChunks
}

implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
Expand Down Expand Up @@ -324,7 +321,7 @@ JsonCodec.Configuration makes it now possible to configure en-/decoding of empty
)

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.utfDecode.mapError(cce => ReadError(Cause.fail(cce), cce.getMessage)) >>>
ZPipeline.utfDecode.mapError(cce => DecodeError.ReadError(Cause.fail(cce), cce.getMessage)) >>>
(if (cfg.treatStreamsAsArrays) splitJsonArrayElements else splitOnJsonBoundary) >>>
ZPipeline.mapZIO { (s: String) =>
ZIO.fromEither(JsonDecoder.decode(schema, s, cfg))
Expand Down Expand Up @@ -803,7 +800,7 @@ JsonCodec.Configuration makes it now possible to configure en-/decoding of empty

final def decode[A](schema: Schema[A], json: String, config: Configuration): Either[DecodeError, A] =
schemaDecoder(schema, config).decodeJson(json) match {
case Left(value) => Left(ReadError(Cause.empty, value))
case Left(value) => Left(DecodeError.ReadError(Cause.empty, value))
case Right(value) => Right(value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.time._
import zio.schema.codec.DecodeError.ReadError
import zio.schema.{ Schema, StandardType }
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, ZIO }
import zio.{ Cause, Chunk }

object MessagePackCodec {
implicit def messagePackCodec[A](implicit schema: Schema[A]): BinaryCodec[A] =
Expand All @@ -28,13 +28,9 @@ object MessagePackCodec {
}

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.mapChunksZIO { chunk =>
ZIO.fromEither(
decodeChunk(chunk).map(Chunk(_))
)
}
ZPipeline.mapChunksEither(bytes => decodeChunk(bytes).map(Chunk.single))

private def decodeChunk(chunk: Chunk[Byte]) =
private def decodeChunk(chunk: Chunk[Byte]): Either[DecodeError, A] =
new MessagePackDecoder(chunk)
.decode(schema)
.left
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import zio.schema.annotation.fieldDefaultValue
import zio.schema.codec.DecodeError.{ ExtraFields, MalformedField, MissingField }
import zio.schema.codec.ProtobufCodec.Protobuf.WireType.LengthDelimited
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, ChunkBuilder, Unsafe, ZIO }
import zio.{ Cause, Chunk, ChunkBuilder, Unsafe }

object ProtobufCodec {

Expand All @@ -25,15 +25,13 @@ object ProtobufCodec {
new Decoder(whole).decode(schema)

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.mapChunksZIO(chunk => ZIO.fromEither(new Decoder(chunk).decode(schema).map(Chunk(_))))
ZPipeline.mapChunksEither(bytes => new Decoder(bytes).decode(schema).map(Chunk.single))

override def encode(value: A): Chunk[Byte] =
Encoder.process(schema, value)

override def streamEncoder: ZPipeline[Any, Nothing, A, Byte] =
ZPipeline.mapChunks(
_.flatMap(encode)
)
ZPipeline.mapChunks(_.flatMap(encode))
}

object Protobuf {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import zio.schema.MutableSchemaBasedValueBuilder.{ CreateValueFromSchemaError, R
import zio.schema._
import zio.schema.codec.DecodeError.{ EmptyContent, MalformedFieldWithPath, ReadError, ReadErrorWithPath }
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, Unsafe, ZIO }
import zio.{ Cause, Chunk, Unsafe }

object ThriftCodec {

Expand All @@ -27,11 +27,7 @@ object ThriftCodec {
decodeChunk(whole)

override def streamDecoder: ZPipeline[Any, DecodeError, Byte, A] =
ZPipeline.mapChunksZIO { chunk =>
ZIO.fromEither(
decodeChunk(chunk).map(Chunk(_))
)
}
ZPipeline.mapChunksEither(bytes => decodeChunk(bytes).map(Chunk.single))

override def encode(value: A): Chunk[Byte] =
new Encoder().encode(schema, value)
Expand Down
Loading