1
1
package application
2
2
3
3
import (
4
+ "bufio"
5
+ "bytes"
4
6
"context"
5
7
"encoding/json"
6
8
"errors"
7
9
"fmt"
8
10
"github.com/devtron-labs/common-lib/utils"
9
- "net/http"
10
- "strconv"
11
- "strings"
12
-
13
11
util3 "github.com/devtron-labs/common-lib/utils/k8s"
14
12
k8sCommonBean "github.com/devtron-labs/common-lib/utils/k8s/commonBean"
15
13
"github.com/devtron-labs/common-lib/utils/k8sObjectsUtil"
@@ -27,12 +25,19 @@ import (
27
25
"github.com/devtron-labs/devtron/pkg/terminal"
28
26
"github.com/devtron-labs/devtron/util"
29
27
"github.com/devtron-labs/devtron/util/rbac"
28
+ "github.com/google/uuid"
30
29
"github.com/gorilla/mux"
31
30
errors2 "github.com/juju/errors"
32
31
"go.uber.org/zap"
33
32
"gopkg.in/go-playground/validator.v9"
33
+ "io"
34
34
errors3 "k8s.io/apimachinery/pkg/api/errors"
35
35
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36
+ "net/http"
37
+ "regexp"
38
+ "strconv"
39
+ "strings"
40
+ "time"
36
41
)
37
42
38
43
type K8sApplicationRestHandler interface {
@@ -42,6 +47,7 @@ type K8sApplicationRestHandler interface {
42
47
DeleteResource (w http.ResponseWriter , r * http.Request )
43
48
ListEvents (w http.ResponseWriter , r * http.Request )
44
49
GetPodLogs (w http.ResponseWriter , r * http.Request )
50
+ DownloadPodLogs (w http.ResponseWriter , r * http.Request )
45
51
GetTerminalSession (w http.ResponseWriter , r * http.Request )
46
52
GetResourceInfo (w http.ResponseWriter , r * http.Request )
47
53
GetHostUrlsByBatch (w http.ResponseWriter , r * http.Request )
@@ -620,11 +626,127 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
620
626
common .WriteJsonResp (w , err , nil , http .StatusBadRequest )
621
627
return
622
628
}
629
+ handler .requestValidationAndRBAC (w , r , token , request )
630
+ lastEventId := r .Header .Get (bean2 .LastEventID )
631
+ isReconnect := false
632
+ if len (lastEventId ) > 0 {
633
+ lastSeenMsgId , err := strconv .ParseInt (lastEventId , bean2 .IntegerBase , bean2 .IntegerBitSize )
634
+ if err != nil {
635
+ common .WriteJsonResp (w , err , nil , http .StatusBadRequest )
636
+ return
637
+ }
638
+ lastSeenMsgId = lastSeenMsgId + bean2 .TimestampOffsetToAvoidDuplicateLogs //increased by one ns to avoid duplicate
639
+ t := v1 .Unix (0 , lastSeenMsgId )
640
+ request .K8sRequest .PodLogsRequest .SinceTime = & t
641
+ isReconnect = true
642
+ }
643
+ stream , err := handler .k8sApplicationService .GetPodLogs (r .Context (), request )
644
+ //err is handled inside StartK8sStreamWithHeartBeat method
645
+ ctx , cancel := context .WithCancel (r .Context ())
646
+ if cn , ok := w .(http.CloseNotifier ); ok {
647
+ go func (done <- chan struct {}, closed <- chan bool ) {
648
+ select {
649
+ case <- done :
650
+ case <- closed :
651
+ cancel ()
652
+ }
653
+ }(ctx .Done (), cn .CloseNotify ())
654
+ }
655
+ defer cancel ()
656
+ defer util .Close (stream , handler .logger )
657
+ handler .pump .StartK8sStreamWithHeartBeat (w , isReconnect , stream , err )
658
+ }
659
+
660
+ func (handler * K8sApplicationRestHandlerImpl ) DownloadPodLogs (w http.ResponseWriter , r * http.Request ) {
661
+ token := r .Header .Get ("token" )
662
+ request , err := handler .k8sApplicationService .ValidatePodLogsRequestQuery (r )
663
+ if err != nil {
664
+ common .WriteJsonResp (w , err , nil , http .StatusBadRequest )
665
+ return
666
+ }
667
+ handler .requestValidationAndRBAC (w , r , token , request )
668
+
669
+ // just to make sure follow flag is set to false when downloading logs
670
+ request .K8sRequest .PodLogsRequest .Follow = false
671
+
672
+ stream , err := handler .k8sApplicationService .GetPodLogs (r .Context (), request )
673
+ if err != nil {
674
+ common .WriteJsonResp (w , err , nil , http .StatusInternalServerError )
675
+ return
676
+ }
677
+ ctx , cancel := context .WithCancel (r .Context ())
678
+ if cn , ok := w .(http.CloseNotifier ); ok {
679
+ go func (done <- chan struct {}, closed <- chan bool ) {
680
+ select {
681
+ case <- done :
682
+ case <- closed :
683
+ cancel ()
684
+ }
685
+ }(ctx .Done (), cn .CloseNotify ())
686
+ }
687
+ defer cancel ()
688
+ defer util .Close (stream , handler .logger )
689
+
690
+ var dataBuffer bytes.Buffer
691
+ bufReader := bufio .NewReader (stream )
692
+ eof := false
693
+ for ! eof {
694
+ log , err := bufReader .ReadString ('\n' )
695
+ log = strings .TrimSpace (log ) // Remove trailing line ending
696
+ a := regexp .MustCompile (" " )
697
+ var res []byte
698
+ splitLog := a .Split (log , 2 )
699
+ if len (splitLog [0 ]) > 0 {
700
+ parsedTime , err := time .Parse (time .RFC3339 , splitLog [0 ])
701
+ if err != nil {
702
+ common .WriteJsonResp (w , err , nil , http .StatusInternalServerError )
703
+ return
704
+ }
705
+ gmtTimeLoc := time .FixedZone (bean2 .LocalTimezoneInGMT , bean2 .LocalTimeOffset )
706
+ humanReadableTime := parsedTime .In (gmtTimeLoc ).Format (time .RFC1123 )
707
+ res = append (res , humanReadableTime ... )
708
+ }
709
+
710
+ if len (splitLog ) == 2 {
711
+ res = append (res , " " ... )
712
+ res = append (res , splitLog [1 ]... )
713
+ }
714
+ res = append (res , "\n " ... )
715
+ if err == io .EOF {
716
+ eof = true
717
+ // stop if we reached end of stream and the next line is empty
718
+ if log == "" {
719
+ break
720
+ }
721
+ } else if err != nil && err != io .EOF {
722
+ common .WriteJsonResp (w , err , nil , http .StatusInternalServerError )
723
+ return
724
+ }
725
+ _ , err = dataBuffer .Write (res )
726
+ if err != nil {
727
+ common .WriteJsonResp (w , err , nil , http .StatusInternalServerError )
728
+ return
729
+ }
730
+ }
731
+ if len (dataBuffer .Bytes ()) == 0 {
732
+ common .WriteJsonResp (w , nil , nil , http .StatusNoContent )
733
+ return
734
+ }
735
+ podLogsFilename := generatePodLogsFilename (request .K8sRequest .ResourceIdentifier .Name )
736
+ common .WriteOctetStreamResp (w , r , dataBuffer .Bytes (), podLogsFilename )
737
+ return
738
+ }
739
+
740
+ func generatePodLogsFilename (filename string ) string {
741
+ return fmt .Sprintf ("podlogs-%s-%s.log" , filename , uuid .New ().String ())
742
+ }
743
+
744
+ func (handler * K8sApplicationRestHandlerImpl ) requestValidationAndRBAC (w http.ResponseWriter , r * http.Request , token string , request * k8s.ResourceRequestBean ) {
623
745
if request .AppIdentifier != nil {
624
746
if request .DeploymentType == bean2 .HelmInstalledType {
625
747
valid , err := handler .k8sApplicationService .ValidateResourceRequest (r .Context (), request .AppIdentifier , request .K8sRequest )
626
748
if err != nil || ! valid {
627
- handler .logger .Errorw ("error in validating resource request" , "err" , err )
749
+ handler .logger .Errorw ("error in validating resource request" , "err" , err , "request.AppIdentifier" , request . AppIdentifier , "request.K8sRequest" , request . K8sRequest )
628
750
apiError := util2.ApiError {
629
751
InternalMessage : "failed to validate the resource with error " + err .Error (),
630
752
UserMessage : "Failed to validate resource" ,
@@ -671,34 +793,6 @@ func (handler *K8sApplicationRestHandlerImpl) GetPodLogs(w http.ResponseWriter,
671
793
common .WriteJsonResp (w , errors .New ("can not get pod logs as target cluster is not provided" ), nil , http .StatusBadRequest )
672
794
return
673
795
}
674
- lastEventId := r .Header .Get ("Last-Event-ID" )
675
- isReconnect := false
676
- if len (lastEventId ) > 0 {
677
- lastSeenMsgId , err := strconv .ParseInt (lastEventId , 10 , 64 )
678
- if err != nil {
679
- common .WriteJsonResp (w , err , nil , http .StatusBadRequest )
680
- return
681
- }
682
- lastSeenMsgId = lastSeenMsgId + 1 //increased by one ns to avoid duplicate
683
- t := v1 .Unix (0 , lastSeenMsgId )
684
- request .K8sRequest .PodLogsRequest .SinceTime = & t
685
- isReconnect = true
686
- }
687
- stream , err := handler .k8sApplicationService .GetPodLogs (r .Context (), request )
688
- //err is handled inside StartK8sStreamWithHeartBeat method
689
- ctx , cancel := context .WithCancel (r .Context ())
690
- if cn , ok := w .(http.CloseNotifier ); ok {
691
- go func (done <- chan struct {}, closed <- chan bool ) {
692
- select {
693
- case <- done :
694
- case <- closed :
695
- cancel ()
696
- }
697
- }(ctx .Done (), cn .CloseNotify ())
698
- }
699
- defer cancel ()
700
- defer util .Close (stream , handler .logger )
701
- handler .pump .StartK8sStreamWithHeartBeat (w , isReconnect , stream , err )
702
796
}
703
797
704
798
func (handler * K8sApplicationRestHandlerImpl ) GetTerminalSession (w http.ResponseWriter , r * http.Request ) {
0 commit comments