@@ -52,37 +52,99 @@ if Code.ensure_loaded?(Finch) do
5252 @ behaviour Tesla.Adapter
5353 alias Tesla.Multipart
5454
55+ @ defaults [
56+ receive_timeout: 15_000
57+ ]
58+
5559 @ impl Tesla.Adapter
5660 def call ( % Tesla.Env { } = env , opts ) do
57- opts = Tesla.Adapter . opts ( env , opts )
61+ opts = Tesla.Adapter . opts ( @ defaults , env , opts )
5862
5963 name = Keyword . fetch! ( opts , :name )
6064 url = Tesla . build_url ( env . url , env . query )
6165 req_opts = Keyword . take ( opts , [ :pool_timeout , :receive_timeout ] )
66+ req = build ( env . method , url , env . headers , env . body )
6267
63- case request ( name , env . method , url , env . headers , env . body , req_opts ) do
68+ case request ( req , name , req_opts , opts ) do
6469 { :ok , % Finch.Response { status: status , headers: headers , body: body } } ->
6570 { :ok , % Tesla.Env { env | status: status , headers: headers , body: body } }
6671
67- { :error , mint_error } ->
68- { :error , Exception . message ( mint_error ) }
72+ { :error , % Mint.TransportError { reason: reason } } ->
73+ { :error , reason }
74+
75+ { :error , reason } ->
76+ { :error , reason }
6977 end
7078 end
7179
72- defp request ( name , method , url , headers , % Multipart { } = mp , opts ) do
80+ defp build ( method , url , headers , % Multipart { } = mp ) do
7381 headers = headers ++ Multipart . headers ( mp )
7482 body = Multipart . body ( mp ) |> Enum . to_list ( )
7583
76- request ( name , method , url , headers , body , opts )
84+ build ( method , url , headers , body )
7785 end
7886
79- defp request ( _name , _method , _url , _headers , % Stream { } , _opts ) do
80- raise "Streaming is not supported by this adapter!"
87+ defp build ( method , url , headers , % Stream { } = body_stream ) do
88+ build ( method , url , headers , { :stream , body_stream } )
8189 end
8290
83- defp request ( name , method , url , headers , body , opts ) do
91+ defp build ( method , url , headers , body_stream_fun ) when is_function ( body_stream_fun ) do
92+ build ( method , url , headers , { :stream , body_stream_fun } )
93+ end
94+
95+ defp build ( method , url , headers , body ) do
8496 Finch . build ( method , url , headers , body )
85- |> Finch . request ( name , opts )
97+ end
98+
99+ defp request ( req , name , req_opts , opts ) do
100+ case opts [ :response ] do
101+ :stream -> stream ( req , name , req_opts )
102+ nil -> Finch . request ( req , name , req_opts )
103+ other -> raise "Unknown response option: #{ inspect ( other ) } "
104+ end
105+ end
106+
107+ defp stream ( req , name , opts ) do
108+ owner = self ( )
109+ ref = make_ref ( )
110+
111+ fun = fn
112+ { :status , status } , _acc -> status
113+ { :headers , headers } , status -> send ( owner , { ref , { :status , status , headers } } )
114+ { :data , data } , _acc -> send ( owner , { ref , { :data , data } } )
115+ end
116+
117+ task =
118+ Task . async ( fn ->
119+ case Finch . stream ( req , name , nil , fun , opts ) do
120+ { :ok , _acc } -> send ( owner , { ref , :eof } )
121+ { :error , error } -> send ( owner , { ref , { :error , error } } )
122+ end
123+ end )
124+
125+ receive do
126+ { ^ ref , { :status , status , headers } } ->
127+ body =
128+ Stream . unfold ( nil , fn _ ->
129+ receive do
130+ { ^ ref , { :data , data } } ->
131+ { data , nil }
132+
133+ { ^ ref , :eof } ->
134+ Task . await ( task )
135+ nil
136+ after
137+ opts [ :receive_timeout ] ->
138+ Task . shutdown ( task , :brutal_kill )
139+ nil
140+ end
141+ end )
142+
143+ { :ok , % Finch.Response { status: status , headers: headers , body: body } }
144+ after
145+ opts [ :receive_timeout ] ->
146+ { :error , :timeout }
147+ end
86148 end
87149 end
88150end
0 commit comments