Skip to content

Commit 7fe1f68

Browse files
authored
Implementing lightweight detection for new write operations (#52977)
* Implementing lightweight detection for new write operations
* Fix typo
* BlobLogListener public ctor
* Adding "states: BlobStates.None" in GetBlobsAsync calls
1 parent 0f219ba commit 7fe1f68

File tree

3 files changed

+174
-27
lines changed

3 files changed

+174
-27
lines changed

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ internal class BlobLogListener
2929
private readonly StorageAnalyticsLogParser _parser;
3030
private readonly ILogger<BlobListener> _logger;
3131

32-
private BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
32+
public BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
3333
{
3434
_blobClient = blobClient;
3535

@@ -77,6 +77,53 @@ public async Task<IEnumerable<BlobWithContainer<BlobBaseClient>>> GetRecentBlobW
7777
return blobs;
7878
}
7979

80+
/// <summary>
/// Determines whether any log blob listed for the most recent hours carries metadata
/// indicating that it contains write operations.
/// </summary>
/// <remarks>
/// Only blob listings and their metadata are inspected; the log blobs themselves are not
/// downloaded or parsed by this method. The scan starts at the current UTC hour and steps
/// back one hour per iteration, returning as soon as the first matching blob is found.
/// </remarks>
/// <param name="cancellationToken">
/// Token checked before each hour is scanned and before each listing page is examined;
/// it is also passed to the blob listing call.
/// </param>
/// <param name="hoursWindow">
/// Number of hourly prefixes to scan, beginning with the current hour. When the value is
/// zero or negative the method returns <c>false</c> without listing any blobs.
/// </param>
/// <returns>
/// <c>true</c> if at least one listed blob has a non-empty <c>LogType</c> metadata value
/// containing "write" (case-insensitive); otherwise <c>false</c>.
/// </returns>
/// <exception cref="OperationCanceledException">
/// Thrown when <paramref name="cancellationToken"/> is cancelled during the scan.
/// </exception>
public async Task<bool> HasBlobWritesAsync(CancellationToken cancellationToken, int hoursWindow = DefaultScanHoursWindow)
{
    if (hoursWindow <= 0)
    {
        return false;
    }

    DateTime hourCursor = DateTime.UtcNow;
    BlobContainerClient containerClient = _blobClient.GetBlobContainerClient(LogContainer);

    for (int hourIndex = 0; hourIndex < hoursWindow; hourIndex++)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // Start and end are the same instant. GetSearchPrefix is defined elsewhere in this
        // class; presumably this yields the prefix for that single hour - confirm.
        string prefix = GetSearchPrefix("blob", hourCursor, hourCursor);

        // Metadata is requested with the listing so that no per-blob call is needed.
        await foreach (var page in containerClient
            .GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, states: BlobStates.None, cancellationToken: cancellationToken)
            .AsPages(pageSizeHint: 200)
            .ConfigureAwait(false))
        {
            cancellationToken.ThrowIfCancellationRequested();

            foreach (BlobItem blob in page.Values)
            {
                // Examine metadata only if present; a substring match is used because the
                // value may name more than one operation type.
                if (blob.Metadata is not null &&
                    blob.Metadata.TryGetValue(LogType, out string logType) &&
                    !string.IsNullOrEmpty(logType) &&
                    logType.IndexOf("write", StringComparison.OrdinalIgnoreCase) >= 0)
                {
                    return true;
                }
            }
        }

        hourCursor = hourCursor.AddHours(-1);
    }

    return false;
}
126+
80127
internal static IEnumerable<BlobPath> GetPathsForValidBlobWrites(IEnumerable<StorageAnalyticsLogEntry> entries)
81128
{
82129
IEnumerable<BlobPath> parsedBlobPaths = from entry in entries

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobScalerMonitorProvider.cs

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,6 @@ public ZeroToOneScaleMonitor(string functionId, BlobServiceClient blobServiceCli
6363
_logger = loggerFactory.CreateLogger<ZeroToOneScaleMonitor>();
6464
}
6565

66-
#pragma warning disable 0649
67-
// For tests, in PROD the value is always null
68-
private BlobWithContainer<BlobBaseClient> _recentWrite;
69-
#pragma warning restore 0649
70-
7166
public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor;
7267

7368
public async Task<ScaleMetrics> GetMetricsAsync()
@@ -77,26 +72,15 @@ public async Task<ScaleMetrics> GetMetricsAsync()
7772
// If new blobs were detected, we want GetScaleStatus to return a scale-out vote at least once.
7873
if (Interlocked.Equals(_threadSafeWritesDetectedValue, 1))
7974
{
80-
_logger.LogInformation($"New writes were detectd but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
75+
_logger.LogInformation($"Recent writes were detected but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
8176
return new ScaleMetrics();
8277
}
8378

8479
var blobLogListener = await _blobLogListener.Value.ConfigureAwait(false);
85-
BlobWithContainer<BlobBaseClient>[] recentWrites = _recentWrite == null ? (await blobLogListener.GetRecentBlobWritesAsync(CancellationToken.None).ConfigureAwait(false)).ToArray()
86-
: new BlobWithContainer<BlobBaseClient>[] { _recentWrite };
87-
if (recentWrites.Length > 0)
80+
bool hasBlobWrites = await blobLogListener.HasBlobWritesAsync(CancellationToken.None).ConfigureAwait(false);
81+
if (hasBlobWrites)
8882
{
89-
StringBuilder stringBuilder = new StringBuilder();
90-
foreach (var write in recentWrites)
91-
{
92-
stringBuilder.Append($"'{write.BlobClient.Name}', ");
93-
if (stringBuilder.Length > 1000)
94-
{
95-
stringBuilder.Append("[truncated]");
96-
break;
97-
}
98-
}
99-
_logger.LogInformation($"'{recentWrites.Length}' recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}': {stringBuilder}");
83+
_logger.LogInformation($"Recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}'");
10084
Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 1, 0);
10185
}
10286
else

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobLogListenerTests.cs

Lines changed: 122 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,105 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
45
using System.Collections.Generic;
56
using System.Linq;
7+
using System.Reflection;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Azure;
11+
using Azure.Storage.Blobs;
12+
using Azure.Storage.Blobs.Models;
13+
using Microsoft.Extensions.Logging;
14+
using Microsoft.Extensions.Logging.Abstractions;
15+
using Moq;
616
using NUnit.Framework;
717

818
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
919
{
1020
public class BlobLogListenerTests
1121
{
22+
[TestCase("write", 1, null, 0, true, TestName = "HasBlobWritesAsync_WriteLogPresent_ReturnsTrue")]
[TestCase("read", 1, null, 0, false, TestName = "HasBlobWritesAsync_NonWriteLogPresent_ReturnsFalse")]
[TestCase(null, 1, null, 0, false, TestName = "HasBlobWritesAsync_NoBlobs_ReturnsFalse")]
[TestCase("read", 100, "write", 1, true, TestName = "HasBlobWritesAsync_WriteLogPresentMultipleLogBlobs_ReturnsTrue")]
[TestCase("delete", 100, "read", 100, false, TestName = "HasBlobWritesAsync_NonWriteLogPresentMultipleLogBlobs_ReturnsFalse")]
// Verifies HasBlobWritesAsync against a mocked "$logs" container whose listing holds
// logType1Count blobs tagged with logType1 followed by logType2Count blobs tagged with
// logType2. A null log type contributes no blobs at all.
public async Task HasBlobWritesAsync_VariousCases(string logType1, int logType1Count, string logType2, int logType2Count, bool expected)
{
    // Arrange
    var listedBlobs = new List<BlobItem>();

    // Appends 'count' blobs with random names whose LogType metadata is 'logType'.
    void AddLogBlobs(string logType, int count)
    {
        if (logType == null)
        {
            return;
        }

        for (int added = 0; added < count; added++)
        {
            listedBlobs.Add(BlobItemFactory.Create(
                name: Guid.NewGuid().ToString(),
                metadata: new Dictionary<string, string> { { "LogType", logType } }));
        }
    }

    AddLogBlobs(logType1, logType1Count);
    AddLogBlobs(logType2, logType2Count);

    TestAsyncPageable<BlobItem> listing = listedBlobs.Count == 0
        ? new TestAsyncPageable<BlobItem>(Enumerable.Empty<BlobItem>())
        : new TestAsyncPageable<BlobItem>(listedBlobs);

    // Strict mocks: any call that is not set up below fails the test.
    var serviceClient = new Mock<BlobServiceClient>(MockBehavior.Strict);
    var logsContainer = new Mock<BlobContainerClient>(MockBehavior.Strict);

    logsContainer
        .Setup(c => c.GetBlobsAsync(
            It.IsAny<BlobTraits>(),
            It.IsAny<BlobStates>(),
            It.IsAny<string>(),
            It.IsAny<CancellationToken>()))
        .Returns(listing);

    serviceClient
        .Setup(c => c.GetBlobContainerClient("$logs"))
        .Returns(logsContainer.Object);

    var listener = new BlobLogListener(serviceClient.Object, NullLogger<BlobListener>.Instance);

    // Act
    bool writesDetected = await listener.HasBlobWritesAsync(CancellationToken.None, hoursWindow: 1);

    // Assert
    Assert.AreEqual(expected, writesDetected);
}
85+
1286
[Test]
1387
public void GetPathsForValidBlobWrites_Returns_ValidBlobWritesOnly()
1488
{
1589
StorageAnalyticsLogEntry[] entries = new[]
1690
{
17-
// This is a valid write entry with a valid path
1891
new StorageAnalyticsLogEntry
1992
{
2093
ServiceType = StorageServiceType.Blob,
2194
OperationType = StorageServiceOperationType.PutBlob,
2295
RequestedObjectKey = @"/storagesample/sample-container/""0x8D199A96CB71468""/sample-blob.txt"
2396
},
24-
25-
// This is an invalid path and will be filtered out
2697
new StorageAnalyticsLogEntry
2798
{
2899
ServiceType = StorageServiceType.Blob,
29100
OperationType = StorageServiceOperationType.PutBlob,
30101
RequestedObjectKey = "/"
31102
},
32-
33-
// This does not constitute a write and will be filtered out
34103
new StorageAnalyticsLogEntry
35104
{
36105
ServiceType = StorageServiceType.Blob,
@@ -45,5 +114,52 @@ public void GetPathsForValidBlobWrites_Returns_ValidBlobWritesOnly()
45114
Assert.AreEqual("sample-container", singlePath.ContainerName);
46115
Assert.AreEqual(@"""0x8D199A96CB71468""/sample-blob.txt", singlePath.BlobName);
47116
}
117+
118+
/// <summary>
/// Test helper that builds <see cref="BlobItem"/> instances through reflection, using the
/// type's non-public parameterless constructor and its Name and Metadata properties.
/// </summary>
private static class BlobItemFactory
{
    private static readonly Type ItemType = typeof(BlobItem);
    private static readonly PropertyInfo NameProperty = ItemType.GetProperty(nameof(BlobItem.Name))!;
    private static readonly PropertyInfo MetadataProperty = ItemType.GetProperty(nameof(BlobItem.Metadata))!;

    /// <summary>
    /// Creates a <see cref="BlobItem"/> with the given name and a copy of the given metadata.
    /// </summary>
    /// <param name="name">Value assigned to the item's Name property.</param>
    /// <param name="metadata">
    /// Entries copied into a new dictionary that compares keys case-insensitively (ordinal).
    /// </param>
    /// <returns>The populated item.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when no non-public parameterless constructor is found on the type.
    /// </exception>
    public static BlobItem Create(string name, IDictionary<string, string> metadata)
    {
        ConstructorInfo parameterless = ItemType.GetConstructor(
            BindingFlags.Instance | BindingFlags.NonPublic,
            binder: null,
            types: Type.EmptyTypes,
            modifiers: null);
        if (parameterless == null)
        {
            throw new InvalidOperationException("BlobItem internal constructor not found.");
        }

        var item = (BlobItem)parameterless.Invoke(null);

        NameProperty.SetValue(item, name);
        MetadataProperty.SetValue(item, new Dictionary<string, string>(metadata, StringComparer.OrdinalIgnoreCase));

        return item;
    }
}
137+
138+
/// <summary>
/// Minimal <see cref="AsyncPageable{T}"/> for tests: exposes all supplied items as a single
/// page backed by a mocked response.
/// </summary>
private sealed class TestAsyncPageable<T> : AsyncPageable<T>
{
    private readonly IReadOnlyList<Page<T>> _pages;

    /// <summary>
    /// Snapshots <paramref name="items"/> into one page with no continuation token.
    /// </summary>
    /// <param name="items">Items to expose; enumerated once during construction.</param>
    public TestAsyncPageable(IEnumerable<T> items)
    {
        List<T> snapshot = items.ToList();

        // The mocked response reports status 200 and a random client request id.
        var response = new Mock<Response>();
        response.Setup(r => r.Status).Returns(200);
        response.Setup(r => r.ClientRequestId).Returns(Guid.NewGuid().ToString());

        _pages = new List<Page<T>>
        {
            Page<T>.FromValues(snapshot, continuationToken: null, response: response.Object)
        };
    }

    /// <summary>
    /// Yields the stored pages in order; both arguments are ignored.
    /// </summary>
    public override async IAsyncEnumerable<Page<T>> AsPages(string continuationToken = null, int? pageSizeHint = null)
    {
        for (int index = 0; index < _pages.Count; index++)
        {
            yield return _pages[index];
            await Task.Yield();
        }
    }
}
48164
}
49-
}
165+
}

0 commit comments

Comments
 (0)