@@ -32,6 +32,7 @@ import (
32
32
"github.com/apache/beam/sdks/v2/go/pkg/beam/util/execx"
33
33
)
34
34
35
+ const pipLogFlushInterval time.Duration = 15 * time .Second
35
36
const unrecoverableURL string = "https://beam.apache.org/documentation/sdks/python-unrecoverable-errors/index.html#pip-dependency-resolution-failures"
36
37
37
38
// pipInstallRequirements installs the given requirement, if present.
@@ -40,15 +41,15 @@ func pipInstallRequirements(ctx context.Context, logger *tools.Logger, files []s
40
41
if err != nil {
41
42
return err
42
43
}
43
- bufLogger := tools .NewBufferedLogger ( logger )
44
+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
44
45
for _ , file := range files {
45
46
if file == name {
46
47
// We run the install process in two rounds in order to avoid as much
47
48
// as possible PyPI downloads. In the first round the --find-links
48
49
// option will make sure that only things staged in the worker will be
49
50
// used without following their dependencies.
50
51
args := []string {"-m" , "pip" , "install" , "-r" , filepath .Join (dir , name ), "--no-cache-dir" , "--disable-pip-version-check" , "--no-index" , "--no-deps" , "--find-links" , dir }
51
- if err := execx .Execute ( pythonVersion , args ... ); err != nil {
52
+ if err := execx .ExecuteEnvWithIO ( nil , os . Stdin , bufLogger , bufLogger , pythonVersion , args ... ); err != nil {
52
53
bufLogger .Printf (ctx , "Some packages could not be installed solely from the requirements cache. Installing packages from PyPI." )
53
54
}
54
55
// The second install round opens up the search for packages on PyPI and
@@ -79,8 +80,6 @@ func isPackageInstalled(pkgName string) bool {
79
80
return true
80
81
}
81
82
82
- const pipLogFlushInterval time.Duration = 15 * time .Second
83
-
84
83
// pipInstallPackage installs the given package, if present.
85
84
func pipInstallPackage (ctx context.Context , logger * tools.Logger , files []string , dir , name string , force , optional bool , extras []string ) error {
86
85
pythonVersion , err := expansionx .GetPythonVersion ()
@@ -150,7 +149,7 @@ func pipInstallPackage(ctx context.Context, logger *tools.Logger, files []string
150
149
// installExtraPackages installs all the packages declared in the extra
151
150
// packages manifest file.
152
151
func installExtraPackages (ctx context.Context , logger * tools.Logger , files []string , extraPackagesFile , dir string ) error {
153
- bufLogger := tools .NewBufferedLogger ( logger )
152
+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
154
153
// First check that extra packages manifest file is present.
155
154
for _ , file := range files {
156
155
if file != extraPackagesFile {
@@ -179,7 +178,7 @@ func installExtraPackages(ctx context.Context, logger *tools.Logger, files []str
179
178
}
180
179
181
180
func findBeamSdkWhl (ctx context.Context , logger * tools.Logger , files []string , acceptableWhlSpecs []string ) string {
182
- bufLogger := tools .NewBufferedLogger ( logger )
181
+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
183
182
for _ , file := range files {
184
183
if strings .HasPrefix (file , "apache_beam" ) {
185
184
for _ , s := range acceptableWhlSpecs {
@@ -200,7 +199,7 @@ func findBeamSdkWhl(ctx context.Context, logger *tools.Logger, files []string, a
200
199
// SDK from source tarball provided in sdkSrcFile.
201
200
func installSdk (ctx context.Context , logger * tools.Logger , files []string , workDir string , sdkSrcFile string , acceptableWhlSpecs []string , required bool ) error {
202
201
sdkWhlFile := findBeamSdkWhl (ctx , logger , files , acceptableWhlSpecs )
203
- bufLogger := tools .NewBufferedLogger ( logger )
202
+ bufLogger := tools .NewBufferedLoggerWithFlushInterval ( ctx , logger , pipLogFlushInterval )
204
203
if sdkWhlFile != "" {
205
204
// by default, pip rejects to install wheel if same version already installed
206
205
isDev := strings .Contains (sdkWhlFile , ".dev" )
0 commit comments