@@ -1628,7 +1628,6 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
16281628 req := & storagepb.BidiReadObjectRequest {
16291629 ReadObjectSpec : spec ,
16301630 }
1631- ctx = gax .InsertMetadataIntoOutgoingContext (ctx , contextMetadataFromBidiReadObject (req )... )
16321631
16331632 // Define a function that initiates a Read with offset and length, assuming
16341633 // we have already read seen bytes.
@@ -1660,28 +1659,53 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
16601659 var decoder * readResponseDecoder
16611660
16621661 err = run (cc , func (ctx context.Context ) error {
1663- stream , err = c .raw .BidiReadObject (ctx , s .gax ... )
1664- if err != nil {
1665- return err
1666- }
1667- if err := stream .Send (req ); err != nil {
1668- return err
1662+ var databufs mem.BufferSlice
1663+ openAndSendReq := func () error {
1664+ databufs = mem.BufferSlice {}
1665+
1666+ // Insert context metadata, including routing token if this is a retry
1667+ // for a redirect.
1668+ mdCtx := gax .InsertMetadataIntoOutgoingContext (ctx , contextMetadataFromBidiReadObject (req )... )
1669+ stream , err = c .raw .BidiReadObject (mdCtx , s .gax ... )
1670+ if err != nil {
1671+ return err
1672+ }
1673+ if err := stream .Send (req ); err != nil {
1674+ return err
1675+ }
1676+ // Oneshot reads can close the client->server side immediately.
1677+ if err := stream .CloseSend (); err != nil {
1678+ return err
1679+ }
1680+
1681+ // Receive the message into databuf as a wire-encoded message so we can
1682+ // use a custom decoder to avoid an extra copy at the protobuf layer.
1683+ return stream .RecvMsg (& databufs )
16691684 }
1670- // Oneshot reads can close the client->server side immediately.
1671- if err := stream .CloseSend (); err != nil {
1672- return err
1685+
1686+ err := openAndSendReq ()
1687+
1688+ // We might get a redirect error here for an out-of-region request.
1689+ // Add the routing token and read handle to the request and do one
1690+ // retry.
1691+ if st , ok := status .FromError (err ); ok && st .Code () == codes .Aborted {
1692+ for _ , d := range st .Details () {
1693+ if e , ok := d .(* storagepb.BidiReadObjectRedirectedError ); ok {
1694+ req .ReadObjectSpec .ReadHandle = e .GetReadHandle ()
1695+ req .ReadObjectSpec .RoutingToken = e .RoutingToken
1696+ err = openAndSendReq ()
1697+ break
1698+ }
1699+ }
16731700 }
16741701
1675- // Receive the message into databuf as a wire-encoded message so we can
1676- // use a custom decoder to avoid an extra copy at the protobuf layer.
1677- databufs := mem.BufferSlice {}
1678- err := stream .RecvMsg (& databufs )
16791702 // These types of errors show up on the RecvMsg call, rather than the
16801703 // initialization of the stream via BidiReadObject above.
16811704 if s , ok := status .FromError (err ); ok && s .Code () == codes .NotFound {
1682- return formatObjectErr (err )
1705+ err = formatObjectErr (err )
16831706 }
16841707 if err != nil {
1708+ databufs .Free ()
16851709 return err
16861710 }
16871711 // Use a custom decoder that uses protobuf unmarshalling for all
@@ -2093,7 +2117,9 @@ func (r *gRPCReader) Close() error {
20932117func (r * gRPCReader ) recv () error {
20942118 databufs := mem.BufferSlice {}
20952119 err := r .stream .RecvMsg (& databufs )
2096- if err != nil && r .settings .retry .runShouldRetry (err ) {
2120+ // If we get a mid-stream error on a recv call, reopen the stream.
2121+ // ABORTED could indicate a redirect so should also trigger a reopen.
2122+ if err != nil && (r .settings .retry .runShouldRetry (err ) || status .Code (err ) == codes .Aborted ) {
20972123 // This will "close" the existing stream and immediately attempt to
20982124 // reopen the stream, but will backoff if further attempts are necessary.
20992125 // Reopening the stream Recvs the first message, so if retrying is
0 commit comments