Skip to content

Tidy up Bulk Copy unmatched column name work #3205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,207 +532,219 @@ private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet i

string[] parts = MultipartIdentifier.ParseMultipartIdentifier(DestinationTableName, "[\"", "]\"", Strings.SQL_BulkCopyDestinationTableName, true);
updateBulkCommandText.AppendFormat("insert bulk {0} (", ADP.BuildMultiPartName(parts));
int nmatched = 0; // Number of columns that match and are accepted
int nrejected = 0; // Number of columns that match but were rejected
bool rejectColumn; // True if a column is rejected because of an excluded type

bool isInTransaction;

isInTransaction = _connection.HasLocalTransaction;
// Throw if there is a transaction but no flag is set
if (isInTransaction && _externalTransaction == null && _internalTransaction == null && (_connection.Parser != null && _connection.Parser.CurrentTransaction != null && _connection.Parser.CurrentTransaction.IsLocal))
if (_connection.HasLocalTransaction
&& _externalTransaction == null
&& _internalTransaction == null
&& _connection.Parser != null
&& _connection.Parser.CurrentTransaction != null
&& _connection.Parser.CurrentTransaction.IsLocal)
{
throw SQL.BulkLoadExistingTransaction();
}

HashSet<string> destColumnNames = new HashSet<string>();

Dictionary<string, bool> columnMappingStatusLookup = new Dictionary<string, bool>();
// Loop over the metadata for each column
// Keep track of any result columns that we don't have a local
// mapping for.
HashSet<string> unmatchedColumns = new(_localColumnMappings.Count);

// Start by assuming all locally mapped Destination columns will be
// unmatched.
for (int i = 0; i < _localColumnMappings.Count; ++i)
{
unmatchedColumns.Add(_localColumnMappings[i].DestinationColumn);
}

// Flag to remember whether or not we need to append a comma before
// the next column in the command text.
bool appendComma = false;

// Loop over the metadata for each result column.
_SqlMetaDataSet metaDataSet = internalResults[MetaDataResultId].MetaData;
_sortedColumnMappings = new List<_ColumnMapping>(metaDataSet.Length);
for (int i = 0; i < metaDataSet.Length; i++)
{
_SqlMetaData metadata = metaDataSet[i];
rejectColumn = false;

// Check for excluded types
if ((metadata.type == SqlDbType.Timestamp)
|| ((metadata.IsIdentity) && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity)))
bool matched = false;
bool rejected = false;

// Look for a local match for the remote column.
for (int j = 0; j < _localColumnMappings.Count; ++j)
{
// Remove metadata for excluded columns
metaDataSet[i] = null;
rejectColumn = true;
// We still need to find a matching column association
}
var localColumn = _localColumnMappings[j];

// Find out if this column is associated
int assocId;
for (assocId = 0; assocId < _localColumnMappings.Count; assocId++)
{
if (!columnMappingStatusLookup.ContainsKey(_localColumnMappings[assocId].DestinationColumn))
{
columnMappingStatusLookup.Add(_localColumnMappings[assocId].DestinationColumn, false);
}
// Are we missing a mapping between the result column and
// this local column (by ordinal or name)?
if (localColumn._destinationColumnOrdinal != metadata.ordinal
&& UnquotedName(localColumn._destinationColumnName) != metadata.column)
{
// Yes, so move on to the next local column.
continue;
}

if ((_localColumnMappings[assocId]._destinationColumnOrdinal == metadata.ordinal) ||
(UnquotedName(_localColumnMappings[assocId]._destinationColumnName) == metadata.column))
// Ok, we found a matching local column.
matched = true;

// Remove it from our unmatched set.
unmatchedColumns.Remove(localColumn.DestinationColumn);

// Check for column types that we refuse to bulk load, even
// though we found a match.
//
// We will not process timestamp or identity columns.
//
if (metadata.type == SqlDbType.Timestamp
|| (metadata.IsIdentity && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity)))
{
columnMappingStatusLookup[_localColumnMappings[assocId].DestinationColumn] = true;
rejected = true;
break;
}

if (rejectColumn)
{
nrejected++; // Count matched columns only
break;
}
_sortedColumnMappings.Add(new _ColumnMapping(localColumn._internalSourceColumnOrdinal, metadata));
destColumnNames.Add(metadata.column);

_sortedColumnMappings.Add(new _ColumnMapping(_localColumnMappings[assocId]._internalSourceColumnOrdinal, metadata));
destColumnNames.Add(metadata.column);
nmatched++;
// Append a comma for each subsequent column.
if (appendComma)
{
updateBulkCommandText.Append(", ");
}
else
{
appendComma = true;
}

if (nmatched > 1)
{
updateBulkCommandText.Append(", "); // A leading comma for all but the first one
}
// Some datatypes need special handling ...
if (metadata.type == SqlDbType.Variant)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant");
}
else if (metadata.type == SqlDbType.Udt)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
}
else if (metadata.type == SqlDbTypeExtensions.Json)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json");
}
else
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, metadata.type.ToString());
}

// Some datatypes need special handling ...
if (metadata.type == SqlDbType.Variant)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant");
}
else if (metadata.type == SqlDbType.Udt)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
}
else if (metadata.type == SqlDbTypeExtensions.Json)
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "json");
}
else
switch (metadata.metaType.NullableType)
{
case TdsEnums.SQLNUMERICN:
case TdsEnums.SQLDECIMALN:
// Decimal and numeric need to include precision and scale
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale);
break;
case TdsEnums.SQLUDT:
{
AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, metadata.type.ToString());
if (metadata.IsLargeUdt)
{
updateBulkCommandText.Append("(max)");
}
else
{
int size = metadata.length;
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
}
break;
}

