Skip to content

Commit e55c0da

Browse files
chitrangpateltekton-robot
authored andcommitted
TEP-0127: Larger results using sidecar logs - parsing logs to extract results
This PR addresses a part of TEP-0127 - parsing logs to extract results into the task run CRD. The taskrun reconciler fetches the logs of the injected sidecar container sidecar-tekton-log-results. The logs are already structured and can be un-marshalled via json. The reconciler parses these logs and adds the results to the task run CRD. If the size of the result is greater than the max-result-size configured by the user then the TaskRun fails with reason TaskRunResultLargerThanAllowedLimit. `Note`: the e2e test only covers the success case of using sidecar logs to extract results. This is because the failure cases are already tested via unit tests. Before approving this PR, review tektoncd#5834
1 parent 963aa40 commit e55c0da

File tree

14 files changed

+1046
-17
lines changed

14 files changed

+1046
-17
lines changed

internal/sidecarlogresults/sidecarlogresults.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,25 @@ limitations under the License.
1717
package sidecarlogresults
1818

1919
import (
20+
"bufio"
21+
"context"
2022
"encoding/json"
23+
"errors"
2124
"fmt"
2225
"io"
2326
"os"
2427
"path/filepath"
2528

29+
"github.com/tektoncd/pipeline/pkg/apis/config"
30+
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
2631
"golang.org/x/sync/errgroup"
32+
corev1 "k8s.io/api/core/v1"
33+
"k8s.io/client-go/kubernetes"
2734
)
2835

36+
// ErrSizeExceeded indicates that the result exceeded its maximum allowed size
37+
var ErrSizeExceeded = errors.New("results size exceeds configured limit")
38+
2939
// SidecarLogResult holds fields for storing extracted results
3040
type SidecarLogResult struct {
3141
Name string
@@ -125,3 +135,59 @@ func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames [
125135
}
126136
return nil
127137
}
138+
139+
// GetResultsFromSidecarLogs extracts results from the logs of the results sidecar
140+
func GetResultsFromSidecarLogs(ctx context.Context, clientset kubernetes.Interface, namespace string, name string, container string, podPhase corev1.PodPhase) ([]v1beta1.PipelineResourceResult, error) {
141+
sidecarLogResults := []v1beta1.PipelineResourceResult{}
142+
if podPhase == corev1.PodPending {
143+
return sidecarLogResults, nil
144+
}
145+
podLogOpts := corev1.PodLogOptions{Container: container}
146+
req := clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOpts)
147+
sidecarLogs, err := req.Stream(ctx)
148+
if err != nil {
149+
return sidecarLogResults, err
150+
}
151+
defer sidecarLogs.Close()
152+
maxResultLimit := config.FromContextOrDefaults(ctx).FeatureFlags.MaxResultSize
153+
return extractResultsFromLogs(sidecarLogs, sidecarLogResults, maxResultLimit)
154+
}
155+
156+
func extractResultsFromLogs(logs io.Reader, sidecarLogResults []v1beta1.PipelineResourceResult, maxResultLimit int) ([]v1beta1.PipelineResourceResult, error) {
157+
scanner := bufio.NewScanner(logs)
158+
buf := make([]byte, maxResultLimit)
159+
scanner.Buffer(buf, maxResultLimit)
160+
for scanner.Scan() {
161+
result, err := parseResults(scanner.Bytes(), maxResultLimit)
162+
if err != nil {
163+
return nil, err
164+
}
165+
sidecarLogResults = append(sidecarLogResults, result)
166+
}
167+
168+
if scanner.Err() != nil {
169+
if errors.Is(scanner.Err(), bufio.ErrTooLong) {
170+
return sidecarLogResults, ErrSizeExceeded
171+
}
172+
return nil, scanner.Err()
173+
}
174+
return sidecarLogResults, nil
175+
}
176+
177+
func parseResults(resultBytes []byte, maxResultLimit int) (v1beta1.PipelineResourceResult, error) {
178+
result := v1beta1.PipelineResourceResult{}
179+
if len(resultBytes) > maxResultLimit {
180+
return result, ErrSizeExceeded
181+
}
182+
183+
var res SidecarLogResult
184+
if err := json.Unmarshal(resultBytes, &res); err != nil {
185+
return result, fmt.Errorf("Invalid result %w", err)
186+
}
187+
result = v1beta1.PipelineResourceResult{
188+
Key: res.Name,
189+
Value: res.Value,
190+
ResultType: v1beta1.TaskRunResultType,
191+
}
192+
return result, nil
193+
}

