1414import lombok .RequiredArgsConstructor ;
1515import lombok .extern .log4j .Log4j2 ;
1616import no .fintlabs .azure .*;
17+ import no .fintlabs .cache .FintCache ;
1718import no .fintlabs .kafka .ResourceGroup ;
1819import no .fintlabs .kafka .ResourceGroupMembership ;
1920import okhttp3 .Request ;
@@ -35,6 +36,21 @@ public class AzureClient {
3536 private final AzureUserExternalProducerService azureUserExternalProducerService ;
3637 private final AzureGroupProducerService azureGroupProducerService ;
3738 private final AzureGroupMembershipProducerService azureGroupMembershipProducerService ;
39+ private final FintCache <String , AzureUser > entraIdUserCache ;
40+ private final FintCache <String , AzureUserExternal > entraIdExternalUserCache ;
41+ //private final FintCache<String, Optional> resourceGroupMembershipCache;
42+ private final FintCache <String , AzureGroup > azureGroupCache ;
43+ private final FintCache <String , AzureGroupMembership > azureGroupMembershipCache ;
44+ AtomicInteger publishedMembers ;
45+
46+ @ Scheduled (cron = "${fint.kontroll.azure-ad-gateway.group-scheduler.clear-cache}" )
47+ public void clearCaches () {
48+ entraIdUserCache .clear ();
49+ entraIdExternalUserCache .clear ();
50+ azureGroupCache .clear ();
51+ azureGroupMembershipCache .clear ();
52+ log .info ("Caches for group, members and users has been reset to null due to scheduler. Next call will publish all users and groups from Entra ID to kafka" );
53+ }
3854
3955 @ Scheduled (
4056 initialDelayString = "${fint.kontroll.azure-ad-gateway.user-scheduler.pull.initial-delay-ms}" ,
@@ -63,31 +79,73 @@ private void pullAllUsers() {
6379 }
6480
6581 private void pageThroughUsers (UserCollectionPage inPage ) {
66- //inPageFuture.thenAccept(inPage -> {
67- int users = 0 ;
82+ AtomicInteger users = new AtomicInteger ();
83+ AtomicInteger changedUsers = new AtomicInteger ();
84+ AtomicInteger changedExtUsers = new AtomicInteger ();
6885
6986 UserCollectionPage page = inPage ;
7087 do {
7188 for (User user : page .getCurrentPage ()) {
72- users ++;
73- if (AzureUser .getAttributeValue (user , configUser .getExternaluserattribute ()) != null
74- && (AzureUser .getAttributeValue (user , configUser .getExternaluserattribute ()).equalsIgnoreCase (configUser .getExternaluservalue ()))) {
75- log .debug ("Adding external user to Kafka, {}" , user .userPrincipalName );
76- azureUserExternalProducerService .publish (new AzureUserExternal (user , configUser ));
89+ users .getAndIncrement ();
90+
91+ if (entraIdUserCache != null &&
92+ entraIdUserCache .containsKey (user .id )) {
93+ AzureUser entraIdUserObject = new AzureUser (user , configUser );
94+ if (entraIdUserObject .equals (entraIdUserCache .get (user .id ))) {
95+ log .debug ("User {} is unchanged. Skipping publishing to Kafka." , user .id );
96+ return ;
97+ }
98+ }
99+
100+ String externalUserAttribute = AzureUser .getAttributeValue (user , configUser .getExternaluserattribute ());
101+ if (externalUserAttribute != null
102+ && externalUserAttribute .equalsIgnoreCase (configUser .getExternaluservalue ())) {
103+ AzureUserExternal entraUserExtObject = new AzureUserExternal (user , configUser );
104+ if (entraIdExternalUserCache != null &&
105+ entraIdExternalUserCache .containsKey (user .id ) && entraUserExtObject .equals (entraIdExternalUserCache .get (user .id ))) {
106+ log .debug ("External User {} is unchanged. Skipping publishing to Kafka." , user .id );
107+ return ;
108+ } else {
109+ log .debug ("Publishing external user to Kafka: {}" , user .userPrincipalName );
110+ azureUserExternalProducerService .publish (new AzureUserExternal (user , configUser ));
111+ changedExtUsers .getAndIncrement ();
112+ entraIdExternalUserCache .put (user .id , new AzureUserExternal (user , configUser ));
113+ }
77114 } else {
78- log .debug ("Adding user to Kafka, {}" , user .userPrincipalName );
79- azureUserProducerService .publish (new AzureUser (user , configUser ));
115+ AzureUser azureuser = new AzureUser (user , configUser );
116+ if ((azureuser .getEmployeeId () != null && !azureuser .getEmployeeId ().isEmpty ()) ||
117+ (azureuser .getStudentId () != null && !azureuser .getStudentId ().isEmpty ())) {
118+ log .debug ("Publishing user to Kafka: {}" , user .userPrincipalName );
119+ azureUserProducerService .publish (azureuser );
120+ log .debug ("Updating cache for user: {}" , user .id );
121+ changedUsers .getAndIncrement ();
122+ entraIdUserCache .put (user .id , azureuser );
123+ } else {
124+ log .debug ("UserId: {} does not contain required employeeId or studentId. Not published to kafka" , user .id );
125+ }
80126 }
127+ // if (AzureUser.getAttributeValue(user, configUser.getExternaluserattribute()) != null
128+ // && (AzureUser.getAttributeValue(user, configUser.getExternaluserattribute()).equalsIgnoreCase(configUser.getExternaluservalue()))) {
129+ // log.debug("Adding external user to Kafka, {}", user.userPrincipalName);
130+ // azureUserExternalProducerService.publish(new AzureUserExternal(user, configUser));
131+ // } else {
132+ // log.debug("Adding user to Kafka, {}", user.userPrincipalName);
133+ // azureUserProducerService.publish(new AzureUser(user, configUser));
134+ // }
81135 }
82136 if (page .getNextPage () == null ) {
83137 break ;
84138 } else {
85- //log.info("Processing user page");
86139 page = page .getNextPage ().buildRequest ().get ();
87140 }
88141 } while (page != null );
89142 log .info ("*** <<< {} User objects detected in Microsoft Entra >>> ***" , users );
90- //});
143+ if (changedUsers .get () > 0 ) {
144+ log .info ("*** <<< {} Entra users published to kafka as they where different from user cache >>> ***" , changedUsers .get ());
145+ }
146+ if (changedExtUsers .get () > 0 ) {
147+ log .info ("*** <<< {} external users published to kafka as they where where different from external user cache >>> ***" , changedExtUsers .get ());
148+ }
91149 }
92150
93151 @ Scheduled (
@@ -97,6 +155,7 @@ private void pageThroughUsers(UserCollectionPage inPage) {
97155 public void pullAllGroups () {
98156 log .info ("*** <<< Fetching groups from Microsoft Entra >>> ***" );
99157 long startTime = System .currentTimeMillis ();
158+ publishedMembers = new AtomicInteger ();
100159
101160 try {
102161 CompletableFuture <GroupCollectionPage > initialPageFuture = graphService .groups ()
@@ -120,6 +179,7 @@ public void pullAllGroups() {
120179 long memberFetchMinutes = memberFetchElapsedTimeInSeconds / 60 ;
121180 long memberFetchSeconds = memberFetchElapsedTimeInSeconds % 60 ;
122181
182+
123183 log .info ("*** <<< Done fetching all group memberships from Microsoft Entra ID in {} minutes and {} seconds >>> ***" , memberFetchMinutes , memberFetchSeconds );
124184 return groupCount ;
125185 });
@@ -130,14 +190,25 @@ public void pullAllGroups() {
130190 } catch (ClientException e ) {
131191 log .error ("Failed when trying to get groups. " , e );
132192 }
193+ if (publishedMembers .get () > 0 ) {
194+ log .info ("*** <<< {} Entra group members published to kafka as they were not in membership cache >>> ***" , publishedMembers .get ());
195+ } else {
196+ log .info ("*** <<< All entra group members already in cache. No members published to kafka >>> ***" );
197+ }
133198 }
134199
200+
135201 private CompletableFuture <List <Group >> fetchAllGroups (GroupCollectionPage initialPage ) {
136202 List <Group > allGroups = new ArrayList <>();
137203 AtomicInteger groupCounter = new AtomicInteger (0 ); // Counter for groups
138204
139205 return fetchAllGroupsRecursive (initialPage , allGroups , groupCounter ).thenApply (v -> {
140- log .info ("*** <<< Found {} groups with suffix \" {}\" >>> ***" , groupCounter .get (), configGroup .getSuffix ());
206+ if (groupCounter .get () > 0 ) {
207+ log .info ("*** <<< Found {} groups with suffix \" {}\" not in cache, that were published to kafka >>> ***" , groupCounter .get (), configGroup .getSuffix ());
208+ }
209+ else {
210+ log .info ("*** <<< All groups already in cache. Not republishing to kafka >>> ***" );
211+ }
141212 return allGroups ;
142213 });
143214 }
@@ -146,9 +217,18 @@ private CompletableFuture<Void> fetchAllGroupsRecursive(GroupCollectionPage curr
146217 List <Group > currentPageGroups = currentPage .getCurrentPage ().stream ()
147218 .filter (group -> group .displayName != null && group .displayName .endsWith (configGroup .getSuffix ())&& (!group .additionalDataManager ().isEmpty () && group .additionalDataManager ().containsKey (configGroup .getFintkontrollidattribute ())))
148219 .peek (group -> {
149- groupCounter .incrementAndGet ();
150220 AzureGroup newGroup = new AzureGroup (group , configGroup );
151- azureGroupProducerService .publish (newGroup ); // Publish the group as soon as it is found
221+ if (azureGroupCache != null
222+ && azureGroupCache .containsKey (newGroup .getId ())
223+ && newGroup .equals (azureGroupCache .get (newGroup .getId ()))) {
224+ log .debug ("{} groupID allready published and in cache. Not replublished to kafka" , newGroup .getId ());
225+ }
226+ else
227+ {
228+ groupCounter .incrementAndGet ();
229+ azureGroupProducerService .publish (newGroup ); // Publish the group as soon as it is found
230+ azureGroupCache .put (newGroup .getId (), newGroup );
231+ }
152232 })
153233 .toList ();
154234 allGroups .addAll (currentPageGroups );
@@ -185,23 +265,36 @@ private CompletableFuture<Integer> fetchMembersForAllGroups(List<Group> groups)
185265
186266
187267 private CompletableFuture <Void > pageThroughAzureGroupAsync (AzureGroup azureGroup , DirectoryObjectCollectionWithReferencesPage inPage ) {
188- AtomicInteger members = new AtomicInteger (0 );
268+ AtomicInteger members = new AtomicInteger ();
269+
189270
190- return processPageAsync (azureGroup , inPage , members )
271+
272+ return processPageAsync (azureGroup , inPage , members , publishedMembers )
191273 .thenRun (() -> log .debug ("{} memberships detected in groupName {} with groupId {}" ,
192274 members .get (), azureGroup .getDisplayName (), azureGroup .getId ()));
193275 }
194276
195- private CompletableFuture <Void > processPageAsync (AzureGroup azureGroup , DirectoryObjectCollectionWithReferencesPage page , AtomicInteger members ) {
277+ private CompletableFuture <Void > processPageAsync (AzureGroup azureGroup , DirectoryObjectCollectionWithReferencesPage page , AtomicInteger members , AtomicInteger publishedMembers ) {
196278 if (page == null ) {
197279 return CompletableFuture .completedFuture (null );
198280 }
199281
200282 List <CompletableFuture <Void >> futures = page .getCurrentPage ().stream ()
201283 .map (member -> CompletableFuture .runAsync (() -> {
202284 members .incrementAndGet ();
203- azureGroupMembershipProducerService .publishAddedMembership (new AzureGroupMembership (azureGroup .getId (), member ));
204- log .debug ("Produced message to Kafka where userId: {} is member of groupId: {}" , member .id , azureGroup .getId ());
285+ AzureGroupMembership azureGroupMembership = new AzureGroupMembership (azureGroup .getId (), member );
286+ if (azureGroupMembershipCache != null
287+ && azureGroupMembershipCache .containsKey (azureGroupMembership .getId ())
288+ && azureGroupMembership .equals (azureGroupMembershipCache .get (azureGroupMembership .getId ())))
289+ {
290+ log .debug ("Skipping message to Kafka, as userId: {} is allready published as member of groupId: {}" , member .id , azureGroup .getId ());
291+ }
292+ else {
293+ azureGroupMembershipProducerService .publishAddedMembership (azureGroupMembership );
294+ azureGroupMembershipCache .put (azureGroupMembership .getId (), azureGroupMembership );
295+ publishedMembers .getAndIncrement ();
296+ log .debug ("Produced message to Kafka where userId: {} is member of groupId: {}" , member .id , azureGroup .getId ());
297+ }
205298 }))
206299 .toList ();
207300
@@ -211,7 +304,7 @@ private CompletableFuture<Void> processPageAsync(AzureGroup azureGroup, Director
211304
212305 return CompletableFuture .allOf (futures .toArray (new CompletableFuture [0 ]))
213306 .thenCompose (v -> nextPageFuture )
214- .thenCompose (nextPage -> processPageAsync (azureGroup , nextPage , members ));
307+ .thenCompose (nextPage -> processPageAsync (azureGroup , nextPage , members , publishedMembers ));
215308 }
216309// public void pullAllGroups() {
217310// log.info("*** <<< Fetching groups from Microsoft Entra >>> ***");
0 commit comments