Try to fix database sync with Antigravity.

This commit is contained in:
Creeper Lv
2026-06-01 06:38:20 +10:00
parent b263291dc2
commit c60540fb2b
4 changed files with 261 additions and 24 deletions
+143 -19
View File
@@ -112,9 +112,8 @@ public static class SyncEndpoints
var nodeKeys = await db.NodeKeys.ToListAsync();
var pendingShares = await db.PendingShares.ToListAsync();
var aclEntries = await db.AclEntries.ToListAsync();
var peerServers = await db.PeerServers.ToListAsync();
var dump = new DatabaseDumpPayload(users, nodes, nodeKeys, pendingShares, aclEntries, peerServers);
var dump = new DatabaseDumpPayload(users, nodes, nodeKeys, pendingShares, aclEntries);
return Results.Ok(dump);
});
@@ -234,6 +233,10 @@ public static class SyncEndpoints
db.Nodes.Add(item.Node);
db.AclEntries.AddRange(item.AclEntries);
db.NodeKeys.AddRange(item.NodeKeys);
if (item.PendingShares != null)
{
db.PendingShares.AddRange(item.PendingShares);
}
updated = true;
}
else if (localNode.UpdatedAt < item.Node.UpdatedAt)
@@ -253,6 +256,13 @@ public static class SyncEndpoints
db.NodeKeys.RemoveRange(existingKeys);
db.NodeKeys.AddRange(item.NodeKeys);
var existingShares = await db.PendingShares.Where(s => s.NodeUuid == item.Node.Uuid).ToListAsync();
db.PendingShares.RemoveRange(existingShares);
if (item.PendingShares != null)
{
db.PendingShares.AddRange(item.PendingShares);
}
updated = true;
}
@@ -273,6 +283,59 @@ public static class SyncEndpoints
return Results.Ok(new { message = "Broadcast processed.", updated });
});
// Receive broadcast of user registration or password updates
routes.MapPost("/api/sync/broadcast-user", async (UserBroadcastPayload payload, HttpContext context, ServerDbContext db, PeerCache peerCache, RsaKeyManager rsaKeyManager, HttpClient httpClient) =>
{
if (!AuthHelper.IsServerTokenValid(context, peerCache))
{
return Results.Unauthorized();
}
var broadcastId = payload.BroadcastId;
if (!string.IsNullOrEmpty(broadcastId))
{
if (!peerCache.TryProcessBroadcast(broadcastId))
{
return Results.Ok(new { message = "User broadcast already processed (loop detected).", updated = false });
}
}
var existingUser = await db.Users.FirstOrDefaultAsync(u => u.Username.ToLower() == payload.User.Username.ToLower());
bool updated = false;
if (existingUser == null)
{
db.Users.Add(payload.User);
updated = true;
}
else
{
if (existingUser.PasswordHash != payload.User.PasswordHash || existingUser.EncryptedKey != payload.User.EncryptedKey)
{
existingUser.PasswordHash = payload.User.PasswordHash;
existingUser.EncryptedKey = payload.User.EncryptedKey;
updated = true;
}
}
if (updated)
{
await db.SaveChangesAsync();
// Relay the user broadcast to other peers
if (!string.IsNullOrEmpty(broadcastId))
{
_ = Task.Run(async () =>
{
var configuration = context.RequestServices.GetRequiredService<IConfiguration>();
await BroadcastUserChangeAsync(payload.User.Username, db, peerCache, rsaKeyManager, configuration, httpClient, broadcastId);
});
}
}
return Results.Ok(new { message = "User broadcast processed.", updated });
});
}
// Handshake: Tells the destination server who we are and registers us using RSA asymmetric verification
@@ -353,32 +416,33 @@ public static class SyncEndpoints
var acls = await db.AclEntries.Where(a => a.NodeUuid == nodeUuid).ToListAsync();
var keys = await db.NodeKeys.Where(k => k.NodeUuid == nodeUuid).ToListAsync();
var pendingShares = await db.PendingShares.Where(s => s.NodeUuid == nodeUuid).ToListAsync();
var bId = string.IsNullOrEmpty(broadcastId) ? Guid.NewGuid().ToString() : broadcastId;
// Register it locally in PeerCache so we don't process it ourselves if it loops back
peerCache.TryProcessBroadcast(bId);
var payload = new SyncPushItem(node, acls, keys, bId);
var payload = new SyncPushItem(node, acls, keys, bId, pendingShares);
var json = JsonSerializer.Serialize(payload);
// Fetch database configured peers (upstream destination)
var dbPeers = await db.PeerServers.Select(p => p.Url).ToListAsync();
// Fetch dynamically handshake-registered in-memory peers (downstream child nodes)
var dynamicPeers = peerCache.GetPeers();
// Combine peers
var allPeers = dbPeers.Union(dynamicPeers, StringComparer.OrdinalIgnoreCase).ToList();
var allPeers = peerCache.GetPeers();
var localUrl = configuration["Sync:LocalServerUrl"] ?? "";
var destUrl = configuration["Sync:DestinationServerUrl"] ?? "";
var localUrl = (configuration["Sync:LocalServerUrl"] ?? "").Trim().TrimEnd('/');
var destUrl = (configuration["Sync:DestinationServerUrl"] ?? "").Trim().TrimEnd('/');
foreach (var peerUrl in allPeers)
if (!string.IsNullOrEmpty(destUrl) && !allPeers.Contains(destUrl, StringComparer.OrdinalIgnoreCase))
{
allPeers.Add(destUrl);
}
foreach (var rawPeerUrl in allPeers)
{
var peerUrl = rawPeerUrl.Trim().TrimEnd('/');
try
{
var request = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl.TrimEnd('/')}/api/sync/broadcast");
var request = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl}/api/sync/broadcast");
request.Content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
// Attach custom headers for secure authentication
@@ -412,6 +476,67 @@ public static class SyncEndpoints
}
}
public static async Task BroadcastUserChangeAsync(string username, ServerDbContext db, PeerCache peerCache, RsaKeyManager rsaKeyManager, IConfiguration configuration, HttpClient httpClient, string? broadcastId = null)
{
var user = await db.Users.FirstOrDefaultAsync(u => u.Username.ToLower() == username.ToLower());
if (user == null) return;
var bId = string.IsNullOrEmpty(broadcastId) ? Guid.NewGuid().ToString() : broadcastId;
// Register it locally in PeerCache so we don't process it ourselves if it loops back
peerCache.TryProcessBroadcast(bId);
var payload = new UserBroadcastPayload(user, bId);
var json = JsonSerializer.Serialize(payload);
// Fetch dynamically handshake-registered in-memory peers (downstream child nodes)
var allPeers = peerCache.GetPeers();
var localUrl = (configuration["Sync:LocalServerUrl"] ?? "").Trim().TrimEnd('/');
var destUrl = (configuration["Sync:DestinationServerUrl"] ?? "").Trim().TrimEnd('/');
if (!string.IsNullOrEmpty(destUrl) && !allPeers.Contains(destUrl, StringComparer.OrdinalIgnoreCase))
{
allPeers.Add(destUrl);
}
foreach (var rawPeerUrl in allPeers)
{
var peerUrl = rawPeerUrl.Trim().TrimEnd('/');
try
{
var request = new HttpRequestMessage(HttpMethod.Post, $"{peerUrl}/api/sync/broadcast-user");
request.Content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
// Attach custom headers for secure authentication
request.Headers.Add("X-Server-Url", localUrl);
if (string.Equals(peerUrl, destUrl, StringComparison.OrdinalIgnoreCase))
{
if (!string.IsNullOrEmpty(UpstreamSessionToken))
{
request.Headers.Add("X-Server-Token", UpstreamSessionToken);
}
}
else
{
var token = peerCache.GetToken(peerUrl);
if (!string.IsNullOrEmpty(token))
{
request.Headers.Add("X-Server-Token", token);
}
}
var response = await httpClient.SendAsync(request);
Console.WriteLine($"User broadcast to {peerUrl}: {response.StatusCode}");
}
catch (Exception ex)
{
Console.WriteLine($"Failed to broadcast user change to {peerUrl}: {ex.Message}");
}
}
}
// Periodically pings downstream peer servers in PeerCache to ensure they are online and authentic
public static async Task StartHeartbeatPingerAsync(PeerCache peerCache, RsaKeyManager rsaKeyManager, HttpClient httpClient, System.Threading.CancellationToken token)
{
@@ -607,7 +732,6 @@ public static class SyncEndpoints
db.NodeKeys.RemoveRange(await db.NodeKeys.ToListAsync());
db.PendingShares.RemoveRange(await db.PendingShares.ToListAsync());
db.AclEntries.RemoveRange(await db.AclEntries.ToListAsync());
db.PeerServers.RemoveRange(await db.PeerServers.ToListAsync());
await db.SaveChangesAsync();
db.Users.AddRange(dump.Users);
@@ -615,7 +739,6 @@ public static class SyncEndpoints
db.NodeKeys.AddRange(dump.NodeKeys);
db.PendingShares.AddRange(dump.PendingShares);
db.AclEntries.AddRange(dump.AclEntries);
db.PeerServers.AddRange(dump.PeerServers);
await db.SaveChangesAsync();
Console.WriteLine("Full database pull bootstrap completed successfully!");
@@ -641,13 +764,14 @@ public record DatabaseDumpPayload(
List<Node> Nodes,
List<NodeKey> NodeKeys,
List<PendingShare> PendingShares,
List<AclEntry> AclEntries,
List<PeerServer> PeerServers
List<AclEntry> AclEntries
);
public record SyncNodeState(string Uuid, DateTime UpdatedAt, bool IsDeleted);
public record SyncExchangeRequest(List<SyncNodeState> Items);
public record SyncPushItem(Node Node, List<AclEntry> AclEntries, List<NodeKey> NodeKeys, string? BroadcastId = null);
public record SyncPushItem(Node Node, List<AclEntry> AclEntries, List<NodeKey> NodeKeys, string? BroadcastId = null, List<PendingShare>? PendingShares = null);
public record SyncExchangeResponse(List<SyncPushItem> ToPush, List<string> ToRequest);
public record PingResponse(string PublicKey, string Timestamp, string Token);
public record UserBroadcastPayload(User User, string? BroadcastId = null);