internal/sidecarlogresults/sidecarlogresults_test.go

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,22 @@ package sidecarlogresults
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
7+
"fmt"
68
"os"
79
"path/filepath"
810
"sort"
11+
"strings"
912
"testing"
1013

1114
"github.com/google/go-cmp/cmp"
15+
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
1216
"github.com/tektoncd/pipeline/test/diff"
17+
corev1 "k8s.io/api/core/v1"
18+
v1 "k8s.io/api/core/v1"
19+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
1321
)
1422

1523
func TestLookForResults_FanOutAndWait(t *testing.T) {
@@ -121,6 +129,184 @@ func TestLookForResults(t *testing.T) {
121129
}
122130
}
123131

132+
func TestExtractResultsFromLogs(t *testing.T) {
133+
inputResults := []SidecarLogResult{
134+
{
135+
Name: "result1",
136+
Value: "foo",
137+
}, {
138+
Name: "result2",
139+
Value: "bar",
140+
},
141+
}
142+
podLogs := ""
143+
for _, r := range inputResults {
144+
res, _ := json.Marshal(&r)
145+
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
146+
}
147+
logs := strings.NewReader(podLogs)
148+
149+
results, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
150+
if err != nil {
151+
t.Error(err)
152+
}
153+
want := []v1beta1.PipelineResourceResult{
154+
{
155+
Key: "result1",
156+
Value: "foo",
157+
ResultType: v1beta1.TaskRunResultType,
158+
}, {
159+
Key: "result2",
160+
Value: "bar",
161+
ResultType: v1beta1.TaskRunResultType,
162+
},
163+
}
164+
if d := cmp.Diff(want, results); d != "" {
165+
t.Fatal(diff.PrintWantGot(d))
166+
}
167+
}
168+
169+
func TestExtractResultsFromLogs_Failure(t *testing.T) {
170+
inputResults := []SidecarLogResult{
171+
{
172+
Name: "result1",
173+
Value: strings.Repeat("v", 4098),
174+
},
175+
}
176+
podLogs := ""
177+
for _, r := range inputResults {
178+
res, _ := json.Marshal(&r)
179+
podLogs = fmt.Sprintf("%s%s\n", podLogs, string(res))
180+
}
181+
logs := strings.NewReader(podLogs)
182+
183+
_, err := extractResultsFromLogs(logs, []v1beta1.PipelineResourceResult{}, 4096)
184+
if err != ErrSizeExceeded {
185+
t.Fatalf("Expected error %v but got %v", ErrSizeExceeded, err)
186+
}
187+
}
188+
189+
func TestParseResults(t *testing.T) {
190+
results := []SidecarLogResult{
191+
{
192+
Name: "result1",
193+
Value: "foo",
194+
}, {
195+
Name: "result2",
196+
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
197+
}, {
198+
Name: "result3",
199+
Value: `["hello","world"]`,
200+
},
201+
}
202+
podLogs := []string{}
203+
for _, r := range results {
204+
res, _ := json.Marshal(&r)
205+
podLogs = append(podLogs, string(res))
206+
}
207+
want := []v1beta1.PipelineResourceResult{{
208+
Key: "result1",
209+
Value: "foo",
210+
ResultType: v1beta1.TaskRunResultType,
211+
}, {
212+
Key: "result2",
213+
Value: `{"IMAGE_URL":"ar.com", "IMAGE_DIGEST":"sha234"}`,
214+
ResultType: v1beta1.TaskRunResultType,
215+
}, {
216+
Key: "result3",
217+
Value: `["hello","world"]`,
218+
ResultType: v1beta1.TaskRunResultType,
219+
}}
220+
stepResults := []v1beta1.PipelineResourceResult{}
221+
for _, plog := range podLogs {
222+
res, err := parseResults([]byte(plog), 4096)
223+
if err != nil {
224+
t.Error(err)
225+
}
226+
stepResults = append(stepResults, res)
227+
}
228+
if d := cmp.Diff(want, stepResults); d != "" {
229+
t.Fatal(diff.PrintWantGot(d))
230+
}
231+
}
232+
233+
func TestParseResults_Failure(t *testing.T) {
234+
result := SidecarLogResult{
235+
Name: "result2",
236+
Value: strings.Repeat("k", 4098),
237+
}
238+
res1, _ := json.Marshal("result1 v1")
239+
res2, _ := json.Marshal(&result)
240+
podLogs := []string{string(res1), string(res2)}
241+
want := []string{
242+
"Invalid result json: cannot unmarshal string into Go value of type sidecarlogresults.SidecarLogResult",
243+
ErrSizeExceeded.Error(),
244+
}
245+
got := []string{}
246+
for _, plog := range podLogs {
247+
_, err := parseResults([]byte(plog), 4096)
248+
got = append(got, err.Error())
249+
}
250+
if d := cmp.Diff(want, got); d != "" {
251+
t.Fatal(diff.PrintWantGot(d))
252+
}
253+
}
254+
255+
func TestGetResultsFromSidecarLogs(t *testing.T) {
256+
for _, c := range []struct {
257+
desc string
258+
podPhase v1.PodPhase
259+
wantError bool
260+
}{{
261+
desc: "pod pending to start",
262+
podPhase: corev1.PodPending,
263+
wantError: false,
264+
}, {
265+
desc: "pod running extract logs",
266+
podPhase: corev1.PodRunning,
267+
wantError: true,
268+
}} {
269+
t.Run(c.desc, func(t *testing.T) {
270+
ctx := context.Background()
271+
clientset := fakekubeclientset.NewSimpleClientset()
272+
pod := &v1.Pod{
273+
TypeMeta: metav1.TypeMeta{
274+
Kind: "Pod",
275+
APIVersion: "v1",
276+
},
277+
ObjectMeta: metav1.ObjectMeta{
278+
Name: "pod",
279+
Namespace: "foo",
280+
},
281+
Spec: v1.PodSpec{
282+
Containers: []v1.Container{
283+
{
284+
Name: "container",
285+
Image: "image",
286+
},
287+
},
288+
},
289+
Status: v1.PodStatus{
290+
Phase: c.podPhase,
291+
},
292+
}
293+
pod, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
294+
if err != nil {
295+
t.Errorf("Error occurred while creating pod %s: %s", pod.Name, err.Error())
296+
}
297+
298+
// Fake logs are not formatted properly so there will be an error
299+
_, err = GetResultsFromSidecarLogs(ctx, clientset, "foo", "pod", "container", pod.Status.Phase)
300+
if err != nil && !c.wantError {
301+
t.Fatalf("did not expect an error but got: %v", err)
302+
}
303+
if c.wantError && err == nil {
304+
t.Fatal("expected to get an error but did not")
305+
}
306+
})
307+
}
308+
}
309+
124310
func createResult(t *testing.T, dir string, resultName string, resultValue string) {
125311
t.Helper()
126312
resultFile := filepath.Join(dir, resultName)

internal/sidecarlogsvalidation/sidecarlogs.go renamed to pkg/apis/pipeline/sidecarlogs.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package sidecarlogsvalidation
17+
package pipeline
1818

1919
const (
2020
// ReservedResultsSidecarName is the name of the results sidecar that outputs the results to stdout
2121
// when the results-from feature-flag is set to "sidecar-logs".
2222
ReservedResultsSidecarName = "tekton-log-results"
23+
24+
// ReservedResultsSidecarContainerName is the name of the results sidecar container that is injected
25+
// by the reconciler.
26+
ReservedResultsSidecarContainerName = "sidecar-tekton-log-results"
2327
)

pkg/apis/pipeline/v1beta1/taskrun_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ const (
186186
TaskRunReasonsResultsVerificationFailed TaskRunReason = "TaskRunResultsVerificationFailed"
187187
// AwaitingTaskRunResults is the reason set when waiting upon `TaskRun` results and signatures to verify
188188
AwaitingTaskRunResults TaskRunReason = "AwaitingTaskRunResults"
189+
// TaskRunReasonResultLargerThanAllowedLimit is the reason set when one of the results exceeds its maximum allowed limit of 1 KB
190+
TaskRunReasonResultLargerThanAllowedLimit TaskRunReason = "TaskRunResultLargerThanAllowedLimit"
189191
)
190192

191193
func (t TaskRunReason) String() string {

pkg/pod/entrypoint.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"strconv"
2727
"strings"
2828

29+
"github.com/tektoncd/pipeline/pkg/apis/config"
2930
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
3031
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
3132
"gomodules.xyz/jsonpatch/v2"
@@ -252,6 +253,12 @@ func StopSidecars(ctx context.Context, nopImage string, kubeclient kubernetes.In
252253
updated := false
253254
if newPod.Status.Phase == corev1.PodRunning {
254255
for _, s := range newPod.Status.ContainerStatuses {
256+
// If the results-from is set to sidecar logs,
257+
// a sidecar container with name `sidecar-log-results` is injected by the reconiler.
258+
// Do not kill this sidecar. Let it exit gracefully.
259+
if config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs && s.Name == pipeline.ReservedResultsSidecarContainerName {
260+
continue
261+
}
255262
// Stop any running container that isn't a step.
256263
// An injected sidecar container might not have the
257264
// "sidecar-" prefix, so we can't just look for that

0 commit comments

Comments
 (0)