
Commit 547ac08

Merge pull request #4 from dbgeek/poor_mans_tail
feat: tail command
2 parents: 318118b + 0c7a28c

File tree

  cmd/tail.go
  logworker/logworker.go

2 files changed: +145 -4

cmd/tail.go

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+package cmd
+
+import (
+	"bytes"
+	"fmt"
+	"time"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/dbgeek/elblogcat/logcat"
+	"github.com/dbgeek/elblogcat/logworker"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+)
+
+// tailCmd represents the tail command
+var tailCmd = &cobra.Command{
+	Use:   "tail",
+	Short: "Poor man's tail: poll for new access logs, by default every 1 min",
+	Long: `
+`,
+	Run: func(cmd *cobra.Command, args []string) {
+		awsConfiguration := logworker.AWSconfiguration{Region: "eu-west-1"}
+		configuration := logworker.NewConfiguration()
+		accessLogFilter := logworker.NewAccessLogFilter()
+		client := logworker.NewLogWorker(
+			&awsConfiguration,
+			&configuration,
+			&accessLogFilter,
+		)
+		logs := make(chan string, 1)
+
+		client.Tail(logs)
+
+		for v := range logs {
+			buff := &aws.WriteAtBuffer{}
+			key := fmt.Sprintf("%s%s", accessLogFilter.AccesslogPath(configuration.Prefix), v)
+			_, err := client.S3Downloader.Download(buff, &s3.GetObjectInput{
+				Bucket: aws.String(viper.GetString("s3-bucket")),
+				Key:    aws.String(key),
+			})
+			if err != nil {
+				logworker.Logger.Fatalf("Failed to Download key: %v from s3. Got error: %v",
+					key,
+					err)
+			}
+
+			c := logcat.NewRowFilter()
+			b := bytes.NewBuffer(buff.Bytes())
+			a := logcat.Accesslog{
+				Content:     b,
+				RowFilter:   c,
+				PrintFields: viper.GetString("fields"),
+			}
+			a.Cat()
+		}
+	},
+}
+
+func init() {
+	rootCmd.AddCommand(tailCmd)
+	tailCmd.PersistentFlags().Duration("polling-interval", 60*time.Second, "")
+	viper.BindPFlag("polling-interval", tailCmd.PersistentFlags().Lookup("polling-interval"))
+
+}
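For readers unfamiliar with the cobra/viper wiring the command relies on, here is a minimal, self-contained sketch (not part of the commit) of how the polling-interval flag reaches the rest of the program: the flag is registered on a command with a 60-second default, bound to a viper key of the same name, and later read back with viper.GetDuration, which is the same chain tail.go and NewConfiguration use. The demo command name and the usage string are invented for illustration.

// Standalone sketch (not part of the commit): how a cobra flag bound
// through viper becomes a time.Duration the worker can read.
package main

import (
	"fmt"
	"time"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

func main() {
	demo := &cobra.Command{
		Use: "demo",
		Run: func(cmd *cobra.Command, args []string) {
			// Same lookup NewConfiguration() performs for PollingInterval.
			fmt.Println("polling interval:", viper.GetDuration("polling-interval"))
		},
	}
	// Register the flag with a 60s default, then bind it to the viper key.
	demo.PersistentFlags().Duration("polling-interval", 60*time.Second, "how often to poll for new access logs")
	viper.BindPFlag("polling-interval", demo.PersistentFlags().Lookup("polling-interval"))

	if err := demo.Execute(); err != nil {
		fmt.Println(err)
	}
}

Invoked as "demo --polling-interval 30s" this prints a 30s interval; without the flag, viper falls back to the 60-second default attached to the flag definition.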

logworker/logworker.go

Lines changed: 80 additions & 4 deletions
@@ -31,8 +31,9 @@ type (
 		Profile string
 	}
 	Configuration struct {
-		Bucket string
-		Prefix string
+		Bucket          string
+		Prefix          string
+		PollingInterval time.Duration
 	}
 	AccessLogFilter struct {
 		matchString string
@@ -138,6 +139,80 @@ func (l *LogWorker) List() []string {
 	return accessLogs
 }

+func (l *LogWorker) Tail(logch chan<- string) {
+	go func() {
+		accessLogFilter := NewAccessLogFilter()
+		consumedAccessLogs := make(map[string]struct{})
+
+		lbAccessLogTimestamp := l.AccessLogFilter.StartTime
+		for t := lbAccessLogTimestamp; t.Before(time.Now().UTC()); t = t.Add(5 * time.Minute) {
+			lbAccessLogTimestamp = t
+			lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
+				accessLogFilter.AwsAccountID,
+				accessLogFilter.Region,
+				accessLogFilter.LoadBalancerID,
+				t.Format("20060102T1504Z"),
+			)
+			s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog)
+			for _, accessLog := range *l.listAccessLogs(s3Prefix) {
+				if _, ok := consumedAccessLogs[accessLog]; !ok {
+					consumedAccessLogs[accessLog] = struct{}{}
+					logch <- accessLog
+				}
+			}
+		}
+
+		poller := time.Tick(l.Configuration.PollingInterval)
+		for now := range poller {
+
+			lbAccessLogTimestamp = lbAccessLogTimestamp.Add(15 * time.Second)
+			lbAccessLog := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
+				accessLogFilter.AwsAccountID,
+				accessLogFilter.Region,
+				accessLogFilter.LoadBalancerID,
+				now.UTC().Format("20060102T1504Z"),
+			)
+			s3Prefix := filepath.Join(l.AccessLogFilter.AccesslogPath(l.Configuration.Prefix), lbAccessLog)
+			for _, accessLog := range *l.listAccessLogs(s3Prefix) {
+				if _, ok := consumedAccessLogs[accessLog]; !ok {
+					consumedAccessLogs[accessLog] = struct{}{}
+					logch <- accessLog
+				}
+			}
+			for k := range consumedAccessLogs {
+				ts := strings.Split(k, "_")
+				t, _ := time.Parse("20060102T1504Z", ts[4])
+				if t.Before(now.UTC().Add(-2 * time.Minute)) {
+					delete(consumedAccessLogs, k)
+				}
+
+			}
+		}
+	}()
+}
+
+func (l *LogWorker) listAccessLogs(s3Prefix string) *[]string {
+	var al []string
+	input := &s3.ListObjectsV2Input{
+		Bucket:    aws.String(l.Configuration.Bucket),
+		Prefix:    aws.String(s3Prefix),
+		Delimiter: aws.String("/"),
+		MaxKeys:   aws.Int64(200),
+	}
+	err := l.S3.ListObjectsV2Pages(input,
+		func(page *s3.ListObjectsV2Output, lastPage bool) bool {
+			for _, val := range page.Contents {
+				accessLog := strings.Split(*val.Key, "/")[len(strings.Split(*val.Key, "/"))-1]
+				al = append(al, accessLog)
+			}
+			return true
+		})
+	if err != nil {
+		fmt.Println(err)
+	}
+	return &al
+}
+
 func (a *AccessLogFilter) AccesslogPath(prefix string) string {
 	return filepath.Join(prefix, fmt.Sprintf("AWSLogs/%s/elasticloadbalancing/%s/%s/", a.AwsAccountID, a.Region, a.StartTime.Format("2006/01/02"))) + "/"
 }
@@ -197,7 +272,8 @@ func NewAccessLogFilter() AccessLogFilter {

 func NewConfiguration() Configuration {
 	return Configuration{
-		Bucket: viper.GetString("s3-bucket"),
-		Prefix: viper.GetString("s3-prefix"),
+		Bucket:          viper.GetString("s3-bucket"),
+		Prefix:          viper.GetString("s3-prefix"),
+		PollingInterval: viper.GetDuration("polling-interval"),
 	}
 }
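The object names Tail polls for embed a minute-resolution UTC timestamp as their fifth underscore-separated field, and both the prefix construction and the cache eviction above round-trip it through the Go reference layout 20060102T1504Z. Below is a small standalone sketch of that round trip; the account ID and load-balancer ID are invented examples, while the layout string is the one used in the diff.

// Standalone sketch (not part of the commit): formatting and parsing the
// timestamp field of an ELB access-log object name with the layout
// "20060102T1504Z" used by Tail. Account ID and load-balancer ID are made up.
package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	const layout = "20060102T1504Z"

	// Build a key prefix for 2019-03-21 12:05 UTC, as the catch-up loop does.
	t := time.Date(2019, 3, 21, 12, 5, 0, 0, time.UTC)
	key := fmt.Sprintf("%s_elasticloadbalancing_%s_%s_%s",
		"123456789012", "eu-west-1", "app.my-lb.50dc6c495c0c9188", t.Format(layout))
	fmt.Println(key)
	// 123456789012_elasticloadbalancing_eu-west-1_app.my-lb.50dc6c495c0c9188_20190321T1205Z

	// Recover the timestamp from a consumed key, as the eviction pass does
	// before dropping entries older than two minutes.
	parts := strings.Split(key, "_")
	parsed, err := time.Parse(layout, parts[4])
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed) // 2019-03-21 12:05:00 +0000 UTC
}

The consumedAccessLogs map is what keeps re-listed objects from being emitted twice on later polls; entries whose timestamp is more than two minutes old are evicted so the map does not grow without bound.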
