Skip to content

Commit 7f39476

Browse files
authored
Revert "Implementing lightweight detection for new write operations (#52977)" (#53077)
This reverts commit 7fe1f68.
1 parent 2fb20f7 commit 7f39476

File tree

3 files changed

+27
-174
lines changed

3 files changed

+27
-174
lines changed

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobLogListener.cs

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ internal class BlobLogListener
2929
private readonly StorageAnalyticsLogParser _parser;
3030
private readonly ILogger<BlobListener> _logger;
3131

32-
public BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
32+
private BlobLogListener(BlobServiceClient blobClient, ILogger<BlobListener> logger)
3333
{
3434
_blobClient = blobClient;
3535

@@ -77,53 +77,6 @@ public async Task<IEnumerable<BlobWithContainer<BlobBaseClient>>> GetRecentBlobW
7777
return blobs;
7878
}
7979

80-
public async Task<bool> HasBlobWritesAsync(CancellationToken cancellationToken, int hoursWindow = DefaultScanHoursWindow)
81-
{
82-
if (hoursWindow <= 0)
83-
{
84-
return false;
85-
}
86-
87-
DateTime hourCursor = DateTime.UtcNow;
88-
BlobContainerClient containerClient = _blobClient.GetBlobContainerClient(LogContainer);
89-
90-
int processedCount = 0;
91-
92-
for (int hourIndex = 0; hourIndex < hoursWindow; hourIndex++)
93-
{
94-
cancellationToken.ThrowIfCancellationRequested();
95-
96-
string prefix = GetSearchPrefix("blob", hourCursor, hourCursor);
97-
98-
await foreach (var page in containerClient
99-
.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, states: BlobStates.None, cancellationToken: cancellationToken)
100-
.AsPages(pageSizeHint: 200)
101-
.ConfigureAwait(false))
102-
{
103-
cancellationToken.ThrowIfCancellationRequested();
104-
105-
foreach (BlobItem blob in page.Values)
106-
{
107-
// Increment only when we actually look at a blob item.
108-
processedCount++;
109-
110-
// Examine metadata only if present.
111-
if (blob.Metadata is not null &&
112-
blob.Metadata.TryGetValue(LogType, out string logType) &&
113-
!string.IsNullOrEmpty(logType) &&
114-
logType.IndexOf("write", StringComparison.OrdinalIgnoreCase) >= 0)
115-
{
116-
return true;
117-
}
118-
}
119-
}
120-
121-
hourCursor = hourCursor.AddHours(-1);
122-
}
123-
124-
return false;
125-
}
126-
12780
internal static IEnumerable<BlobPath> GetPathsForValidBlobWrites(IEnumerable<StorageAnalyticsLogEntry> entries)
12881
{
12982
IEnumerable<BlobPath> parsedBlobPaths = from entry in entries

sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobScalerMonitorProvider.cs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ public ZeroToOneScaleMonitor(string functionId, BlobServiceClient blobServiceCli
6363
_logger = loggerFactory.CreateLogger<ZeroToOneScaleMonitor>();
6464
}
6565

66+
#pragma warning disable 0649
67+
// For tests, in PROD the value is always null
68+
private BlobWithContainer<BlobBaseClient> _recentWrite;
69+
#pragma warning restore 0649
70+
6671
public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor;
6772

6873
public async Task<ScaleMetrics> GetMetricsAsync()
@@ -72,15 +77,26 @@ public async Task<ScaleMetrics> GetMetricsAsync()
7277
// if new blob were detected we want to GetScaleStatus return scale out vote at least once
7378
if (Interlocked.Equals(_threadSafeWritesDetectedValue, 1))
7479
{
75-
_logger.LogInformation($"Recent writes were detected but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
80+
_logger.LogInformation($"New writes were detectd but GetScaleStatus was not called. Waiting GetScaleStatus to call.");
7681
return new ScaleMetrics();
7782
}
7883

7984
var blobLogListener = await _blobLogListener.Value.ConfigureAwait(false);
80-
bool hasBlobWrites = await blobLogListener.HasBlobWritesAsync(CancellationToken.None).ConfigureAwait(false);
81-
if (hasBlobWrites)
85+
BlobWithContainer<BlobBaseClient>[] recentWrites = _recentWrite == null ? (await blobLogListener.GetRecentBlobWritesAsync(CancellationToken.None).ConfigureAwait(false)).ToArray()
86+
: new BlobWithContainer<BlobBaseClient>[] { _recentWrite };
87+
if (recentWrites.Length > 0)
8288
{
83-
_logger.LogInformation($"Recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}'");
89+
StringBuilder stringBuilder = new StringBuilder();
90+
foreach (var write in recentWrites)
91+
{
92+
stringBuilder.Append($"'{write.BlobClient.Name}', ");
93+
if (stringBuilder.Length > 1000)
94+
{
95+
stringBuilder.Append("[truncated]");
96+
break;
97+
}
98+
}
99+
_logger.LogInformation($"'{recentWrites.Length}' recent writes were detected for '{_scaleMonitorDescriptor.FunctionId}': {stringBuilder}");
84100
Interlocked.CompareExchange(ref _threadSafeWritesDetectedValue, 1, 0);
85101
}
86102
else
Lines changed: 6 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,36 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4-
using System;
54
using System.Collections.Generic;
65
using System.Linq;
7-
using System.Reflection;
8-
using System.Threading;
9-
using System.Threading.Tasks;
10-
using Azure;
11-
using Azure.Storage.Blobs;
12-
using Azure.Storage.Blobs.Models;
13-
using Microsoft.Extensions.Logging;
14-
using Microsoft.Extensions.Logging.Abstractions;
15-
using Moq;
166
using NUnit.Framework;
177

188
namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners
199
{
2010
public class BlobLogListenerTests
2111
{
22-
[TestCase("write", 1, null, 0, true, TestName = "HasBlobWritesAsync_WriteLogPresent_ReturnsTrue")]
23-
[TestCase("read", 1, null, 0, false, TestName = "HasBlobWritesAsync_NonWriteLogPresent_ReturnsFalse")]
24-
[TestCase(null, 1, null, 0, false, TestName = "HasBlobWritesAsync_NoBlobs_ReturnsFalse")]
25-
[TestCase("read", 100, "write", 1, true, TestName = "HasBlobWritesAsync_WriteLogPresentMultipleLogBlobs_ReturnsTrue")]
26-
[TestCase("delete", 100, "read", 100, false, TestName = "HasBlobWritesAsync_NonWriteLogPresentMultipleLogBlobs_ReturnsFalse")]
27-
public async Task HasBlobWritesAsync_VariousCases(string logType1, int logType1Count, string logType2, int logType2Count, bool expected)
28-
{
29-
// Arrange
30-
var blobServiceClientMock = new Mock<BlobServiceClient>(MockBehavior.Strict);
31-
var containerClientMock = new Mock<BlobContainerClient>(MockBehavior.Strict);
32-
33-
TestAsyncPageable<BlobItem> pageable;
34-
var blobItems = new List<BlobItem>();
35-
if (logType1 != null)
36-
{
37-
for (int i = 0; i < logType1Count; i++)
38-
{
39-
var blobItem = BlobItemFactory.Create(
40-
name: Guid.NewGuid().ToString(),
41-
metadata: new Dictionary<string, string> { { "LogType", logType1 } });
42-
blobItems.Add(blobItem);
43-
}
44-
}
45-
if (logType2 != null)
46-
{
47-
for (int i = 0; i < logType2Count; i++)
48-
{
49-
var blobItem = BlobItemFactory.Create(
50-
name: Guid.NewGuid().ToString(),
51-
metadata: new Dictionary<string, string> { { "LogType", logType2 } });
52-
blobItems.Add(blobItem);
53-
}
54-
}
55-
56-
if (blobItems.Count == 0)
57-
{
58-
pageable = new TestAsyncPageable<BlobItem>(Enumerable.Empty<BlobItem>());
59-
}
60-
else
61-
{
62-
pageable = new TestAsyncPageable<BlobItem>(blobItems);
63-
}
64-
65-
containerClientMock
66-
.Setup(c => c.GetBlobsAsync(
67-
It.IsAny<BlobTraits>(),
68-
It.IsAny<BlobStates>(),
69-
It.IsAny<string>(),
70-
It.IsAny<CancellationToken>()))
71-
.Returns(pageable);
72-
73-
blobServiceClientMock
74-
.Setup(c => c.GetBlobContainerClient("$logs"))
75-
.Returns(containerClientMock.Object);
76-
77-
var listener = new BlobLogListener(blobServiceClientMock.Object, NullLogger<BlobListener>.Instance);
78-
79-
// Act
80-
bool result = await listener.HasBlobWritesAsync(CancellationToken.None, hoursWindow: 1);
81-
82-
// Assert
83-
Assert.AreEqual(expected, result);
84-
}
85-
8612
[Test]
8713
public void GetPathsForValidBlobWrites_Returns_ValidBlobWritesOnly()
8814
{
8915
StorageAnalyticsLogEntry[] entries = new[]
9016
{
17+
// This is a valid write entry with a valid path
9118
new StorageAnalyticsLogEntry
9219
{
9320
ServiceType = StorageServiceType.Blob,
9421
OperationType = StorageServiceOperationType.PutBlob,
9522
RequestedObjectKey = @"/storagesample/sample-container/""0x8D199A96CB71468""/sample-blob.txt"
9623
},
24+
25+
// This is an invalid path and will be filtered out
9726
new StorageAnalyticsLogEntry
9827
{
9928
ServiceType = StorageServiceType.Blob,
10029
OperationType = StorageServiceOperationType.PutBlob,
10130
RequestedObjectKey = "/"
10231
},
32+
33+
// This does not constitute a write and will be filtered out
10334
new StorageAnalyticsLogEntry
10435
{
10536
ServiceType = StorageServiceType.Blob,
@@ -114,52 +45,5 @@ public void GetPathsForValidBlobWrites_Returns_ValidBlobWritesOnly()
11445
Assert.AreEqual("sample-container", singlePath.ContainerName);
11546
Assert.AreEqual(@"""0x8D199A96CB71468""/sample-blob.txt", singlePath.BlobName);
11647
}
117-
118-
private static class BlobItemFactory
119-
{
120-
private static readonly Type BlobItemType = typeof(BlobItem);
121-
private static readonly System.Reflection.PropertyInfo NameProp = BlobItemType.GetProperty(nameof(BlobItem.Name))!;
122-
private static readonly System.Reflection.PropertyInfo MetadataProp = BlobItemType.GetProperty(nameof(BlobItem.Metadata))!;
123-
124-
public static BlobItem Create(string name, IDictionary<string, string> metadata)
125-
{
126-
var ctor = BlobItemType.GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic, binder: null, types: Type.EmptyTypes, modifiers: null)
127-
?? throw new InvalidOperationException("BlobItem internal constructor not found.");
128-
object raw = ctor.Invoke(null);
129-
130-
NameProp.SetValue(raw, name);
131-
var dict = new Dictionary<string, string>(metadata, StringComparer.OrdinalIgnoreCase);
132-
MetadataProp.SetValue(raw, dict);
133-
134-
return (BlobItem)raw;
135-
}
136-
}
137-
138-
private sealed class TestAsyncPageable<T> : AsyncPageable<T>
139-
{
140-
private readonly IReadOnlyList<Page<T>> _pages;
141-
142-
public TestAsyncPageable(IEnumerable<T> items)
143-
{
144-
var list = items.ToList();
145-
var responseMock = new Mock<Response>();
146-
responseMock.Setup(r => r.Status).Returns(200);
147-
responseMock.Setup(r => r.ClientRequestId).Returns(Guid.NewGuid().ToString());
148-
149-
_pages = new[]
150-
{
151-
Page<T>.FromValues(list, continuationToken: null, response: responseMock.Object)
152-
};
153-
}
154-
155-
public override async IAsyncEnumerable<Page<T>> AsPages(string continuationToken = null, int? pageSizeHint = null)
156-
{
157-
foreach (var p in _pages)
158-
{
159-
yield return p;
160-
await Task.Yield();
161-
}
162-
}
163-
}
16448
}
165-
}
49+
}

0 commit comments

Comments
 (0)