Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ SELECT
id AS job_id,
crdb_internal.pb_to_json(
'cockroach.sql.jobs.jobspb.Payload',
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
FROM crdb_internal.system_jobs
payload)->'logicalReplicationDetails'->>'parentId' AS parent_id
FROM crdb_internal.system_jobs
WHERE job_type = 'LOGICAL REPLICATION'
) AS t
WHERE t.parent_id = $1
Expand Down Expand Up @@ -359,10 +359,12 @@ func (r *ResolvedDestObjects) TargetDescription() string {
return targetDescription
}

// TargetTableNames returns the fully qualified names of the resolved target
// tables.
func (r *ResolvedDestObjects) TargetTableNames() []string {
var targetTableNames []string
targetTableNames := make([]string, len(r.TableNames))
for i := range r.TableNames {
targetTableNames = append(targetTableNames, r.TableNames[i].Table())
targetTableNames[i] = r.TableNames[i].FQString()
}
return targetTableNames
}
Expand Down
101 changes: 97 additions & 4 deletions pkg/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1727,12 +1727,12 @@ func GetReverseJobID(
var jobID jobspb.JobID
testutils.SucceedsSoon(t, func() error {
err := db.DB.QueryRowContext(ctx, `
SELECT id
FROM system.jobs
WHERE job_type = 'LOGICAL REPLICATION'
SELECT id
FROM system.jobs
WHERE job_type = 'LOGICAL REPLICATION'
AND id != $1
AND created > $2
ORDER BY created DESC
ORDER BY created DESC
LIMIT 1`,
parentID, created).Scan(&jobID)
if err != nil {
Expand Down Expand Up @@ -2905,3 +2905,96 @@ func TestGetWriterType(t *testing.T) {
require.Equal(t, sqlclustersettings.LDRWriterTypeSQL, wt)
})
}

func TestLogicalReplicationExternalConnWithoutDBName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
defer server.Stopper().Stop(ctx)

dbA.Exec(t, "CREATE TABLE a.public.foo (x INT PRIMARY KEY)")
dbA.Exec(t, "INSERT INTO a.public.foo SELECT * FROM generate_series(1, 10)")
dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'")
dbA.Exec(t, "GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public.foo TO userA")
dbAURL := replicationtestutils.GetExternalConnectionURI(
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"),
)

dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'")
dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB")
dbBURL := replicationtestutils.GetExternalConnectionURI(
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"),
)

dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn(
t,
serverutils.DBName("b"),
serverutils.ClientCerts(false),
serverutils.UserPassword("userB", "123"),
))

var jobID jobspb.JobID
dbBAsUser.QueryRow(
t,
"CREATE LOGICALLY REPLICATED TABLE b.public.foo FROM TABLE a.public.foo ON $1 WITH BIDIRECTIONAL ON $2",
dbAURL.String(),
dbBURL.String(),
).Scan(&jobID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID)

reverseJobID := GetReverseJobID(ctx, t, dbA, jobID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID)
}

func TestLogicalReplicationCapitalTableName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

server, s, dbA, dbB := setupLogicalTestServer(t, ctx, testClusterBaseClusterArgs, 1)
defer server.Stopper().Stop(ctx)

dbA.Exec(t, `CREATE TABLE a.public."Foo" (x INT PRIMARY KEY)`)
dbA.Exec(t, `INSERT INTO a.public."Foo" SELECT * FROM generate_series(1, 10)`)
dbA.Exec(t, "CREATE USER userA WITH PASSWORD '123'")
dbA.Exec(t, `GRANT REPLICATIONSOURCE, REPLICATIONDEST ON TABLE a.public."Foo" TO userA`)
dbAURL := replicationtestutils.GetExternalConnectionURI(
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userA", "123"), serverutils.DBName("a"),
)

dbB.Exec(t, "CREATE USER userB WITH PASSWORD '123'")
dbB.Exec(t, "GRANT CREATE ON DATABASE b TO userB")
dbBURL := replicationtestutils.GetExternalConnectionURI(
t, s, s, serverutils.ClientCerts(false), serverutils.UserPassword("userB", "123"), serverutils.DBName("b"),
)

dbBAsUser := sqlutils.MakeSQLRunner(s.SQLConn(
t,
serverutils.DBName("b"),
serverutils.ClientCerts(false),
serverutils.UserPassword("userB", "123"),
))

// Unidirectional into B.
dbBAsUser.Exec(t, `CREATE TABLE b.public."uFoo" (x INT PRIMARY KEY)`)
var uJobID jobspb.JobID
dbBAsUser.QueryRow(t, `CREATE LOGICAL REPLICATION STREAM FROM TABLE a.public."Foo" ON $1 INTO TABLE b.public."uFoo"`, dbAURL.String()).Scan(&uJobID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, uJobID)

// Bidirectional
var jobID jobspb.JobID
dbBAsUser.QueryRow(
t,
`CREATE LOGICALLY REPLICATED TABLE b.public."Foo" FROM TABLE a.public."Foo" ON $1 WITH BIDIRECTIONAL ON $2`,
dbAURL.String(),
dbBURL.String(),
).Scan(&jobID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobID)

reverseJobID := GetReverseJobID(ctx, t, dbA, jobID)
WaitUntilReplicatedTime(t, s.Clock().Now(), dbA, reverseJobID)
}