Skip to content

Commit 603f322

Browse files
wip: implement restore logic
Signed-off-by: Mayank Shah <mayank.shah@percona.com>
1 parent b1d371b commit 603f322

2 files changed

Lines changed: 299 additions & 17 deletions

File tree

pkg/controller/perconaservermongodbrestore/snapshots.go

Lines changed: 294 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,24 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"fmt"
78
"strings"
89

10+
volumesnapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1"
911
"github.com/percona/percona-backup-mongodb/pbm/defs"
1012
psmdbv1 "github.com/percona/percona-server-mongodb-operator/pkg/apis/psmdb/v1"
13+
"github.com/percona/percona-server-mongodb-operator/pkg/naming"
1114
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/backup"
15+
"github.com/percona/percona-server-mongodb-operator/pkg/psmdb/config"
1216
"github.com/pkg/errors"
17+
appsv1 "k8s.io/api/apps/v1"
1318
corev1 "k8s.io/api/core/v1"
19+
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
20+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1421
"k8s.io/apimachinery/pkg/types"
1522
"k8s.io/client-go/util/retry"
23+
"k8s.io/utils/ptr"
24+
"sigs.k8s.io/controller-runtime/pkg/client"
1625
logf "sigs.k8s.io/controller-runtime/pkg/log"
1726
)
1827

@@ -34,7 +43,7 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcileExternalSnapshotRestore(
3443
return r.reconcileSnapshotRequested(ctx, cr, cluster)
3544

3645
case psmdbv1.RestoreStateRunning:
37-
return r.reconcileSnapshotRunning(ctx, cr, cluster)
46+
return r.reconcileSnapshotRunning(ctx, cr, cluster, bcp)
3847
}
3948