switch (metadata.metaType.NullableType)
case TdsEnums.SQLTIME:
case TdsEnums.SQLDATETIME2:
case TdsEnums.SQLDATETIMEOFFSET:
// date, dateime2, and datetimeoffset need to include scale
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale);
break;
default:
{
case TdsEnums.SQLNUMERICN:
case TdsEnums.SQLDECIMALN:
// Decimal and numeric need to include precision and scale
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale);
break;
case TdsEnums.SQLUDT:
{
if (metadata.IsLargeUdt)
{
updateBulkCommandText.Append("(max)");
}
else
{
int size = metadata.length;
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
}
break;
}
case TdsEnums.SQLTIME:
case TdsEnums.SQLDATETIME2:
case TdsEnums.SQLDATETIMEOFFSET:
// date, dateime2, and datetimeoffset need to include scale
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale);
break;
default:
// For non-long non-fixed types we need to add the Size
if (!metadata.metaType.IsFixed && !metadata.metaType.IsLong)
{
int size = metadata.length;
switch (metadata.metaType.NullableType)
{
// For non-long non-fixed types we need to add the Size
if (!metadata.metaType.IsFixed && !metadata.metaType.IsLong)
{
int size = metadata.length;
switch (metadata.metaType.NullableType)
{
case TdsEnums.SQLNCHAR:
case TdsEnums.SQLNVARCHAR:
case TdsEnums.SQLNTEXT:
size /= 2;
break;
default:
break;
}
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
}
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json)
{
// Partial length column prefix (max)
updateBulkCommandText.Append("(max)");
}
break;
case TdsEnums.SQLNCHAR:
case TdsEnums.SQLNVARCHAR:
case TdsEnums.SQLNTEXT:
size /= 2;
break;
default:
break;
}
updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
}
else if (metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml && metadata.metaType.SqlDbType != SqlDbTypeExtensions.Json)
{
// Partial length column prefix (max)
updateBulkCommandText.Append("(max)");
}
break;
}
}

// Get collation for column i
Result rowset = internalResults[CollationResultId];
object rowvalue = rowset[i][CollationId];
// Get collation for column i
Result rowset = internalResults[CollationResultId];
object rowvalue = rowset[i][CollationId];

bool shouldSendCollation;
switch (metadata.type)
{
case SqlDbType.Char:
case SqlDbType.NChar:
case SqlDbType.VarChar:
case SqlDbType.NVarChar:
case SqlDbType.Text:
case SqlDbType.NText:
shouldSendCollation = true;
break;

default:
shouldSendCollation = false;
break;
}
bool shouldSendCollation;
switch (metadata.type)
{
case SqlDbType.Char:
case SqlDbType.NChar:
case SqlDbType.VarChar:
case SqlDbType.NVarChar:
case SqlDbType.Text:
case SqlDbType.NText:
shouldSendCollation = true;
break;

if (rowvalue != null && shouldSendCollation)
default:
shouldSendCollation = false;
break;
}

if (rowvalue != null && shouldSendCollation)
{
Debug.Assert(rowvalue is SqlString);
SqlString collation_name = (SqlString)rowvalue;
if (!collation_name.IsNull)
{
Debug.Assert(rowvalue is SqlString);
SqlString collation_name = (SqlString)rowvalue;
if (!collation_name.IsNull)
updateBulkCommandText.Append(" COLLATE " + collation_name.Value);
// Compare collations only if the collation value was set on the metadata
if (_sqlDataReaderRowSource != null && metadata.collation != null)
{
updateBulkCommandText.Append(" COLLATE " + collation_name.Value);
// Compare collations only if the collation value was set on the metadata
if (_sqlDataReaderRowSource != null && metadata.collation != null)
// On SqlDataReader we can verify the sourcecolumn collation!
int sourceColumnId = localColumn._internalSourceColumnOrdinal;
int destinationLcid = metadata.collation.LCID;
int sourceLcid = _sqlDataReaderRowSource.GetLocaleId(sourceColumnId);
if (sourceLcid != destinationLcid)
{
// On SqlDataReader we can verify the sourcecolumn collation!
int sourceColumnId = _localColumnMappings[assocId]._internalSourceColumnOrdinal;
int destinationLcid = metadata.collation.LCID;
int sourceLcid = _sqlDataReaderRowSource.GetLocaleId(sourceColumnId);
if (sourceLcid != destinationLcid)
{
throw SQL.BulkLoadLcidMismatch(sourceLcid, _sqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column);
}
throw SQL.BulkLoadLcidMismatch(sourceLcid, _sqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column);
}
}
}
break;
}

// We found a match, so no need to keep looking.
break;
}

if (assocId == _localColumnMappings.Count)
// Remove metadata for unmatched and rejected columns.
if (! matched || rejected)
{
// Remove metadata for unmatched columns
metaDataSet[i] = null;
}
}

// All columnmappings should have matched up
if (nmatched + nrejected != _localColumnMappings.Count)
// Do we have any unmatched columns?
if (unmatchedColumns.Count > 0)
{
List<string> unmatchedColumns = new List<string>();

foreach(KeyValuePair<string, bool> keyValuePair in columnMappingStatusLookup)
{
if (!keyValuePair.Value)
{
unmatchedColumns.Add(keyValuePair.Key);
}
}

throw SQL.BulkLoadNonMatchingColumnName(unmatchedColumns);
throw SQL.BulkLoadNonMatchingColumnNames(unmatchedColumns);
}

updateBulkCommandText.Append(")");
Expand Down
Loading
Loading