4049
return cr.Status, nil
@@ -197,23 +206,291 @@ func (r *ReconcilePerconaServerMongoDBRestore) reconcileSnapshotRunning(
197206
ctx context.Context,
198207
restore *psmdbv1.PerconaServerMongoDBRestore,
199208
cluster *psmdbv1.PerconaServerMongoDB,
209+
bcp *psmdbv1.PerconaServerMongoDBBackup,
200210
) (psmdbv1.PerconaServerMongoDBRestoreStatus, error) {
211+
status := restore.Status
212+
log := logf.FromContext(ctx)
213+
if ok, err := r.scaleDownStatefulSetsForSnapshotRestore(ctx, cluster, restore); err != nil {
214+
return restore.Status, errors.Wrapf(err, "prepare statefulsets for snapshot restore")
215+
} else if !ok {
216+
log.Info("Waiting for statefulsets to be scaled down", "ready", ok)
217+
return status, nil
218+
}
219+
220+
if ok, err := r.reconcilePVCsForSnapshotRestore(ctx, cluster, restore, bcp); err != nil {
221+
return restore.Status, errors.Wrapf(err, "reconcile pvcs for snapshot restore")
222+
} else if !ok {
223+
log.Info("Waiting for pvcs to be reconciled", "ready", ok)
224+
return status, nil
225+
}
226+
227+
if ok, err := r.scaleUpStatefulSetsForSnapshotRestore(ctx, cluster); err != nil {
228+
return restore.Status, errors.Wrapf(err, "scale up statefulsets for snapshot restore")
229+
} else if !ok {
230+
log.Info("Waiting for statefulsets to be scaled up", "ready", ok)
231+
return status, nil
232+
}
233+
234+
// TODO: exec into any of the pods and run:
235+
// ```
236+
// /opt/percona/pbm restore-finish <restore_name> -c <pbm-config.yaml>
237+
// ```
238+
239+
// Delete all statefulsets.
240+
// Resync PBM storage.
201241

202-
// The exact steps that need to take place here:
203-
// 1. Scale down cfg and rs statefulsets and wait for it to be scaled down.
204-
// 2. Update the StatefulSet volumeClaimTemplates to use the snapshot as data source
205-
// 3. Update the StatefulSet pbm-agent arg as follows:
206-
// ```
207-
// pbm-agent restore-finish <restore_name> -c <pbm-config.yaml> --rs <rs_name> --node <node_name> --db-config <db-config.yaml>
208-
// ```
209-
// * rs: Specifed in MONGODB_REPLSET environment variable
210-
// * node_name: Specifed as $POD_NAME.$SERVICE_NAME-MONGODB_REPLSET.$NAMESPACE.svc.cluster.local
211-
// 4. Scale up cfg and rs statefulsets and wait
212-
// 5. Exec the following command to finish the restore:
213-
// ```
214-
// /opt/percona/pbm restore-finish <restore_name> -c <pbm-config.yaml>
215-
// ```
216-
// 6. Delete all statefulsets
217-
// 7. Add resync storage annotation
218242
return restore.Status, nil
219243
}
244+
245+
func (r *ReconcilePerconaServerMongoDBRestore) scaleDownStatefulSetsForSnapshotRestore(
246+
ctx context.Context,
247+
cluster *psmdbv1.PerconaServerMongoDB,
248+
restore *psmdbv1.PerconaServerMongoDBRestore,
249+
) (bool, error) {
250+
replsets := cluster.Spec.Replsets
251+
if cluster.Spec.Sharding.Enabled {
252+
replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet)
253+
}
254+
255+
// Collect all statefulsets that need to be scaled down.
256+
statefulsets := []types.NamespacedName{}
257+
for _, rs := range replsets {
258+
statefulsets = append(statefulsets, types.NamespacedName{Namespace: cluster.Namespace, Name: naming.MongodStatefulSetName(cluster, rs)})
259+
if rs.NonVoting.Enabled {
260+
statefulsets = append(statefulsets, types.NamespacedName{Namespace: cluster.Namespace, Name: naming.NonVotingStatefulSetName(cluster, rs)})
261+
}
262+
if rs.Hidden.Enabled {
263+
statefulsets = append(statefulsets, types.NamespacedName{Namespace: cluster.Namespace, Name: naming.HiddenStatefulSetName(cluster, rs)})
264+
}
265+
}
266+
267+
done := true
268+
for _, nn := range statefulsets {
269+
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
270+
sfs := appsv1.StatefulSet{}
271+
if err := r.client.Get(ctx, nn, &sfs); err != nil {
272+
return err
273+
}
274+
275+
if sfs.Status.ReadyReplicas > 0 {
276+
done = false
277+
}
278+
279+
if sfs.Spec.Replicas != nil && *sfs.Spec.Replicas == 0 {
280+
return nil
281+
}
282+
283+
orig := sfs.DeepCopy()
284+
285+
// Scale down the statefulset.
286+
sfs.Spec.Replicas = ptr.To(int32(0))
287+
288+
// When the pods come up, they should start pbm with the following command:
289+
sfs.Spec.Template.Spec.Containers[0].Command = []string{"/opt/percona/pbm-agent"}
290+
sfs.Spec.Template.Spec.Containers[0].Args = []string{
291+
"restore-finish",
292+
restore.Status.PBMname,
293+
"-c", "/etc/pbm/pbm_config.yaml",
294+
"--rs", "$(MONGODB_REPLSET)",
295+
"--node", "$(POD_NAME).$(SERVICE_NAME)-$(MONGODB_REPLSET).$(NAMESPACE).svc.cluster.local",
296+
// "--db-config", "/etc/pbm/db-config.yaml", // TODO
297+
}
298+
return r.client.Patch(ctx, &sfs, client.MergeFrom(orig))
299+
}); err != nil {
300+
return false, errors.Wrapf(err, "prepare statefulset %s for snapshot restore", nn.Name)
301+
}
302+
}
303+
return done, nil
304+
}
305+
306+
func (r *ReconcilePerconaServerMongoDBRestore) reconcilePVCsForSnapshotRestore(
307+
ctx context.Context,
308+
cluster *psmdbv1.PerconaServerMongoDB,
309+
restore *psmdbv1.PerconaServerMongoDBRestore,
310+
backup *psmdbv1.PerconaServerMongoDBBackup,
311+
) (bool, error) {
312+
replsets := cluster.Spec.Replsets
313+
if cluster.Spec.Sharding.Enabled {
314+
replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet)
315+
}
316+
317+
type pvcInfo struct {
318+
pvcName string
319+
snapshotName string
320+
volumeClaimTemplate corev1.PersistentVolumeClaimSpec
321+
}
322+
323+
getVolumeClaimTemplate := func(sfsName string) (corev1.PersistentVolumeClaimSpec, error) {
324+
sfs := appsv1.StatefulSet{}
325+
if err := r.client.Get(ctx, types.NamespacedName{Name: sfsName, Namespace: cluster.Namespace}, &sfs); err != nil {
326+
return corev1.PersistentVolumeClaimSpec{}, errors.Wrapf(err, "get statefulset %s", sfsName)
327+
}
328+
if len(sfs.Spec.VolumeClaimTemplates) == 0 {
329+
return corev1.PersistentVolumeClaimSpec{}, errors.Errorf("no volume claim templates found for statefulset %s", sfsName)
330+
}
331+
for _, vct := range sfs.Spec.VolumeClaimTemplates {
332+
if vct.Name == config.MongodDataVolClaimName {
333+
return vct.Spec, nil
334+
}
335+
}
336+
return corev1.PersistentVolumeClaimSpec{}, errors.Errorf("volume claim template %s not found", config.MongodDataVolClaimName)
337+
}
338+
339+
// Collect all PVCs that need to be reconciled.
340+
pvcs := make([]pvcInfo, 0)
341+
for _, rs := range replsets {
342+
snapshot := backup.Status.Snapshots.GetSnapshotInfo(rs.Name)
343+
if snapshot == nil {
344+
return false, fmt.Errorf("no snapshots found for replset %s", rs.Name)
345+
}
346+
vct, err := getVolumeClaimTemplate(naming.MongodStatefulSetName(cluster, rs))
347+
if err != nil {
348+
return false, errors.Wrapf(err, "get volume claim template for statefulset %s", naming.MongodStatefulSetName(cluster, rs))
349+
}
350+
for podIdx := int32(0); podIdx < rs.Size; podIdx++ {
351+
pvcs = append(pvcs, pvcInfo{
352+
pvcName: config.MongodDataVolClaimName + "-" + rs.PodName(cluster, int(podIdx)),
353+
volumeClaimTemplate: vct,
354+
snapshotName: snapshot.SnapshotName,
355+
})
356+
}
357+
358+
if rs.NonVoting.Enabled {
359+
vct, err := getVolumeClaimTemplate(naming.NonVotingStatefulSetName(cluster, rs))
360+
if err != nil {
361+
return false, errors.Wrapf(err, "get volume claim template for statefulset %s", naming.NonVotingStatefulSetName(cluster, rs))
362+
}
363+
pvcs = append(pvcs, pvcInfo{
364+
pvcName: config.MongodDataVolClaimName + "-" + naming.NonVotingStatefulSetName(cluster, rs) + "-0",
365+
volumeClaimTemplate: vct,
366+
snapshotName: snapshot.SnapshotName,
367+
})
368+
}
369+
if rs.Hidden.Enabled {
370+
vct, err := getVolumeClaimTemplate(naming.HiddenStatefulSetName(cluster, rs))
371+
if err != nil {
372+
return false, errors.Wrapf(err, "get volume claim template for statefulset %s", naming.HiddenStatefulSetName(cluster, rs))
373+
}
374+
pvcs = append(pvcs, pvcInfo{
375+
pvcName: config.MongodDataVolClaimName + "-" + naming.HiddenStatefulSetName(cluster, rs) + "-0",
376+
volumeClaimTemplate: vct,
377+
snapshotName: snapshot.SnapshotName,
378+
})
379+
}
380+
}
381+
382+
done := true
383+
for _, info := range pvcs {
384+
if ready, err := r.reconcilePVCForSnapshotRestore(ctx, info.pvcName, info.snapshotName,
385+
info.volumeClaimTemplate, restore); err != nil {
386+
return false, errors.Wrapf(err, "reconcile pvc %s for snapshot restore", info.pvcName)
387+
} else if !ready {
388+
done = false
389+
}
390+
}
391+
return done, nil
392+
}
393+
394+
func generatePVCFromSnapshot(
395+
pvc *corev1.PersistentVolumeClaim,
396+
spec corev1.PersistentVolumeClaimSpec,
397+
snapshotName string,
398+
) {
399+
pvc.Spec = spec
400+
pvc.Spec.DataSource = &corev1.TypedLocalObjectReference{
401+
APIGroup: ptr.To(volumesnapshotv1.SchemeGroupVersion.Group),
402+
Kind: "VolumeSnapshot",
403+
Name: snapshotName,
404+
}
405+
pvc.SetAnnotations(map[string]string{
406+
naming.AnnotationRestoreName: snapshotName,
407+
})
408+
}
409+
410+
func (r *ReconcilePerconaServerMongoDBRestore) reconcilePVCForSnapshotRestore(
411+
ctx context.Context,
412+
pvcName string,
413+
snapshotName string,
414+
volumeClaimTemplate corev1.PersistentVolumeClaimSpec,
415+
restore *psmdbv1.PerconaServerMongoDBRestore,
416+
) (bool, error) {
417+
observedPVC := &corev1.PersistentVolumeClaim{
418+
ObjectMeta: metav1.ObjectMeta{
419+
Name: pvcName,
420+
Namespace: restore.GetNamespace(),
421+
},
422+
}
423+
err := r.client.Get(ctx, client.ObjectKeyFromObject(observedPVC), observedPVC)
424+
if k8sErrors.IsNotFound(err) {
425+
generatePVCFromSnapshot(observedPVC, volumeClaimTemplate, snapshotName)
426+
if err := r.client.Create(ctx, observedPVC); err != nil {
427+
return false, errors.Wrapf(err, "create pvc %s", pvcName)
428+
}
429+
return true, nil
430+
} else if err != nil {
431+
return false, errors.Wrapf(err, "get observed pvc %s", pvcName)
432+
}
433+
434+
restoreName, ok := observedPVC.GetAnnotations()[naming.AnnotationRestoreName]
435+
if ok && restoreName == restore.Name {
436+
return true, nil
437+
}
438+
439+
if !observedPVC.GetDeletionTimestamp().IsZero() {
440+
return false, nil
441+
}
442+
443+
if err := r.client.Delete(ctx, observedPVC); err != nil {
444+
return false, errors.Wrapf(err, "delete pvc %s", pvcName)
445+
}
446+
return false, nil
447+
}
448+
449+
func (r *ReconcilePerconaServerMongoDBRestore) scaleUpStatefulSetsForSnapshotRestore(
450+
ctx context.Context,
451+
cluster *psmdbv1.PerconaServerMongoDB,
452+
) (bool, error) {
453+
replsets := cluster.Spec.Replsets
454+
if cluster.Spec.Sharding.Enabled {
455+
replsets = append(replsets, cluster.Spec.Sharding.ConfigsvrReplSet)
456+
}
457+
458+
// Collect all statefulsets that need to be scaled up.
459+
sfsInfos := make(map[types.NamespacedName]int32)
460+
for _, rs := range replsets {
461+
sfsInfos[types.NamespacedName{Namespace: cluster.Namespace, Name: naming.MongodStatefulSetName(cluster, rs)}] = rs.Size
462+
if rs.NonVoting.Enabled {
463+
sfsInfos[types.NamespacedName{Namespace: cluster.Namespace, Name: naming.NonVotingStatefulSetName(cluster, rs)}] = 1
464+
}
465+
if rs.Hidden.Enabled {
466+
sfsInfos[types.NamespacedName{Namespace: cluster.Namespace, Name: naming.HiddenStatefulSetName(cluster, rs)}] = 1
467+
}
468+
}
469+
470+
done := true
471+
for nn, replicas := range sfsInfos {
472+
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
473+
sfs := appsv1.StatefulSet{}
474+
if err := r.client.Get(ctx, nn, &sfs); err != nil {
475+
return err
476+
}
477+
478+
if sfs.Status.ReadyReplicas != replicas {
479+
done = false
480+
}
481+
482+
if sfs.Spec.Replicas != nil && *sfs.Spec.Replicas == replicas {
483+
return nil
484+
}
485+
486+
orig := sfs.DeepCopy()
487+
488+
// Scale down the statefulset.
489+
sfs.Spec.Replicas = ptr.To(replicas)
490+
return r.client.Patch(ctx, &sfs, client.MergeFrom(orig))
491+
}); err != nil {
492+
return false, errors.Wrapf(err, "prepare statefulset %s for snapshot restore", nn.Name)
493+
}
494+
}
495+
return done, nil
496+
}

pkg/naming/annotations.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package naming
2+
3+
const (
4+
// AnnotationRestoreName is the annotation key placed on PVCs that are created
// from a volume snapshot during a restore. The restore controller reads it
// back to decide whether an existing PVC already belongs to the restore being
// reconciled.
AnnotationRestoreName = perconaPrefix + "restore-name"
5+
)

0 commit comments

Comments
 (0)